Commit fd96d1d5 authored by Johan Ström's avatar Johan Ström

Merge remote-tracking branch 'origin/mqtt-buster' into develop

parents c2fd6b96 edac5106
......@@ -16,3 +16,6 @@ version.h
*.save
.idea/
*.pyc
*.swp
......@@ -51,7 +51,7 @@ else()
set (DATADIR /var/opt/agocontrol CACHE STRING "Directory for variable data")
set (LOCALSTATEDIR /var/opt/agocontrol CACHE STRING "Directory for state info")
if(NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
set (CMAKE_EXE_LINKER_FLAGS "-Wl,-z,now")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-z,now")
endif()
endif()
......@@ -90,6 +90,51 @@ set_property( DIRECTORY
-Wno-error=unknown-pragmas
)
# Add optional building with/without ASAN
# https://github.com/google/sanitizers/wiki/AddressSanitizer
string(TOUPPER "${CMAKE_BUILD_TYPE}" uppercase_CMAKE_BUILD_TYPE)
# In DEBUG builds we enable it by default, except on ARM
IF(${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm")
option(WITH_ASAN_DEBUG "Build with Address Sanitizer in debug mode" OFF)
else()
option(WITH_ASAN_DEBUG "Build with Address Sanitizer in debug mode" ON)
endif()
# With RELEASE build default to off.
option(WITH_ASAN_RELEASE "Build with Address Sanitizer in release mode" OFF)
if(uppercase_CMAKE_BUILD_TYPE MATCHES DEBUG)
set(WITH_ASAN ${WITH_ASAN_DEBUG})
else()
set(WITH_ASAN ${WITH_ASAN_RELEASE})
endif()
if(WITH_ASAN)
message(STATUS "Building with AddressSanitizer enabled")
#https://github.com/google/sanitizers/wiki/AddressSanitizerFlags
set_property(DIRECTORY APPEND PROPERTY COMPILE_OPTIONS -fsanitize=address -fno-omit-frame-pointer)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
if(OS_LINUX)
# On linux, enable leak sanitizer too
set_property(DIRECTORY APPEND PROPERTY COMPILE_OPTIONS -fsanitize=leak )
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=leak")
endif()
if(OS_DARWIN)
# MacOS needs to link shared library with these flags too.
SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fsanitize=address")
endif()
# gcc (Or just Linux?) needs to link with -lasan
if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
set(ASAN_LIBRARIES asan)
endif()
else()
message(STATUS "Building with AddressSanitizer disabled")
endif()
# add the binary tree to the search path for include files
# so that we will find version.h and other configured files
include_directories("${PROJECT_BINARY_DIR}")
......@@ -142,6 +187,18 @@ log_pkg_found(cppdb "${CPPDB_FOUND}" "" WARNING ", parts of core will not be bui
pkg_search_module(LIBRRD librrd)
log_pkg_found(librrd "${LIBRRD_FOUND}" "${LIBRRD_VERSION}" WARNING ", parts of core will not be built")
find_package(Qpid)
log_pkg_found(qpid "${QPID_FOUND}" "${QPID_VERSION}" STATUS ", QPID communication support will not be built")
set(WITH_QPID ${QPID_FOUND})
find_package(Mosquittopp)
log_pkg_found(mosquittopp "${MOSQUITTOPP_FOUND}" "${MOSQUITTOPP_VERSION}" WARNING ", MQTT communication support will not be built")
set(WITH_MQTT ${MOSQUITTOPP_FOUND})
if(NOT ${MOSQUITTOPP_FOUND} AND NOT ${QPID_FOUND})
message(FATAL_ERROR "Neither mosquittopp nor qpid found. One or both is required")
endif()
if(OS_LINUX)
# used by agosystem
pkg_search_module(LIBPROCPS REQUIRED libprocps)
......@@ -206,10 +263,8 @@ endif()
install(DIRECTORY DESTINATION ${BINDIR})
# add sub directories
option(BUILD_SHARED "Build the agoclient shared library" ON)
if (BUILD_SHARED)
add_subdirectory (shared)
endif()
add_subdirectory (shared)
add_subdirectory (python)
option(BUILD_TESTS "Build the unittests" ON)
if (BUILD_TESTS AND CPPUNIT_FOUND)
......
import json
import os
import threading
import time
import traceback
from collections import namedtuple
import pytest
import logging
try:
import paho.mqtt.client as mqtt
HAS_MQTT = True
except ImportError:
HAS_MQTT = False
@pytest.fixture(scope='session')
def mqttc():
assert HAS_MQTT
#broker = os.environ.get('AGO_BROKER', 'localhost')
#username = os.environ.get('AGO_USERNAME', 'agocontrol')
#password = os.environ.get('AGO_PASSWORD', 'letmein')
mqttc = mqtt.Client(client_id='ago-autotest', clean_session=True)
mqttc.loop_start()
ready_cond = threading.Condition()
def mqtt_on_connect(client, userdata, flags, rc):
#print("MQTT connected, subscibing")
client.subscribe('com.agocontrol/legacy')
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
#print("MQTT subscribed")
ready_cond.acquire()
ready_cond.notify()
ready_cond.release()
mqttc.on_connect = mqtt_on_connect
mqttc.on_subscribe = mqtt_on_subscribe
mqttc.connect('localhost')
ready_cond.acquire()
ready_cond.wait(10)
ready_cond.release()
# print("Setup ready")
yield mqttc
mqttc.disconnect()
mqttc.loop_stop()
@pytest.fixture(scope='function')
def mqtt_transport_adapter(mqttc):
adapter = MqttTransportAdapter(mqttc)
return adapter
MqttTransportMessage = namedtuple('MqttTransportMessage', 'content,subject')
class MqttTransportAdapter:
def __init__(self, mqttc, timeout=10):
self.mqttc = mqttc
def set_handler(self, handler):
self.handler = handler
self.mqttc.on_message = self.on_message
def on_message(self, client, userdata, message):
#print("Got message on ", message.topic, ":", message.payload)
try:
json_payload = json.loads(message.payload)
if 'UT-EXP' not in json_payload['content']:
# Ignore other spurious msgs
#print("Ignoring message")
return
tpm = MqttTransportMessage(json_payload['content'], json_payload.get('subject', None))
rep = self.handler(tpm)
if rep:
logging.info("Got msg %s, replying with %s", (json_payload, rep))
self.mqttc.publish(json_payload['reply-to'], json.dumps(rep))
#else:
#print("Handler called, but no reply")
except:
traceback.print_exc()
def send(self, content, subject):
topic = 'com.agocontrol/legacy'
self.mqttc.publish(topic, payload=json.dumps(dict(content=content, subject=subject)))
def shutdown(self):
self.mqttc.on_message = None
import os
import threading
import time
import pytest
import logging
try:
from qpid.messaging import Connection, Message, Empty, LinkClosed
HAS_QPID = True
except ImportError:
HAS_QPID = False
@pytest.fixture(scope='session')
def qpid_details():
assert HAS_QPID
broker = os.environ.get('AGO_BROKER', 'localhost')
username = os.environ.get('AGO_USERNAME', 'agocontrol')
password = os.environ.get('AGO_PASSWORD', 'letmein')
connection = Connection(broker,
username=username,
password=password,
reconnect=True)
connection.open()
session = connection.session()
details = dict(
connection=connection,
session=session,
sender=session.sender("agocontrol; {create: always, node: {type: topic}}")
)
yield details
connection.close()
@pytest.fixture(scope='function')
def qpid_transport_adapter(qpid_details):
adapter = QpidTransportAdapter(qpid_details['session'])
return adapter
class QpidTransportAdapter(threading.Thread):
"""Simple class which will look for Qpid messages, and
execute a handler fn which the test can use to generate a response.
Only reacts to messages with key 'UT-EXP' in content!
"""
def __init__(self, session, timeout=10):
super(QpidTransportAdapter, self).__init__()
self.session = session
self.receiver = self.session.receiver("agocontrol; {create: always, node: {type: topic}}")
self.sender = self.session.sender("agocontrol; {create: always, node: {type: topic}}")
self.timeout = timeout
def set_handler(self, handler):
self.handler = handler
self.start()
def run(self):
self.stop = False
stop_after = time.time() + self.timeout
while not self.stop and time.time() < stop_after:
try:
msg = self.receiver.fetch(timeout=10)
self.session.acknowledge()
if not 'UT-EXP' in msg.content:
# Ignore other spurious msgs
continue
rep = self.handler(msg)
if rep:
logging.info("Got msg %s, replying with %s", (msg.content, rep))
snd = self.session.sender(msg.reply_to)
snd.send(Message(rep))
except (Empty, LinkClosed):
continue
def send(self, content, subject):
message = Message(content=content, subject=subject)
self.sender.send(message)
def shutdown(self):
self.stop = True
self.receiver.close()
if self.is_alive():
self.join()
import logging
import pytest
# Automatically imports
pytest_plugins = ['testlib.qpid_transport', 'testlib.mqtt_transport']
logging.basicConfig()
# TODO: use pytest-variables instead?
@pytest.fixture
def variables(request):
return dict(transport=request.config.getoption("--transport"))
def pytest_addoption(parser):
parser.addoption(
"--transport", action="store", default="qpid", help="transport to test: qpid or mqtt"
)
@pytest.fixture
def transport_adapter(request, variables):
if variables['transport'] == 'qpid':
adapter = request.getfixturevalue('qpid_transport_adapter')
elif variables['transport'] == 'mqtt':
adapter = request.getfixturevalue('mqtt_transport_adapter')
else:
raise AssertionError('Invalid transport configured')
yield adapter
adapter.shutdown()
import os
import json
import urllib2
import random
import time
RPC_PARSE_ERROR = -32700
RPC_INVALID_REQUEST = -32600
RPC_METHOD_NOT_FOUND = -32601
RPC_INVALID_PARAMS = -32602
RPC_INTERNAL_ERROR = -32603
# -32000 to -32099 impl-defined server errors
RPC_NO_EVENT = -32000
RPC_MESSAGE_ERROR = -32001
RPC_COMMAND_ERROR = -31999
url = os.environ.get('AGO_URL', 'http://localhost:8008')
url_jsonrpc = url + '/jsonrpc'
def bus_message(subject, content, expect_identifier=None, expect_rpc_error_code=None, timeout=None):
"""Send a Agocontrol Bus message via the RPC interface"""
params = {
'content': content,
'subject': subject
}
if timeout is not None:
params['replytimeout'] = timeout
# Give HTTP request a bit more time.
timeout = timeout + 0.5
return jsonrpc_request('message', params, expect_identifier=expect_identifier, expect_rpc_error_code=expect_rpc_error_code, timeout=timeout)
def jsonrpc_request(method, params=None, req_id=None, expect_identifier=None, expect_rpc_error_code=None, timeout=None):
"""Execute a JSON-RPC 2.0 request with the specified method, and specified params
dict. If req_id is None, a random ID number is selected.
Spec: http://www.jsonrpc.org/specification
"""
if req_id is None:
req_id = random.randint(1, 10000)
msg = {'jsonrpc': '2.0',
'id': req_id,
'method': method
}
if params:
msg['params'] = params
req_raw = json.dumps(msg)
http_req = urllib2.Request(url_jsonrpc, req_raw)
if not timeout: timeout = 5
http_rep = urllib2.urlopen(http_req, timeout=timeout)
assert http_rep.code == 200
assert 'application/json' == http_rep.info()['Content-Type']
rep_raw = http_rep.read()
rep_body = json.loads(rep_raw)
assert '2.0' == rep_body['jsonrpc']
assert req_id == rep_body['id']
if expect_rpc_error_code:
assert 'error' in rep_body
assert 'result' not in rep_body
err = rep_body['error']
assert 'code' in err
assert expect_rpc_error_code == err['code']
if expect_identifier:
assert expect_identifier == err['identifier']
else:
assert 'identifier' not in err
else:
assert 'error' not in rep_body
assert 'result' in rep_body
if expect_identifier:
assert expect_identifier == rep_body['result']['identifier']
else:
assert 'identifier' not in rep_body['result']
if not expect_rpc_error_code:
return rep_body['result']
else:
return rep_body['error']
class TestRPC(object):
def test_unknown_command(self):
jsonrpc_request('no-real', expect_rpc_error_code=RPC_METHOD_NOT_FOUND)
def test_bus_message(self, transport_adapter):
def mock(msg):
assert 'command' in msg.content
assert 'some-command' == msg.content['command']
assert 1234 == msg.content['int-param']
return {'_newresponse':True, 'result':{'identifier': 'success', 'data': {'int':4321, 'string':'test'}}}
transport_adapter.set_handler(mock)
rep = bus_message('', {'command':'some-command', 'int-param':1234, 'UT-EXP':True}, expect_identifier='success')
# rep is result from response
assert 4321 == rep['data']['int']
assert 'test' == rep['data']['string']
def test_bus_message_err(self, transport_adapter):
def mock(msg):
assert 'command' in msg.content
assert 'some-err-command' == msg.content['command']
assert 12345 == msg.content['int-param']
return {'_newresponse':True, 'error':{'identifier': 'some.error', 'message': 'err', 'data': {'int':4321, 'string':'test'}}}
transport_adapter.set_handler(mock)
rep = bus_message('', {'command':'some-err-command', 'int-param':12345, 'UT-EXP':True},
expect_identifier='some.error',
expect_rpc_error_code=RPC_COMMAND_ERROR)
# rep is error from response; contains message, code and data.
assert 'some.error' == rep['identifier']
assert 'err' == rep['message']
assert 4321 == rep['data']['int']
assert 'test' == rep['data']['string']
def test_empty_bus_message(self):
jsonrpc_request('message', None, expect_rpc_error_code=RPC_INVALID_PARAMS)
def test_no_reply_timeout(self):
s = time.time()
# Must take at least 0.5s
ret = bus_message('', {'command':'non-existent'},
expect_identifier='error.no.reply',
expect_rpc_error_code=RPC_COMMAND_ERROR, timeout=0.5)
e = time.time()
assert "Timeout" == ret['message']
assert e-s >= 0.5
assert e-s < 1.0 # but quite near 0.5..
def test_inventory(self):
"""Executes an inventory request, assumes agoresolver is alive"""
rep = bus_message('', {'command':'inventory'}, expect_identifier='success')
assert dict == type(rep)
assert rep.get('identifier') == 'success'
assert 'data' in rep
data = rep['data']
assert 'devices' in data
assert 'schema' in data
assert 'rooms' in data
assert 'floorplans' in data
assert 'system' in data
assert 'variables' in data
assert 'environment' in data
def test_get_event_errors(self):
jsonrpc_request('getevent', None, expect_rpc_error_code= RPC_INVALID_PARAMS)
jsonrpc_request('getevent', {}, expect_rpc_error_code=RPC_INVALID_PARAMS)
jsonrpc_request('getevent', {'uuid':'123'}, expect_rpc_error_code=RPC_INVALID_PARAMS)
def test_subscribe(self, transport_adapter):
sub_id = jsonrpc_request('subscribe', None)
assert type(sub_id) in (str,unicode)
err = jsonrpc_request('getevent', {'uuid':sub_id, 'timeout':0}, expect_rpc_error_code=RPC_NO_EVENT)
assert err['message'] == 'No messages available'
# Send a message
transport_adapter.send(content={'test':1, 'notify':True}, subject='event.something.subject')
while True:
res = jsonrpc_request('getevent', {'uuid':sub_id, 'timeout':1})
if res['event'] == 'event.device.announce':
# Ignore announce from other devices.
# Besides that, nothing shall talk on our test network
continue
assert 'notify' in res
assert res['test'] == 1
assert res['event'] =='event.something.subject'
break
jsonrpc_request('unsubscribe', None, expect_rpc_error_code=RPC_INVALID_PARAMS)
jsonrpc_request('unsubscribe', {'uuid':None}, expect_rpc_error_code=RPC_INVALID_PARAMS)
rep = jsonrpc_request('unsubscribe', {'uuid':sub_id})
assert "success" == rep
def test_subscribe_timeout(self):
# Ensure we can do subseccond precision on getevent timeouts
sub_id = jsonrpc_request('subscribe', None)
assert type(sub_id)in (str,unicode)
s = time.time()
err = jsonrpc_request('getevent', {'uuid':sub_id, 'timeout':0.5}, expect_rpc_error_code=RPC_NO_EVENT)
e = time.time()
assert err['message'] == 'No messages available'
assert e-s >= 0.5
assert e-s < 0.6
......@@ -8,4 +8,7 @@
#cmakedefine HAVE_XLOCALE_H @HAVE_XLOCALE_H@
#cmakedefine WITH_MQTT @WITH_MQTT@
#cmakedefine WITH_QPID @WITH_QPID@
#endif
\ No newline at end of file
......@@ -14,11 +14,10 @@ function(CopyFilesFromSource TARGET SOURCE_FILES)
if (NOT IN_SOURCE_BUILD)
add_custom_target(${TARGET} ALL SOURCES ${SOURCE_FILES})
foreach (infile ${SOURCE_FILES})
string(REPLACE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} outfile ${infile})
#message("-- staging for copy: ${CMAKE_CURRENT_SOURCE_DIR}/${infile} -> ${CMAKE_CURRENT_BINARY_DIR}/${outfile}")
#message("-- staging for copy: ${CMAKE_CURRENT_SOURCE_DIR}/${infile} -> ${CMAKE_CURRENT_BINARY_DIR}/${infile}")
add_custom_command(
TARGET ${TARGET}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/${infile} ${outfile}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/${infile} ${infile}
VERBATIM
)
endforeach()
......
# From https://github.com/romaniucradu/Calculated/blob/master/cmake_modules/FindMosquittopp.cmake
# - Find libmosquitto
# Find the native libmosquitto includes and libraries
#
# MOSQUITTOPP_INCLUDE_DIR - where to find mosquitto.h, etc.
# MOSQUITTOPP_LIBRARIES - List of libraries when using libmosquitto.
# MOSQUITTOPP_FOUND - True if libmosquitto found.
if(MOSQUITTOPP_INCLUDE_DIR)
# Already in cache, be silent
set(MOSQUITTOPP_FIND_QUIETLY TRUE)
endif(MOSQUITTOPP_INCLUDE_DIR)
find_path(MOSQUITTOPP_INCLUDE_DIR mosquitto.h)
find_library(MOSQUITTOPP_LIBRARY NAMES libmosquittopp mosquittopp)
find_library(MOSQUITTO_LIBRARY NAMES libmosquitto mosquitto)
# Handle the QUIETLY and REQUIRED arguments and set MOSQUITTO_FOUND to TRUE if
# all listed variables are TRUE.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(MOSQUITTOPP DEFAULT_MSG MOSQUITTOPP_LIBRARY MOSQUITTOPP_INCLUDE_DIR)
if(MOSQUITTOPP_FOUND)
set(MOSQUITTOPP_LIBRARIES ${MOSQUITTOPP_LIBRARY} ${MOSQUITTO_LIBRARY})
else(MOSQUITTOPP_FOUND)
set(MOSQUITTOPP_LIBRARIES)
endif(MOSQUITTOPP_FOUND)
mark_as_advanced(MOSQUITTOPP_INCLUDE_DIR MOSQUITTOPP_LIBRARY)
# Find qpid libraries and headers
#
# QPID_INCLUDE_DIR - where to find qpid/messaging/*.h, etc.
# QPID_LIBRARIES - List of libraries when using qpid.
# QPID_FOUND - True if qpid found.
if(QPID_INCLUDE_DIR)
# Already in cache, be silent
set(QPID_FIND_QUIETLY TRUE)
endif(QPID_INCLUDE_DIR)
find_path(QPID_INCLUDE_DIR qpid/messaging/Connection.h)
find_library(QPID_MESSAGING_LIBRARY NAMES qpidmessaging)
find_library(QPID_TYPES_LIBRARY NAMES qpidtypes)
# Handle the QUIETLY and REQUIRED arguments and set QPID_FOUND to TRUE if
# all listed variables are TRUE.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(QPID DEFAULT_MSG QPID_MESSAGING_LIBRARY QPID_INCLUDE_DIR)
if(QPID_FOUND)
set(QPID_LIBRARIES ${QPID_MESSAGING_LIBRARY} ${QPID_TYPES_LIBRARY})
else(QPID_FOUND)
set(QPID_LIBRARIES)
endif(QPID_FOUND)
mark_as_advanced(QPID_INCLUDE_DIR QPID_LIBRARY)
......@@ -16,6 +16,8 @@ foreach (infile ${INIT_FILES})
endforeach (infile)
CopyFilesFromSource(augeas_lens "agocontrol.aug")
add_subdirectory (conf.d)
add_subdirectory (schema.d)
add_subdirectory (sysvinit)
......
cmake_minimum_required (VERSION 3.0)
file (GLOB CONFD_FILES *.in)
file (GLOB CONFD_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *.in)
foreach (infile ${CONFD_FILES})
string(REGEX REPLACE ".in$" "" outfile ${infile})
string(REGEX REPLACE ".*/" "" outfile ${outfile})
configure_file(
"${infile}"
"${CMAKE_CURRENT_BINARY_DIR}/${outfile}"
@ONLY
)
LIST(APPEND CONFD_FILES_DONE ${CMAKE_CURRENT_BINARY_DIR}/${outfile})
LIST(APPEND CONFD_FILES_DONE ${outfile})
endforeach (infile)
file (GLOB CONFD_FILES *.conf)
install (FILES ${CONFD_FILES_DONE} ${CONFD_FILES} DESTINATION ${CONFDIR}/conf.d)
file (GLOB CONFD_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *.conf)
CopyFilesFromSource(conf_files "${CONFD_FILES}")
InstallFiles(${CONFDIR}/conf.d "${CONFD_FILES_DONE};${CONFD_FILES}")
[system]
uuid=00000000-0000-0000-000000000000
# mqtt or qpid
messaging=qpid
# address to broker, hostname[:port]
broker=localhost
# when you change username and/or password, you also need to adjust the qpid sasl db to reflect the change
username=agocontrol
password=letmein
units=SI
# Please see http://wiki.agocontrol.com/index.php/Logging for more details on Logging
......@@ -11,3 +19,31 @@ units=SI
log_method=syslog
# log levels: TRACE, DEBUG, INFO, WARNING, ERROR, FATAL
log_level=INFO
[loggers]
# The main log_level controls the overall logging level.
# It is then possible to limit different loggers (python) / channels (C++) to a higher level.
# For example, if log_level is set to INFO, nothing lower than INFO will be logged,
# even if a specific logger is set to TRACE.