Commit db516c61 authored by Johan Ström's avatar Johan Ström

Move connection/transport/mqtt/qpid to separate logging channels, allow...

Move connection/transport/mqtt/qpid to separate logging channels, allow individual levels to be set.

qpid & mqtt low-level libraries are capped to INFO, while agotransport and agoconnection have their separate levels set to TRACE.
Please see system.conf for more details.
parent 03cc5446
Pipeline #448 passed with stage
in 2 minutes and 43 seconds
......@@ -19,3 +19,28 @@ units=SI
log_method=syslog
# log levels: TRACE, DEBUG, INFO, WARNING, ERROR, FATAL
log_level=INFO
[loggers]
# The main log_level controls the overall logging level.
# It is then possible to limit different loggers (python) / channels (C++) to a higher level.
# For example, if log_level is set to INFO, nothing lower than INFO will be logged,
# even if a specific logger is set to TRACE.
# Names are shared between both python & C++ implementations (where applicable).
# The low-level qpid python library is very verbose, so do not log anything lower
# than INFO from that library:
qpid=INFO
# Same goes for low-level mqtt library, both on C++ and Python. Cap it to INFO.
mqtt=INFO
# The following are internal application logging in the AgoClient library
# transport is the abstraction of the MQTT/QPID connectivity, and only deals with simple messaging.
transport = TRACE
# connection is a higher-level interface between each application and the transport
connection = TRACE
# This is the "main" logger for C++, if no specific one is used. It is also the default for all applications.
app = TRACE
......@@ -233,7 +233,7 @@ class AgoApp:
self.log_handler.setFormatter(self.log_formatter)
root.addHandler(self.log_handler)
self.log = logging.getLogger(self.app_name)
self.log = logging.getLogger("app")
def setup_connection(self):
"""Create an AgoConnection instance, assigned to self.connection"""
......
......@@ -29,7 +29,7 @@ class AgoConnection:
"""The constructor."""
self.instance = instance
self.uuidmap_file = get_config_path('uuidmap/' + self.instance + '.json')
self.log = logging.getLogger('AgoConnection')
self.log = logging.getLogger('connection')
messaging = str(get_config_option("system", "messaging", "qpid"))
broker = str(get_config_option("system", "broker", "localhost"))
......
......@@ -14,9 +14,10 @@ from agoclient import agoproto
TOPIC_BASE = 'com.agocontrol'
PUBLISH_TOPIC = TOPIC_BASE + '/legacy'
class AgoMqttTransport(agoclient.agotransport.AgoTransport):
def __init__(self, client_id, broker, username, password):
self.log = logging.getLogger('AgoMqttTransport')
self.log = logging.getLogger('transport')
self.client_id = client_id
self.broker = broker
self.username = username
......@@ -44,7 +45,7 @@ class AgoMqttTransport(agoclient.agotransport.AgoTransport):
self.mqtt.on_connect = self._on_connect
self.mqtt.on_subscribe = self._on_subscribe
self.mqtt.on_message = self._on_message
self.mqtt.on_log = self._on_log
self.mqtt.enable_logger(logging.getLogger('mqtt')) # Ues custom logger for lowlevel mqtt
host = self.broker
port = 1883
......@@ -73,12 +74,7 @@ class AgoMqttTransport(agoclient.agotransport.AgoTransport):
except KeyboardInterrupt:
break
def _on_log(self, client, userdata, level, message):
if level == mqtt.MQTT_LOG_DEBUG:
# mqtt DEBUG is very verbose..
self.log.trace(message)
else:
self.log.log(mqtt.LOGGING_LEVEL[level], message)
return False
def _on_connect(self, client, userdata, flags, rc):
self.pending_subscribes = 2
......
......@@ -15,7 +15,7 @@ from agoclient import agoproto
class AgoQpidTransport(agoclient.agotransport.AgoTransport):
def __init__(self, broker, username, password):
self.log = logging.getLogger('AgoQpidTransport')
self.log = logging.getLogger('transport')
self.broker = broker
self.username = username
self.password = password
......@@ -24,10 +24,6 @@ class AgoQpidTransport(agoclient.agotransport.AgoTransport):
self.receiver = None
self.sender = None
# Forcibly cap QPID logging to INFO
root = logging.getLogger()
logging.getLogger('qpid').setLevel(max(root.level, logging.INFO))
def start(self):
self.connection = qpid.messaging.Connection(self.broker, username=self.username, password=self.password, reconnect=True)
try:
......
......@@ -23,6 +23,9 @@
namespace fs = ::boost::filesystem;
static AGO_LOGGER_ALIAS(connection, "connnection");
bool agocontrol::nameval(const std::string& in, std::string& name, std::string& value) {
std::string::size_type i = in.find("=");
if (i == std::string::npos) {
......@@ -185,7 +188,7 @@ void agocontrol::AgoConnection::run() {
}
if (!m.message.isObject() || !m.message.isMember("content")) {
AGO_ERROR() << "Invalid message: " << m.message;
AGOL_ERROR(connection) << "Invalid message: " << m.message;
continue;
}
......@@ -216,16 +219,16 @@ void agocontrol::AgoConnection::run() {
// Catch any non-updated applications HARD.
if(!commandResponse.empty() && !commandResponse.isMember("result") && !commandResponse.isMember("error")) {
AGO_ERROR() << "Application " << instance << " has not been updated properly and command handler returns non-valid responses.";
AGO_ERROR() << "Input: " << content;
AGO_ERROR() << "Output: " << content;
AGOL_ERROR(connection) << "Application " << instance << " has not been updated properly and command handler returns non-valid responses.";
AGOL_ERROR(connection) << "Input: " << content;
AGOL_ERROR(connection) << "Output: " << content;
commandResponse = responseError(RESPONSE_ERR_INTERNAL,
"Component "+instance+" has not been updated properly, please contact developers with logs");
}
}catch(const AgoCommandException& ex) {
commandResponse = ex.toResponse();
}catch(const std::exception &ex) {
AGO_ERROR() << "Unhandled exception in command handler:" << ex.what();
AGOL_ERROR(connection) << "Unhandled exception in command handler:" << ex.what();
commandResponse = responseError(RESPONSE_ERR_INTERNAL, "Unhandled exception in command handler");
}
......@@ -235,7 +238,7 @@ void agocontrol::AgoConnection::run() {
// to reply to "anonymous" requests not destined to any specific uuid
if (isOurDevice || (content["command"]=="inventory" && filterCommands==false)) {
if(m.replyFuction.empty())
AGO_WARNING() << "Attempted to send a reply to a incoming message which did not expect a reply: " << content;
AGOL_WARNING(connection) << "Attempted to send a reply to a incoming message which did not expect a reply: " << content;
else
m.replyFuction(commandResponse);
}
......@@ -247,7 +250,7 @@ void agocontrol::AgoConnection::run() {
}
}
}
AGO_TRACE() << "Leaving run() message loop";
AGOL_TRACE(connection) << "Leaving run() message loop";
}
void agocontrol::AgoConnection::shutdown() {
......@@ -414,7 +417,7 @@ bool agocontrol::AgoConnection::loadUuidMap() {
if (uuidMap.type() == Json::ValueType::objectValue)
return true;
AGO_ERROR() << "Invalid contents in " << uuidMapFile;
AGOL_ERROR(connection) << "Invalid contents in " << uuidMapFile;
}
return false;
......@@ -526,7 +529,7 @@ int agocontrol::AgoConnection::isDeviceStale(const std::string& internalId)
}
else
{
AGO_WARNING() << "internalid '" << internalId << "' doesn't exist in deviceMap";
AGOL_WARNING(connection) << "internalid '" << internalId << "' doesn't exist in deviceMap";
return 0;
}
}
......@@ -547,10 +550,10 @@ Json::Value agocontrol::AgoConnection::getInventory() {
AgoResponse r = sendRequest(content);
if(r.isOk()) {
AGO_TRACE() << "Inventory obtained";
AGOL_TRACE(connection) << "Inventory obtained";
return r.getData();
}else{
AGO_WARNING() << "Failed to obtain inventory: " << r.response;
AGOL_WARNING(connection) << "Failed to obtain inventory: " << r.response;
}
// TODO: Some way to report error?
......@@ -567,20 +570,20 @@ std::string agocontrol::AgoConnection::getAgocontroller() {
Json::Value& devices(inventory["devices"]);
for (auto it = devices.begin(); it != devices.end(); it++) {
if ((*it)["devicetype"] == "agocontroller") {
AGO_DEBUG() << "Found Agocontroller: " << it.name();
AGOL_DEBUG(connection) << "Found Agocontroller: " << it.name();
agocontroller = it.name();
}
}
}
if (agocontroller == "" && retry) {
AGO_WARNING() << "Unable to resolve agocontroller, retrying";
AGOL_WARNING(connection) << "Unable to resolve agocontroller, retrying";
sleep(1);
}
}
if (agocontroller == "")
AGO_WARNING() << "Failed to resolve agocontroller, giving up";
AGOL_WARNING(connection) << "Failed to resolve agocontroller, giving up";
return agocontroller;
}
......
This diff is collapsed.
......@@ -14,6 +14,9 @@
namespace agotransport = agocontrol::transport;
//static AGO_LOGGER(qpid); // no logging from qpid C++ library
static AGO_LOGGER(transport);
/**
* Factory function for creating AgoQpidTransport via dlsym.
*/
......@@ -45,12 +48,12 @@ agotransport::AgoQpidTransport::AgoQpidTransport(const std::string &uri, const s
agotransport::AgoQpidTransport::~AgoQpidTransport() {
if(impl->receiver.isValid()) {
AGO_DEBUG() << "Closing notification receiver";
AGOL_DEBUG(transport) << "Closing notification receiver";
impl->receiver.close();
}
if(impl->session.isValid() && impl->connection.isValid()) {
AGO_DEBUG() << "Closing pending broker connection";
AGOL_DEBUG(transport) << "Closing pending broker connection";
// Not yet connected, break out of connection attempt
// TODO: This does not actually abort on old qpid
impl->connection.close();
......@@ -58,22 +61,22 @@ agotransport::AgoQpidTransport::~AgoQpidTransport() {
try {
if(impl->connection.isOpen()) {
AGO_DEBUG() << "Closing broker connection";
AGOL_DEBUG(transport) << "Closing broker connection";
impl->connection.close();
}
} catch(const std::exception& error) {
AGO_ERROR() << "Failed to close broker connection: " << error.what();
AGOL_ERROR(transport) << "Failed to close broker connection: " << error.what();
}
}
bool agotransport::AgoQpidTransport::start() {
try {
AGO_DEBUG() << "Opening QPid broker connection";
AGOL_DEBUG(transport) << "Opening QPid broker connection";
impl->connection.open();
impl->session = impl->connection.createSession();
impl->sender = impl->session.createSender("agocontrol; {create: always, node: {type: topic}}");
} catch(const std::exception& error) {
AGO_FATAL() << "Failed to connect to broker: " << error.what();
AGOL_FATAL(transport) << "Failed to connect to broker: " << error.what();
impl->connection.close();
return false;
}
......@@ -81,7 +84,7 @@ bool agotransport::AgoQpidTransport::start() {
try {
impl->receiver = impl->session.createReceiver("agocontrol; {create: always, node: {type: topic}}");
} catch(const std::exception& error) {
AGO_FATAL() << "Failed to create broker receiver: " << error.what();
AGOL_FATAL(transport) << "Failed to create broker receiver: " << error.what();
return false;
}
......@@ -102,11 +105,11 @@ bool agotransport::AgoQpidTransport::sendMessage(Json::Value& message)
if (!subject.empty())
qpmessage.setSubject(subject);
AGO_TRACE() << "Sending message [src=" << qpmessage.getReplyTo() <<
AGOL_TRACE(transport) << "Sending message [src=" << qpmessage.getReplyTo() <<
", sub="<< qpmessage.getSubject()<<"]: " << msgMap;
impl->sender.send(qpmessage);
} catch(const std::exception& error) {
AGO_ERROR() << "Exception in sendMessage: " << error.what();
AGOL_ERROR(transport) << "Exception in sendMessage: " << error.what();
return false;
}
......@@ -136,7 +139,7 @@ agocontrol::AgoResponse agotransport::AgoQpidTransport::sendRequest(Json::Value&
responseReceiver = recvsession.createReceiver(responseQueue);
qpmessage.setReplyTo(responseQueue);
AGO_TRACE() << "Sending request [sub=" << subject << ", replyTo=" << responseQueue <<"]" << msgMap;
AGOL_TRACE(transport) << "Sending message [sub=" << subject << ", reply-to=" << responseQueue <<"]:" << msgMap;
impl->sender.send(qpmessage);
qpid::messaging::Message message = responseReceiver.fetch(qpid::messaging::Duration(timeout.count()));
......@@ -154,9 +157,9 @@ agocontrol::AgoResponse agotransport::AgoQpidTransport::sendRequest(Json::Value&
}
r.init(response);
AGO_TRACE() << "Remote response received: " << r.response;
AGOL_TRACE(transport) << "Received response: " << r.response;
}catch(const std::invalid_argument& ex) {
AGO_ERROR() << "Failed to initate response, wrong response format? Error: "
AGOL_ERROR(transport) << "Failed to initiate response, wrong response format? Error: "
<< ex.what()
<< ". Message: " << r.response;
......@@ -165,11 +168,11 @@ agocontrol::AgoResponse agotransport::AgoQpidTransport::sendRequest(Json::Value&
recvsession.acknowledge();
} catch (const qpid::messaging::NoMessageAvailable&) {
AGO_WARNING() << "No reply for message sent to subject " << subject;
AGOL_WARNING(transport) << "Timeout waiting for reply (subject: " << subject << ")";
r.init(responseError(RESPONSE_ERR_NO_REPLY, "Timeout"));
} catch(const std::exception& ex) {
AGO_ERROR() << "Exception in sendRequest: " << ex.what();
AGOL_ERROR(transport) << "Exception in sendRequest: " << ex.what();
r.init(responseError(RESPONSE_ERR_INTERNAL, ex.what()));
}
......@@ -194,7 +197,7 @@ agotransport::AgoTransportMessage agotransport::AgoQpidTransport::fetchMessage(s
qpid::messaging::decode(message, contentMap);
variantMapToJson(contentMap, ret.message["content"]);
AGO_TRACE() << "Incoming message [src=" << message.getReplyTo() <<
AGOL_TRACE(transport) << "Incoming message [src=" << message.getReplyTo() <<
", sub="<< message.getSubject()<<"]: " << ret.message;
const qpid::messaging::Address replyaddress = message.getReplyTo();
......@@ -210,10 +213,10 @@ agotransport::AgoTransportMessage agotransport::AgoQpidTransport::fetchMessage(s
if(shutdownSignaled)
return ret;
AGO_ERROR() << "Exception in message loop: " << error.what();
AGOL_ERROR(transport) << "Exception in message loop: " << error.what();
if (impl->session.hasError()) {
AGO_ERROR() << "Session has error, recreating";
AGOL_ERROR(transport) << "Session has error, recreating";
impl->session.close();
impl->session = impl->connection.createSession();
impl->receiver = impl->session.createReceiver("agocontrol; {create: always, node: {type: topic}}");
......@@ -230,7 +233,7 @@ void agotransport::QpidImpl::sendReply(const Json::Value& content, const qpid::m
{
qpid::messaging::Message response;
qpid::types::Variant::Map responseMap = jsonToVariantMap(content);
AGO_TRACE() << "[qpid] sending reply " << content;
AGOL_TRACE(transport) << "Sending reply " << content;
qpid::messaging::Session replysession = connection.createSession();
try {
......@@ -239,7 +242,7 @@ void agotransport::QpidImpl::sendReply(const Json::Value& content, const qpid::m
//response.setSubject(instance);
replysender.send(response);
} catch(const std::exception& error) {
AGO_ERROR() << "[qpid] failed to send reply: " << error.what();;
AGOL_ERROR(transport) << "failed to send reply: " << error.what();;
}
replysession.close();
}
......@@ -97,6 +97,9 @@ class TestConfig:
assert d['broker'] == 'localhost'
assert d['uuid'] == '00000000-0000-0000-000000000000'
d = config.get_config_section(['loggers'], ['test', 'system'])
assert d['mqtt'] == 'INFO'
assert d['qpid'] == 'INFO'
def test_set_basic(self, setup_config):
assert sco('system', 'new_value', '666')
......@@ -157,7 +160,6 @@ class TestAppConfig:
# Will give system.confs password
assert app_sut.get_config_option('password', app=['other', 'system'], section='system') == 'letmein'
def test_get_section(self, app_sut):
d = app_sut.get_config_section()
assert d == dict(test_value_0='100',
......@@ -176,6 +178,10 @@ class TestAppConfig:
assert d['uuid'] == '00000000-0000-0000-000000000000'
assert 'test_value_blank' not in d # from test section
d = app_sut.get_config_section(['loggers'], ['test', 'system'])
assert d['mqtt'] == 'INFO'
assert d['qpid'] == 'INFO'
def test_set_basic(self, app_sut):
assert app_sut.set_config_option('new_value', '666') == True
assert app_sut.get_config_option('new_value') == '666'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment