Commit 9aafd93e authored by Johan Ström's avatar Johan Ström

Finally remove AgoClient::sendMessageReply, use sendRequest instead

parent 068018a0
......@@ -489,8 +489,8 @@ int AgoLua::luaSendMessage(lua_State *L)
//execute sendMessage
AGO_DEBUG() << "Sending message: " << subject << " " << content;
qpid::types::Variant::Map replyMap = agoConnection->sendMessageReply(subject.c_str(), content);
pushTableFromMap(L, replyMap);
AgoResponse response = agoConnection->sendRequest(subject.c_str(), content);
pushTableFromMap(L, response.getResponse());
return 1;
}
......
......@@ -14,7 +14,6 @@
#include "agoclient.h"
using namespace std;
using namespace agocontrol;
using std::stringstream;
......@@ -22,7 +21,7 @@ using std::string;
int main(int argc, char **argv) {
if (argc < 2) {
cout << "Usage example: " << argv[0] << " uuid=ca9424e6-406d-4144-8931-584046eaaa34 command=setlevel level=50" << endl;
std::cout << "Usage example: " << argv[0] << " uuid=ca9424e6-406d-4144-8931-584046eaaa34 command=setlevel level=50" << std::endl;
return -1;
}
AgoConnection agoConnection = AgoConnection("messagesend");
......@@ -40,9 +39,14 @@ int main(int argc, char **argv) {
}
}
}
cout << "Sending message: " << content << endl;
qpid::types::Variant::Map replyMap = agoConnection.sendMessageReply(subject.c_str(), content);
cout << "Reply: " << replyMap << endl;
std::cout << "Sending message: " << content << std::endl;
AgoResponse response = agoConnection.sendRequest(subject.c_str(), content);
if(response.isOk()) {
std::cout << "Success: " << response.getResponse() << std::endl;
}else{
std::cout << "Error: " << response.getResponse() << std::endl;
}
}
......@@ -214,79 +214,17 @@ void AgoRpc::jsonrpc_message(JsonRpcReqRep* reqRep, boost::unique_lock<boost::mu
//send message and handle response
AGO_TRACE() << "Request on " << reqRep << ": " << command << "(timeout=" << timeout.getMilliseconds() << " : << "<<replytimeout<< ")";
qpid::types::Variant::Map responseMap;
agocontrol::AgoResponse response;
try {
lock.unlock();
// TODO: change this to sendRequest when all backends have been updated
responseMap = agoConnection->sendMessageReply(subject.asString().c_str(), command, timeout);
response = agoConnection->sendRequest(subject.asString(), command, timeout);
} catch(...) {
lock.lock();
throw;
}
// AGO_TRACE() << "Response: " << responseMap;
if(responseMap.size() == 0 || !responseRoot.isMember("id") ) // only send reply when id is not null
{
// TODO: revisit id / notifications
// no response
if(responseMap.size() == 0)
AGO_ERROR() << "No reply message to fetch. Failed message: " << "subject=" <<subject<<": " << command;
jsonrpcErrorResponse(responseRoot, AGO_JSONRPC_MESSAGE_ERROR, "error.no.reply");
return;
}
// allow on the fly behavior during migration
if (responseMap.count("_newresponse"))
{
AGO_TRACE() << "New style response on " << reqRep << ": " << responseMap;
if (responseMap["error"].isVoid())
{
// no error
if (responseMap["result"].isVoid() || responseMap["result"].getType() != qpid::types::VAR_MAP)
{
AGO_ERROR() << "New style response does not contain result nor error";
jsonrpcErrorResponse(responseRoot, AGO_JSONRPC_MESSAGE_ERROR, "message returned neither error or result");
return;
}
else
{
responseRoot["result"] = Json::Value(Json::objectValue);
variantMapToJson(responseMap["result"].asMap(), responseRoot["result"]);
return;
}
}
else
{
// error
if (responseMap["error"].getType() != qpid::types::VAR_MAP)
{
AGO_ERROR() << "Error response is not a map";
jsonrpcErrorResponse(responseRoot, AGO_JSONRPC_MESSAGE_ERROR, "message returned error and error map is malformed");
return;
}
else
{
qpid::types::Variant::Map errorMap = responseMap["error"].asMap();
responseRoot["error"] = Json::Value(Json::objectValue);
responseRoot["error"]["code"] = AGO_JSONRPC_COMMAND_ERROR;
responseRoot["error"]["message"] = "Command returned error";
responseRoot["error"]["data"] = Json::Value(Json::objectValue);
variantMapToJson(responseMap["error"].asMap(), responseRoot["error"]["data"]);
return;
}
}
}
else
{
#if 0
// old style responses
mg_rpc_reply_map(conn, request, responseMap);
#endif
AGO_ERROR() << "Old-style response not supported anymore: " << responseMap;
jsonrpcErrorResponse(responseRoot, AGO_JSONRPC_MESSAGE_ERROR, "message returned old-style response, not supported");
}
AGO_TRACE() << "Response: " << response.getResponse();
variantMapToJson(response.getResponse(), responseRoot);
}
......
......@@ -503,7 +503,7 @@ void AgoSecurity::sendAlarm(std::string zone, std::string uuid, std::string mess
if( send )
{
agoConnection->sendMessageReply("", *content);
agoConnection->sendMessage(*content);
}
AGO_TRACE() << "sendAlarm() END";
}
......
......@@ -803,44 +803,6 @@ agocontrol::AgoResponse agocontrol::AgoConnection::sendRequest(const std::string
return r;
}
// DEPRECATED, USE sendRequest
qpid::types::Variant::Map agocontrol::AgoConnection::sendMessageReply(const char *subject, const qpid::types::Variant::Map& content) {
return sendMessageReply(subject, content, Duration::SECOND * 3);
}
qpid::types::Variant::Map agocontrol::AgoConnection::sendMessageReply(const char *subject, const qpid::types::Variant::Map& content, qpid::messaging::Duration timeout) {
Message message;
qpid::types::Variant::Map responseMap;
Receiver responseReceiver;
Session recvsession = connection.createSession();
try {
encode(content, message);
if(*subject != 0)
message.setSubject(subject);
Address responseQueue("#response-queue; {create:always, delete:always}");
responseReceiver = recvsession.createReceiver(responseQueue);
message.setReplyTo(responseQueue);
AGO_TRACE() << "Sending message [sub=" << subject << ", reply=" << responseQueue <<"]" << content;
sender.send(message);
Message response = responseReceiver.fetch(timeout);
recvsession.acknowledge();
if (response.getContentSize() > 3) {
decode(response,responseMap);
} else {
responseMap["response"] = response.getContent();
}
AGO_TRACE() << "Reply received: " << responseMap;
} catch (qpid::messaging::NoMessageAvailable) {
AGO_WARNING() << "No reply for message sent to subject " << subject;
} catch(const std::exception& error) {
AGO_ERROR() << "Exception in sendMessageReply: " << error.what();
}
recvsession.close();
return responseMap;
}
bool agocontrol::AgoConnection::sendMessage(qpid::types::Variant::Map content) {
return sendMessage("",content);
}
......
......@@ -116,16 +116,10 @@ namespace agocontrol {
bool sendMessage(const char *subject, qpid::types::Variant::Map content);
bool sendMessage(qpid::types::Variant::Map content);
// New-style API.
// TEMP NOTE: Only use towards apps using new style command handlers!
AgoResponse sendRequest(const qpid::types::Variant::Map& content);
AgoResponse sendRequest(const std::string& subject, const qpid::types::Variant::Map& content);
AgoResponse sendRequest(const std::string& subject, const qpid::types::Variant::Map& content, qpid::messaging::Duration timeout);
// Deprecated
qpid::types::Variant::Map sendMessageReply(const char *subject, const qpid::types::Variant::Map& content, qpid::messaging::Duration timeout);
qpid::types::Variant::Map sendMessageReply(const char *subject, const qpid::types::Variant::Map& content);
bool emitEvent(const char *internalId, const char *eventType, const char *level, const char *units);
bool emitEvent(const char *internalId, const char *eventType, double level, const char *units);
bool emitEvent(const char *internalId, const char *eventType, int level, const char *units);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment