agorpc.cpp 29.9 KB
Newer Older
/*
   Copyright (C) 2012 Harald Klein <hari@vt100.at>

   This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License.
   This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

   See the GNU General Public License for more details.

   This is a lightweight RPC/HTTP interface for ago control, for platforms where the regular cherrypy based admin interface is too slow.
   */
12 13 14

#include <errno.h>

15
#include <deque>
16 17
#include <map>
#include <string>
18

19 20
#include <boost/foreach.hpp>
#include <boost/filesystem.hpp>
21
#include <boost/preprocessor/stringize.hpp>
22
#include <boost/shared_ptr.hpp>
23 24
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
Johan Ström's avatar
Johan Ström committed
25
#include <boost/thread/reverse_lock.hpp>
26
#include <boost/tokenizer.hpp>
Tang's avatar
Tang committed
27

28
#include "agoapp.h"
29
#include "agoutils.h"
30

31
#include "agohttp/agohttp.h"
32

33
// Default long-polling timeout for getevent, in seconds
#define GETEVENT_DEFAULT_TIMEOUT_SECONDS 28

// Directory in which uploaded files are stored temporarily
#define UPLOAD_PATH "/tmp/"

// Name of the auth file looked up below the document root
#define HTPASSWD ".htpasswd"

/* Standard JSON-RPC 2.0 error codes, see
 * http://www.jsonrpc.org/specification#error_object
 */
#define JSONRPC_PARSE_ERROR -32700
#define JSONRPC_INVALID_REQUEST -32600
#define JSONRPC_METHOD_NOT_FOUND -32601
#define JSONRPC_INVALID_PARAMS -32602
#define JSONRPC_INTERNAL_ERROR -32603

// Implementation-defined server errors; the spec reserves -32000 to -32099 for these
#define AGO_JSONRPC_NO_EVENT            -32000
#define AGO_JSONRPC_MESSAGE_ERROR       -32001
// NOTE(review): this value lies outside the reserved -32000..-32099 range
#define AGO_JSONRPC_COMMAND_ERROR       -31999
54

55
namespace fs = ::boost::filesystem;
56
using namespace agocontrol;
57
using namespace agocontrol::agohttp;
58 59 60 61

// struct and map for json-rpc event subscriptions
struct Subscriber
{
62
    std::deque<Json::Value> queue;
63
    time_t lastAccess;
64 65
};

66 67 68 69 70 71 72 73 74 75 76 77
class AgoRpc;

/**
 * Request/response state for a single HTTP call to the JSON-RPC endpoint.
 *
 * Holds the parsed JSON-RPC payload plus the bookkeeping needed by
 * long-polling getevent calls.
 */
class JsonRpcReqRep : public HttpReqJsonRep {
    friend class AgoRpc;

protected:
    // Application instance, used to look up pending events
    AgoRpc *appInstance;

    // Parsed JSON-RPC request: a single object, or an array for a batch
    Json::Value jsonrpcRequest;

    // Subscription polled by a getevent call; empty for every other call.
    // getevent cannot be part of a batch.
    std::string subscriptionId;

    // Timeout in milliseconds
    uint64_t timeout;

public:
    JsonRpcReqRep(AgoRpc* _appInstance)
        : appInstance(_appInstance)
        , timeout(GETEVENT_DEFAULT_TIMEOUT_SECONDS*1000)
    {
        // JSON-RPC knows no other status; errors are reported in the body
        setResponseCode(200);
    }

    bool isResponseReady();
    bool isResponseReady(bool isTimeout);
    void onTimeout();
    uint16_t getTimeout();
};

class FileDownloadReqRep : public HttpReqRep {
public:
    FileDownloadReqRep(AgoRpc* _appInstance, struct http_message* _hm_req)
        : appInstance(_appInstance)
        , hm_req(_hm_req)
    {
    }

    // Reading appInstance/requests should be safe without holding mutex.
    // Writing to response is NOT safe.

    AgoRpc *appInstance;

    struct http_message* hm_req;
111
    Json::Value request;
112 113 114 115 116 117 118 119 120 121 122 123 124
    std::string error;

    fs::path filepath;

    void writeResponseData(struct mg_connection *conn);
    bool isResponseReady() { return (getResponseCode() != 0); }
    void onTimeout();
};

class FileUploadReqRep : public HttpReqJsonRep {
public:
    FileUploadReqRep(AgoRpc* _appInstance, struct http_message* _hm_req)
        : appInstance(_appInstance)
125
        , requests(Json::arrayValue)
126 127
    {
    }
128 129 130 131 132 133

    // Reading appInstance/requests should be safe without holding mutex.
    // Writing to response is NOT safe.

    AgoRpc *appInstance;

134
    Json::Value requests;
135
    void onTimeout();
136 137
};

138 139
class AgoRpc: public AgoApp {
private:
140
    std::map<std::string, Subscriber> subscriptions;
141
    boost::mutex mutexSubscriptions;
142

143 144 145 146 147 148 149 150
    AgoHttp agoHttp;


    bool getEventsFor(JsonRpcReqRep* reqRep, bool isTimeout=false);
    bool handleJsonRpcRequest(JsonRpcReqRep *reqRep, const Json::Value &request, Json::Value &responseRoot);
    bool handleJsonRpcRequests(boost::shared_ptr<JsonRpcReqRep> reqRep);

    boost::shared_ptr<JsonRpcReqRep> jsonrpc(struct mg_connection *conn, struct http_message *hm);
151

152 153 154 155 156
    boost::shared_ptr<HttpReqRep> downloadFile(struct mg_connection *conn, struct http_message *hm);
    void downloadFile_thread(boost::shared_ptr<FileDownloadReqRep> reqRep);

    boost::shared_ptr<HttpReqRep> uploadFiles(struct mg_connection *conn, struct http_message *hm);
    void uploadFile_thread(boost::shared_ptr<FileUploadReqRep> reqRep);
157

158
    void eventHandler(const std::string& subject , const Json::Value& content) ;
159

160 161
    void jsonrpc_message(JsonRpcReqRep* reqRep, boost::unique_lock<boost::mutex> &lock, const Json::Value& params, Json::Value& responseRoot);
    void jsonrpc_thread(boost::shared_ptr<JsonRpcReqRep> conn);
162

163
    void setupApp();
164 165

    void doShutdown();
166
    void cleanupApp();
167
public:
168
    AGOAPP_CONSTRUCTOR_HEAD(AgoRpc) {}
169

170
friend class JsonRpcReqRep;
171 172 173
};


174 175 176 177
/**
 * Turn responseRoot into a JSON-RPC error response.
 *
 * Fills in "jsonrpc" and a null "id" when they are not present yet, and
 * sets "error" to an object holding the given code and message.
 *
 * @param responseRoot Response object to fill; must be a JSON object
 * @param code JSON-RPC error code
 * @param message Human readable error message
 * @return Always true, so callers can return the result as "response is ready"
 */
static bool jsonrpcErrorResponse(Json::Value& responseRoot, int code, const std::string& message) {
    assert(responseRoot.isObject());

    Json::Value errorObject(Json::objectValue);
    errorObject["code"] = code;
    errorObject["message"] = message;

    if(!responseRoot.isMember("jsonrpc")) {
        responseRoot["jsonrpc"] = "2.0";
    }
    if(!responseRoot.isMember("id")) {
        responseRoot["id"] = Json::Value();
    }

    responseRoot["error"] = errorObject;
    return true;
}
188

189

190 191 192 193 194 195
// Execute one "message" call: send params["content"] to the backend and store
// the outcome ("result" or "error") in responseRoot.
//
// Lock shall be held on entry and exit, and should be released when not interacting with reqRep
// or responseRoot. It is released here only while waiting for the backend reply.
//
// params["replytimeout"], when numeric, is the reply timeout in seconds (default 3).
void AgoRpc::jsonrpc_message(JsonRpcReqRep* reqRep, boost::unique_lock<boost::mutex> &lock, const Json::Value& params, Json::Value& responseRoot) {
    // prepare message
    const Json::Value& content = params["content"];
    const Json::Value& subject = params["subject"];

    // asString() throws for arrays/objects; report those as invalid params instead
    // of letting the exception escape from the worker thread.
    if (!subject.isConvertibleTo(Json::stringValue)) {
        jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_PARAMS, "Invalid request, param 'subject' invalid");
        return;
    }

    const Json::Value& replytimeout = params["replytimeout"];
    std::chrono::milliseconds timeout(3000);
    if (replytimeout.isNumeric()) {
        timeout = std::chrono::milliseconds((long) (replytimeout.asFloat() * 1000.0));
    }

    //send message and handle response
    AGO_TRACE() << "JsonRPC Request on " << reqRep << ": " << content << "(timeout=" << timeout.count() << "ms : " << replytimeout << ")";

    agocontrol::AgoResponse response;
    {
        // ReqRep must not be locked when blocking in another thread
        boost::reverse_lock<boost::unique_lock<boost::mutex>> unlock(lock);
        response = agoConnection->sendRequest(subject.asString(), content, timeout);
    }

    AGO_TRACE() << "RPC Response: " << response.getResponse();

    Json::Value& remoteResponse(response.getResponse());
    if(response.isError()) {
        // Need to add "code" to make this a proper JsonRPC error.
        responseRoot["error"] = Json::Value(Json::objectValue);
        responseRoot["error"]["code"] = AGO_JSONRPC_COMMAND_ERROR;
        responseRoot["error"]["message"] = remoteResponse["error"]["message"];
        if(remoteResponse["error"].isMember("data")) {
            responseRoot["error"]["data"].swap(remoteResponse["error"]["data"]);
        }
        // This is not permitted in jsonrpc spec..
        responseRoot["error"]["identifier"] = remoteResponse["error"]["identifier"];
    } else {
        // Swap the result 1:1 from backend
        responseRoot["result"].swap(remoteResponse["result"]);
    }
}
231 232


233
/**
234 235
 * If handleJsonRpcRequests decided we could not be executed synchronously, it will
 * dispatch the rest of the work in a pooled thread:
236
 */
237 238 239 240 241 242 243 244 245 246 247
void AgoRpc::jsonrpc_thread(boost::shared_ptr<JsonRpcReqRep> reqRep) {
    AGO_TRACE() << "Entering jsonrcp thread for " << reqRep.get();
    boost::unique_lock<boost::mutex> lock(reqRep->mutex);
    // Execute any remote calls
    if (reqRep->jsonrpcRequest.isArray())
    {
        // Batch mode: array of events, find out which are "message"
        // Abort if response is ready (timeout)
        for (unsigned int i = 0; i< reqRep->jsonrpcRequest.size() && !reqRep->responseReady; i++) {
            if(reqRep->jsonrpcRequest[i]["method"] != "message")
                continue;
248

249
            const Json::Value& params = reqRep->jsonrpcRequest[i]["params"];
250

251 252 253 254 255 256 257 258 259 260
            Json::Value& responseRoot(reqRep->jsonResponse[i]);
            jsonrpc_message(reqRep.get(), lock, params, responseRoot);
        }
    }
    else
    {
        assert(reqRep->jsonrpcRequest["method"] == "message");
        const Json::Value& params = reqRep->jsonrpcRequest["params"];
        jsonrpc_message(reqRep.get(), lock, params, reqRep->jsonResponse);
    }
261

262 263
    // Should be done now
    reqRep->responseReady = true;
264

265 266
    // Wakeup mongoose main poll loop, it will POLL our client
    // and write the response.
Johan Ström's avatar
Johan Ström committed
267
    lock.unlock(); // Must NOT hold lock when calling wakup
268 269
    AGO_TRACE() << "Response stored, Exiting jsonrpc thread for " << reqRep.get();
    agoHttp.wakeup();
270 271
}

272 273 274 275 276 277 278 279

/**
 * Try to complete a getevent call for reqRep->subscriptionId.
 *
 * @param reqRep Request whose jsonResponse receives the event or the error
 * @param isTimeout True when the request timed out and must be answered now
 * @return True if jsonResponse now holds an event or an error, false if
 *         there is nothing to report yet
 */
bool AgoRpc::getEventsFor(JsonRpcReqRep* reqRep, bool isTimeout) {
    boost::unique_lock<boost::mutex> lock(mutexSubscriptions);

    auto subscription = subscriptions.find(reqRep->subscriptionId);
    if (subscription == subscriptions.end()) {
        AGO_TRACE() << "getEventsFor " << reqRep <<": unknown subscription ID " <<reqRep->subscriptionId;
        return jsonrpcErrorResponse(reqRep->jsonResponse, JSONRPC_INVALID_PARAMS, "Invalid request, no subscription for uuid");
    }

    std::deque<Json::Value>& pending = subscription->second.queue;
    if (pending.empty()) {
        // Nothing queued. Answer anyway when shutting down or on timeout.
        if (isExitSignaled() || isTimeout) {
            AGO_TRACE() << "getEventsFor " << reqRep <<": timeout";
            return jsonrpcErrorResponse(reqRep->jsonResponse, AGO_JSONRPC_NO_EVENT, "No messages available");
        }

        AGO_TRACE() << "getEventsFor " << reqRep <<": no events (timeout=" << reqRep->timeout << ")";
        return false;
    }

    // Take the oldest event; the subscription lock is not needed after that
    Json::Value event(std::move(pending.front()));
    pending.pop_front();
    lock.unlock();

    AGO_TRACE() << "getEventsFor " << reqRep <<": found event";

    reqRep->jsonResponse["result"].swap(event);
    return true;
}

307 308
// Regular readiness poll, i.e. not triggered by a timeout.
bool JsonRpcReqRep::isResponseReady() {
    const bool isTimeout = false;
    return isResponseReady(isTimeout);
}

311 312 313 314
// Called when the request timed out: forces a response to be prepared
// unless one is ready already. The readiness result itself is not needed.
void JsonRpcReqRep::onTimeout() {
    (void) isResponseReady(true);
}
315

316 317
bool JsonRpcReqRep::isResponseReady(bool isTimeout) {
    if(subscriptionId.empty() || responseReady)
318
    {
319 320 321 322 323
        if(isTimeout)
            // Ensures an array of jsonrpc messages are not continued on
            // in jsonrpc_thread
            responseReady = true;
        return responseReady;
324 325
    }

326 327 328 329
    // If there is a pending getEvent call for this connection, this may set responseReady
    // with either an event, or timeout error.
    return appInstance->getEventsFor(this, isTimeout);
}
330

331 332
/**
 * @return The request timeout, limited to what fits the 16 bit return type.
 *
 * The stored timeout is 64 bit and may be set by the client; returning it
 * unclamped would silently wrap around (e.g. 70000 would become 4464).
 */
uint16_t JsonRpcReqRep::getTimeout() {
    const uint64_t maxTimeout = 0xFFFF;
    return static_cast<uint16_t>(timeout < maxTimeout ? timeout : maxTimeout);
}

335
/**
336 337 338 339
 * Try to execute this request, if it can be done locally.
 * If it requries a remote call, do nothing, it will be executed in background thread instead.
 *
 * request/response is passed explicitly here since we might be part of a batch.
340
 *
341 342 343 344
 * @param reqRep
 * @param request The particular request object we're parsing
 * @param responseRoot Json::Value object where result should be placed
 * @return True if process is finished and resonse is ready to send, else more processing is required.
345
 */
346 347 348
bool AgoRpc::handleJsonRpcRequest(JsonRpcReqRep *reqRep, const Json::Value &request, Json::Value &responseRoot) {
    assert(responseRoot.isObject());
    AGO_TRACE() << "JSONRPC request: " << request;
349

350 351
    if (!request.isObject())
        return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_REQUEST, "Invalid request, not an object");
352

353 354 355
    // If ID is missing, this is a notification and no response should be sent (unless error).
    if(!request.isMember("id")) {
        //... but we do not have any methods which are notifications..
356
        return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_REQUEST, "Invalid request, 'id' missing");
357 358
    }
    responseRoot["id"] = request["id"];
359

360 361 362 363
    // Version is required
    if(!request.isMember("jsonrpc") || request["jsonrpc"].asString() != "2.0")
        return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_REQUEST, "Invalid request, 'jsonrpc' unknown/missing");
    responseRoot["jsonrpc"] = request["jsonrpc"];
364

365 366 367 368
    // Method is required
    const Json::Value &methodV = request["method"];
    if(!methodV.isString())
        return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_REQUEST, "Invalid request, 'method' invalid/missing");
369

370
    const std::string method = methodV.asString();
371

372 373 374 375 376 377
    // Params may or may not be required depending on method
    if (method == "message" )
    {
        const Json::Value& params = request["params"];
        if (!params.isObject() || params.empty())
            return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_PARAMS, "Invalid request, 'params' invalid/missing");
378

379 380
        // This cannot be processed remote remote call
        return false;
381 382 383
    }
    else if (method == "subscribe")
    {
384
        // Local call possible
385
        const std::string subscriptionId = agocontrol::utils::generateUuid();
386 387
        if (subscriptionId == "")
            return jsonrpcErrorResponse(responseRoot, JSONRPC_INTERNAL_ERROR, "Failed to generate UUID");
388

389
        std::deque<Json::Value> empty;
390 391 392 393 394
        Subscriber subscriber;
        subscriber.lastAccess=time(0);
        subscriber.queue = empty;
        {
            boost::lock_guard<boost::mutex> lock(mutexSubscriptions);
395
            subscriptions[subscriptionId] = subscriber;
396
        }
397 398
        responseRoot["result"] = subscriptionId;
        return true;
399

400
    }
401
    else if (method == "unsubscribe" || method == "getevent")
402
    {
403 404 405
        const Json::Value& params = request["params"];
        if (!params.isObject() || params.empty())
            return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_PARAMS, "Invalid request, 'params' invalid/missing");
406

407 408 409
        const Json::Value& subscriptionId = params["uuid"];
        if (!subscriptionId.isString())
            return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_PARAMS, "Invalid request, param 'uuid' missing");
410

411 412
        if (method == "unsubscribe") {
            AGO_DEBUG() << "removing subscription: " << subscriptionId.asString();
413
            {
414 415 416 417 418
                boost::lock_guard <boost::mutex> lock(mutexSubscriptions);
                std::map<std::string, Subscriber>::iterator it = subscriptions.find(subscriptionId.asString());
                if (it != subscriptions.end()) {
                    subscriptions.erase(subscriptionId.asString());
                }
419 420
            }

421 422 423 424 425 426 427
            responseRoot["result"] = "success";
            return true;
        }else {
            // getevent
            if(!reqRep->jsonrpcRequest.isObject()) {
                return jsonrpcErrorResponse(responseRoot, JSONRPC_INVALID_REQUEST, "Invalid request, getevent is not batchable");
            }
428

429 430 431
            if(params.isMember("timeout"))
                // Custom timeout in seconds
                reqRep->timeout = params["timeout"].asDouble()*1000;
432

433
            reqRep->subscriptionId = subscriptionId.asString();
434

435 436
            // If we have pending events for this subscription, it returns immediately
            return getEventsFor(reqRep);
437 438 439
        }
    }

440
    return jsonrpcErrorResponse(responseRoot, JSONRPC_METHOD_NOT_FOUND, "Invalid request, method not found");
441 442
}

443
/**
444 445
 * Verify all JSONRPC requests in jsonrpcRequest, and determine if any requires a remote
 * call/background thread.
446
 *
447 448
 * @param reqRep
 * @return
449
 */
450 451 452 453 454 455 456 457 458 459 460 461
bool AgoRpc::handleJsonRpcRequests(boost::shared_ptr<JsonRpcReqRep> reqRep) {
    bool finished;
    if (reqRep->jsonrpcRequest.isArray())
    {
        // Batch, array of events
        reqRep->jsonResponse = Json::Value(Json::arrayValue);
        finished = true;
        for (unsigned int i = 0; i< reqRep->jsonrpcRequest.size(); i++) {
            reqRep->jsonResponse[i] = Json::Value(Json::objectValue);
            if(!handleJsonRpcRequest(reqRep.get(), reqRep->jsonrpcRequest[i], reqRep->jsonResponse[i])) {
                // Not finished; background work required
                finished = false;
462 463 464 465 466
            }
        }
    }
    else
    {
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
        reqRep->jsonResponse = Json::Value(Json::objectValue);
        finished = handleJsonRpcRequest(reqRep.get(), reqRep->jsonrpcRequest, reqRep->jsonResponse);
    }

    // Special polled case, when we have subscriptionId we do not need background thread.
    if(!finished && reqRep->subscriptionId.empty()) {
        threadPool().post(boost::bind(&AgoRpc::jsonrpc_thread, this, reqRep));
    }

    return finished;
}

/**
 * Prepare processing of one or more JSONRPC requests.
 *
 * Parses the HTTP body as JSON. On a parse error the response is a
 * JSON-RPC parse error and is ready immediately, otherwise the request(s)
 * are handed to handleJsonRpcRequests.
 *
 * @param conn Connection the request arrived on (unused here)
 * @param hm HTTP message whose body holds the JSON-RPC payload
 * @return The request/response object tracking this call
 */
boost::shared_ptr<JsonRpcReqRep> AgoRpc::jsonrpc(struct mg_connection *conn, struct http_message *hm)
{
    boost::shared_ptr<JsonRpcReqRep> reqRep(new JsonRpcReqRep(this));
    Json::CharReaderBuilder builder;
    std::string errors;
    std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
    if ( !reader->parse(hm->body.p, hm->body.p + hm->body.len, &reqRep->jsonrpcRequest, &errors) ) {
        reqRep->jsonResponse = Json::Value(Json::objectValue);
        AGO_WARNING() << "Failed to parse JSON: " << errors;
        reqRep->responseReady = jsonrpcErrorResponse(reqRep->jsonResponse, JSONRPC_PARSE_ERROR, "Failed to parse JSON request:" + errors);
    } else if ( handleJsonRpcRequests(reqRep) ) {
        // Only ever flip the flag to true here. When the request is not
        // finished, a worker thread may already own reqRep; writing false
        // without the lock could overwrite the worker's "ready" state.
        reqRep->responseReady = true;
    }

    return reqRep;
}

499 500 501 502
/**
 * Upload files
 * @info source from https://github.com/cesanta/mongoose/blob/master/examples/upload.c
 */
503
boost::shared_ptr<HttpReqRep> AgoRpc::uploadFiles(struct mg_connection *conn, struct http_message *hm)
504
{
505 506 507 508
    boost::shared_ptr<FileUploadReqRep> reqRep(
            new FileUploadReqRep(this, hm)
        );

509
    const char *data;
510
    size_t data_len, ofs = 0;
511 512 513
    char var_name[100], file_name[100];
    std::string uuid = "";

514 515 516 517 518 519 520 521 522
#define FILE_UPLOAD_ERROR(error_message)  \
    response["error"] = Json::Value(Json::objectValue); \
    response["error"]["message"] = (error_message);

    // Index matches "requests"
    reqRep->jsonResponse["files"] = Json::Value(Json::arrayValue);

    // upload files
    while ((ofs = mg_parse_multipart(hm->body.p + ofs, hm->body.len - ofs, var_name, sizeof(var_name),
523 524
                    file_name, sizeof(file_name), &data, &data_len)) > 0)
    {
525 526 527 528 529 530 531 532 533
        if( strlen(file_name)>0 )
        {
            //check if uuid found
            if(uuid.size() == 0)
            {
                //no uuid found yet, drop file
                continue;
            }

534 535
            // One response per valid object, we put either result or error in this (as received from remote)
            Json::Value response(Json::objectValue);
536

537 538
            // at same index as in jsonrcpResponse, we have our request
            // this is the actual message sent to device
539
            Json::Value request;
540 541
            request["uuid"] = std::string(uuid);
            request["command"] = "uploadfile";
542

543 544 545 546 547 548 549 550 551 552 553 554 555
            // Sanitize filename, it should only be a filename.
            fs::path orig_fn(file_name);
            fs::path safe_fn = orig_fn.filename();
            if(std::string(file_name) != safe_fn.string()){
                AGO_ERROR() << "Rejecting file upload, unsafe path \"" << file_name << "\" ";
                FILE_UPLOAD_ERROR("Invalid filename");
            }else{
                response["name"] = safe_fn.string();

                // Save file to a temporary path
                fs::path tempfile = fs::path(UPLOAD_PATH) / fs::unique_path().replace_extension(safe_fn.extension());
                FILE* fp = fopen(tempfile.c_str(), "wb");
                if( !fp )
556
                {
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
                    std::string err(strerror(errno));
                    AGO_ERROR() << "Failed to open file " << tempfile.string() << " for writing: " << err;
                    FILE_UPLOAD_ERROR(std::string("Failed to open file: ") + err);
                }else {
                    request["filepath"] = tempfile.string();
                    request["filename"] = safe_fn.string();

                    AGO_DEBUG() << "Uploading file \"" << safe_fn.string() << "\" file to " << uuid << " via " << tempfile;
                    size_t written = fwrite(data, sizeof(char), data_len, fp);
                    fclose(fp);
                    if( written!=data_len )
                    {
                        //error writting file, drop it
                        fs::remove(tempfile);
                        AGO_ERROR() << "Uploaded file \"" << tempfile.string() << "\" not fully written (no space left?)";
                        FILE_UPLOAD_ERROR("Failed to write file, no space left?");
                    }else{
Johan Ström's avatar
Johan Ström committed
574
                        request["filesize"] = (Json::UInt64)data_len;
575 576
                        response["size"] = (Json::UInt64)data_len;
                    }
577
                }
578
            }
579

580
            reqRep->requests.append(request);
581
            reqRep->jsonResponse["files"].append(response);
582 583 584 585 586 587
        }
        else
        {
            //it's a posted value
            if( strcmp(var_name, "uuid")==0 )
            {
588
                uuid = std::string(data, data_len);
589 590 591 592
            }
        }
    }

593 594 595 596 597 598 599 600 601 602
#undef FILE_UPLOAD_ERROR

    // Dispatch remote request in background thread.
    threadPool().post(boost::bind(&AgoRpc::uploadFile_thread, this, reqRep));

    return reqRep;
}

/**
 * Background part of a file upload: sends every queued "uploadfile" request
 * to the target device, stores the remote response at the matching index of
 * jsonResponse["files"] and removes the temporary file.
 *
 * Requests which already failed locally (rejected filename, file could not
 * be written) are not sent; their locally generated error is kept.
 */
void AgoRpc::uploadFile_thread(boost::shared_ptr<FileUploadReqRep> reqRep) {
    int i=0;
    for(auto it = reqRep->requests.begin(); it != reqRep->requests.end(); ++it, ++i) {
        Json::Value &request(*it);

        // "filesize" is only set once the temporary file was written completely.
        // Without it there is nothing to hand to the device, and the matching
        // response entry already holds the error to report.
        if(!request.isMember("filesize"))
            continue;

        std::string tempfile(request["filepath"].asString());
        AgoResponse r = agoConnection->sendRequest(request);
        if(r.isError())
            AGO_ERROR() << "Uploading file \"" << tempfile << "\" failed: " << r.getMessage();
        else if( r.isOk() )
            AGO_INFO() << "Uploading file " << request["filename"] << " was successful";

        boost::unique_lock<boost::mutex> lock(reqRep->mutex);
        // Copy full remote result 1:1, adds result or error.
        reqRep->jsonResponse["files"][i].swap(r.getResponse());

        // delete file (it should be processed by sendcommand)
        //XXX: maybe a purge process could be interesting to implement
        fs::remove(tempfile);
    }

    boost::unique_lock<boost::mutex> lock(reqRep->mutex);
    reqRep->jsonResponse["count"] = i;
    reqRep->responseReady = true;

    AGO_TRACE() << "Leaving upload thread " << reqRep.get();
    lock.unlock(); // Must NOT hold lock when calling wakeup
    agoHttp.wakeup();
}

// The backend did not answer in time: report 503 with an error text.
void FileUploadReqRep::onTimeout() {
    jsonResponse["error"] = "Backend timeout";
    setResponseCode(503);
}
634

635
/**
636 637 638 639 640
 * downloadFile is implemented by sending a downloadfile message to the target device,
 * which returns a "filepath" response pointing to a file on the local filesystem.
 *
 * This is then served to the client.
 * TODO XXX: Without any filtering on what filepath is... can theoretically read anything.
641
 */
642
boost::shared_ptr<HttpReqRep> AgoRpc::downloadFile(struct mg_connection *conn, struct http_message *hm)
643
{
644 645 646 647 648
    boost::shared_ptr<FileDownloadReqRep> reqRep(
            new FileDownloadReqRep(this, hm)
        );

    // Verify parameters first, then dispatch to background thread.
649
    char param[1024];
650
    Json::Value content;
651

652 653 654
    // get params
    if( mg_get_http_var(&hm->query_string, "filename", param, sizeof(param)) > 0 )
        reqRep->request["filename"] = std::string(param);
655

656 657 658
    if( mg_get_http_var(&hm->query_string, "uuid", param, sizeof(param)) > 0 )
        reqRep->request["uuid"] = std::string(param);

659
    if( !reqRep->request.isMember("filename") || !reqRep->request.isMember("uuid") )
660 661 662
    {
        //missing parameters!
        AGO_ERROR() << "Download file, missing parameters. Nothing done";
663 664 665
        reqRep->setResponseCode(400);
        reqRep->error = "Invalid request parameters";
        return reqRep;
666 667
    }

668 669 670 671 672 673
    reqRep->request["command"] = "downloadfile";

    // Dispatch remote request in background thread.
    threadPool().post(boost::bind(&AgoRpc::downloadFile_thread, this, reqRep));

    return reqRep;
674 675
}

676 677 678 679
/**
 * Background part of a file download: asks the target device for the file
 * and records either the path to serve (200) or an error (500) on reqRep,
 * then wakes up the HTTP loop.
 */
void AgoRpc::downloadFile_thread(boost::shared_ptr<FileDownloadReqRep> reqRep) {
    AGO_TRACE() << "Entering downloadFile_thread for " << reqRep.get();

    // Blocking remote call, done before taking the reqRep lock
    AgoResponse r = agoConnection->sendRequest(reqRep->request);

    boost::unique_lock<boost::mutex> lock(reqRep->mutex);
    if( !r.isOk() )
    {
        //command failed
        AGO_ERROR() << "Download file, sendCommand failed, unable to send file: " << r.getMessage();
        reqRep->error = r.getMessage();
        reqRep->setResponseCode(500);
    }
    else
    {
        //command sent successfully
        const Json::Value& response(r.getData());
        const bool hasFilepath = response.isMember("filepath") && !response["filepath"].asString().empty();
        if( !hasFilepath )
        {
            //invalid command response
            AGO_ERROR() << "Download file, sendCommand returned invalid response (need filepath)";
            reqRep->error = "Internal error";
            reqRep->setResponseCode(500);
        }
        else
        {
            // all seems valid
            reqRep->filepath = fs::path(response["filepath"].asString());
            AGO_DEBUG() << "Downloading file " << reqRep->filepath;
            reqRep->setResponseCode(200);
        }
    }

    AGO_TRACE() << "Leaving downloadFile_thread for " << reqRep.get();

    // The wakeup triggers writeResponseData from the mongoose thread;
    // the lock must NOT be held while calling it.
    lock.unlock();
    agoHttp.wakeup();
}
716

717 718 719 720 721 722
void FileDownloadReqRep::writeResponseData(struct mg_connection *conn) {
    if(getResponseCode() != 200) {
        AGO_TRACE() << "Writing " << this << " error response: " << error;
        mg_send_head(conn, getResponseCode(), error.size(), "Content-Type: text/plain");
        mg_send(conn, error.c_str(), error.size());
        return;
723 724
    }

725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740
    std::stringstream headers;
    headers << "Content-Disposition: attachment; filename="
        // .filename() returns filename within ""
        << filepath.filename();

    AGO_TRACE() << "Responding with file " << filepath;
    // XXX: We do not have any mime info.
    // XXX: Danger danger, filepath can be anything given by remote agoapp
    mg_http_serve_file(conn, hm_req, filepath.c_str(),
            mg_mk_str("application/octet-stream"),
            mg_mk_str(headers.str().c_str()));
}

// The backend did not answer in time: report 503 with an error text.
void FileDownloadReqRep::onTimeout() {
    error = "Backend timeout";
    setResponseCode(503);
}

Tang's avatar
Tang committed
743 744 745
/**
 * Agoclient event handler
 */
746
void AgoRpc::eventHandler(const std::string& subject, const Json::Value& content_) {
747
    // don't flood clients with unneeded events
748
    if (subject == "event.environment.timechanged" || subject == "event.device.discover") {
749
        return;
750
    }
751

752 753 754 755 756
    // Create clone which we can mutate.
    Json::Value content(content_);

    // remove (empty) command from content
    content.removeMember("command");
757

758
    // prepare event content
759
    content["event"] = subject;
760 761
    if (subject.find("event.environment.") != std::string::npos && subject.find("changed") != std::string::npos) {
        std::string quantity = subject;
762 763 764
        replaceString(quantity, "event.environment.", "");
        replaceString(quantity, "changed", "");
        content["quantity"] = quantity;
765 766
    } else if (subject == "event.device.batterylevelchanged") {
        std::string quantity = subject;
767 768 769 770 771 772
        replaceString(quantity, "event.device.", "");
        replaceString(quantity, "changed", "");
        content["quantity"] = quantity;
    }

    {
773
        boost::lock_guard <boost::mutex> lock(mutexSubscriptions);
774
        //AGO_TRACE() << "Incoming notify: " << content;
775
        for (std::map<std::string, Subscriber>::iterator it = subscriptions.begin(); it != subscriptions.end();) {
776 777
            if (it->second.queue.size() > 100) {
                // this subscription seems to be abandoned, let's remove it to save resources
778
                AGO_INFO() << "removing subscription as the queue size exceeds limits: " << it->first;
779 780 781 782 783 784 785 786
                subscriptions.erase(it++);
            } else {
                it->second.queue.push_back(content);
                ++it;
            }
        }
    }

787 788
    // Wakeup sleeping sockets, any event subscribers will see their updated queue.
    agoHttp.wakeup();
Tang's avatar
Tang committed
789 790
}

791
void AgoRpc::setupApp() {
792
    std::string ports_cfg;
793 794
    fs::path htdocs;
    fs::path certificate;
795
    std::string domainname;
796 797

    //get parameters
798
    ports_cfg = getConfigOption("ports", "8008,8009s");
799 800 801
    htdocs = getConfigOption("htdocs", fs::path(BOOST_PP_STRINGIZE(DEFAULT_HTMLDIR)));
    certificate = getConfigOption("certificate", getConfigPath("/rpc/rpc_cert.pem"));
    domainname = getConfigOption("domainname", "agocontrol");
802

803 804 805 806 807 808 809 810 811 812 813 814 815
    agoHttp.setDocumentRoot(htdocs.string());
    agoHttp.setAuthDomain(domainname);

    // Expose any custom python extra include paths via MONGOOSE_CGI env vars
    setenv("MONGOOSE_CGI", getConfigOption("python_extra_paths", "").c_str(), 1);

    // Parse bindings/ports
    typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
    boost::char_separator<char> sep(", ");
    tokenizer tok(ports_cfg, sep);
    for(tokenizer::iterator gen=tok.begin(); gen != tok.end(); ++gen) {
        std::string addr(*gen);
        if(addr[addr.length() -1] == 's') {
816
#if MG_ENABLE_SSL
817 818
            addr.assign(addr, 0, addr.length()-1);
            agoHttp.addBinding(addr, certificate);
819 820 821
#else
            throw ConfigurationError("Not compiled with https support, cannot use binding "+addr);
#endif
822 823
        }else
            agoHttp.addBinding(addr);
824 825 826 827
    }

    fs::path authPath = htdocs / HTPASSWD;
    if( fs::exists(authPath) )
828
        agoHttp.setAuthFile(authPath);
829 830 831
    else
        AGO_INFO() << "Disabling authentication: file does not exist";

832 833 834
    agoHttp.addHandler("/jsonrpc", boost::bind(&AgoRpc::jsonrpc, this, _1, _2));
    agoHttp.addHandler("/upload", boost::bind(&AgoRpc::uploadFiles, this, _1, _2));
    agoHttp.addHandler("/download", boost::bind(&AgoRpc::downloadFile, this, _1, _2));
835

836 837 838 839
    try {
        agoHttp.start();
    }catch(const std::runtime_error &err) {
        throw ConfigurationError(err.what());
840 841 842
    }

    addEventHandler();
843
}
844

845 846 847 848 849
// Shut down the HTTP server first, then run the base class shutdown.
void AgoRpc::doShutdown()
{
    agoHttp.shutdown();
    AgoApp::doShutdown();
}

850
void AgoRpc::cleanupApp() {
851 852
    // Wait for Http to close and cleanup
    agoHttp.close();
853
}
854 855 856

AGOAPP_ENTRY_POINT(AgoRpc);