...
 
Commits (2)
......@@ -318,10 +318,18 @@ int AgoLua::luaSendMessage(lua_State *L)
lua_pop(L, 1);
}
//execute sendMessage
AGO_DEBUG() << "Sending message: " << subject << " " << content;
AgoResponse response = agoConnection->sendRequest(subject, content);
pushTableFromJson(L, response.getResponse());
// execute "sendMessage". if subject is set, assume its a sendMessage, else it is a sendRequest which
// wants a response. The Blockly code does not create anything with subject, so this is purely for
// plain LUA scripts.
if(subject.empty()) {
AGO_DEBUG() << "Sending request: " << content;
AgoResponse response = agoConnection->sendRequest(content);
pushTableFromJson(L, response.getResponse());
}else{
AGO_DEBUG() << "Sending message on " << subject << ": " << content;
agoConnection->sendMessage(subject, content);
// TODO: Would caller expect a response?
}
return 1;
}
......@@ -332,7 +340,6 @@ int AgoLua::luaSendMessage(lua_State *L)
int AgoLua::luaSetVariable(lua_State *L)
{
Json::Value content;
std::string subject;
//get input arguments
std::string variable(lua_tostring(L,1));
......@@ -343,7 +350,7 @@ int AgoLua::luaSetVariable(lua_State *L)
content["uuid"]=agocontroller;
AGO_DEBUG() << "Set variable: " << content;
AgoResponse resp = agoConnection->sendRequest(subject, content);
AgoResponse resp = agoConnection->sendRequest(content);
//manage result
if( resp.isOk() )
......
......@@ -39,11 +39,15 @@ int main(int argc, char **argv) {
}
std::cout << "Sending message: " << content << std::endl;
AgoResponse response = agoConnection.sendRequest(subject, content);
if(response.isOk()) {
std::cout << "Success: " << response.getResponse() << std::endl;
}else{
std::cout << "Error: " << response.getResponse() << std::endl;
if(subject.empty()) {
AgoResponse response = agoConnection.sendRequest(content);
if(response.isOk()) {
std::cout << "Success: " << response.getResponse() << std::endl;
}else{
std::cout << "Error: " << response.getResponse() << std::endl;
}
} else {
agoConnection.sendMessage(subject, content);
}
}
......
......@@ -192,7 +192,6 @@ static bool jsonrpcErrorResponse(Json::Value& responseRoot, int code, const std:
void AgoRpc::jsonrpc_message(JsonRpcReqRep* reqRep, boost::unique_lock<boost::mutex> &lock, const Json::Value& params, Json::Value& responseRoot) {
// prepare message
const Json::Value& content = params["content"];
const Json::Value& subject = params["subject"];
const Json::Value& replytimeout = params["replytimeout"];
std::chrono::milliseconds timeout(3000);
......@@ -207,7 +206,7 @@ void AgoRpc::jsonrpc_message(JsonRpcReqRep* reqRep, boost::unique_lock<boost::mu
{
// ReqRep must not be locked when blocking in another thread
boost::reverse_lock<boost::unique_lock<boost::mutex>> unlock(lock);
response = agoConnection->sendRequest(subject.asString(), content, timeout);
response = agoConnection->sendRequest(content, timeout);
}
AGO_TRACE() << "RPC Response: " << response.getResponse();
......
This diff is collapsed.
......@@ -457,18 +457,12 @@ bool agocontrol::AgoConnection::sendMessage(const std::string& subject, const Js
}
agocontrol::AgoResponse agocontrol::AgoConnection::sendRequest(const Json::Value& content) {
return sendRequest("", content, std::chrono::seconds(3));
return sendRequest(content, std::chrono::seconds(3));
}
agocontrol::AgoResponse agocontrol::AgoConnection::sendRequest(const std::string& subject, const Json::Value& content) {
return sendRequest("", content, std::chrono::seconds(3));
}
agocontrol::AgoResponse agocontrol::AgoConnection::sendRequest(const std::string& subject, const Json::Value& content, std::chrono::milliseconds timeout) {
agocontrol::AgoResponse agocontrol::AgoConnection::sendRequest(const Json::Value& content, std::chrono::milliseconds timeout) {
Json::Value message;
message["content"] = content;
if(!subject.empty())
message["subject"] = subject;
return transport->sendRequest(message, timeout);
}
......
......@@ -86,8 +86,7 @@ namespace agocontrol {
bool sendMessage(const Json::Value& content);
AgoResponse sendRequest(const Json::Value& content);
AgoResponse sendRequest(const std::string& subject, const Json::Value& content);
AgoResponse sendRequest(const std::string& subject, const Json::Value& content, std::chrono::milliseconds timeout);
AgoResponse sendRequest(const Json::Value& content, std::chrono::milliseconds timeout);
bool emitEvent(const std::string& internalId, const std::string& eventType, const std::string& level, const std::string& units);
bool emitEvent(const std::string& internalId, const std::string& eventType, double level, const std::string& units);
......
......@@ -125,21 +125,15 @@ agocontrol::AgoResponse agotransport::AgoQpidTransport::sendRequest(Json::Value&
qpid::messaging::Session recvsession = impl->connection.createSession();
qpid::types::Variant::Map msgMap = jsonToVariantMap(message["content"]);
std::string subject;
if(message.isMember("subject")) {
subject = message["subject"].asString();
}
try {
encode(msgMap, qpmessage);
if(!subject.empty())
qpmessage.setSubject(subject);
qpid::messaging::Address responseQueue("#response-queue; {create:always, delete:always}");
responseReceiver = recvsession.createReceiver(responseQueue);
qpmessage.setReplyTo(responseQueue);
AGOL_TRACE(transport) << "Sending message [sub=" << subject << ", reply-to=" << responseQueue <<"]:" << msgMap;
AGOL_TRACE(transport) << "Sending message [reply-to=" << responseQueue <<"]:" << msgMap;
impl->sender.send(qpmessage);
qpid::messaging::Message message = responseReceiver.fetch(qpid::messaging::Duration(timeout.count()));
......@@ -168,7 +162,7 @@ agocontrol::AgoResponse agotransport::AgoQpidTransport::sendRequest(Json::Value&
recvsession.acknowledge();
} catch (const qpid::messaging::NoMessageAvailable&) {
AGOL_WARNING(transport) << "Timeout waiting for reply (subject: " << subject << ")";
AGOL_WARNING(transport) << "Timeout waiting for reply";
r.init(responseError(RESPONSE_ERR_NO_REPLY, "Timeout"));
} catch(const std::exception& ex) {
......