diff options
Diffstat (limited to 'plugins/websocket/websocketsource.cpp')
-rw-r--r-- | plugins/websocket/websocketsource.cpp | 80 |
1 files changed, 55 insertions, 25 deletions
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp index f4b2ff9e..a52cdb18 100644 --- a/plugins/websocket/websocketsource.cpp +++ b/plugins/websocket/websocketsource.cpp @@ -150,18 +150,18 @@ UniquePropertyCache properties; static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len); static struct libwebsocket_protocols protocols[] = { - { - "http-only", - callback_http_only, - 0, - 128, - }, - { /* end of list */ - NULL, - NULL, - 0, - 0 - } +{ + "http-only", + callback_http_only, + 0, + 128, +}, +{ /* end of list */ + NULL, + NULL, + 0, + 0 +} }; //Called when a client connects, subscribes, or unsubscribes. @@ -333,7 +333,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket //printf("Connection closed!\n"); break; - //case LWS_CALLBACK_PROTOCOL_INIT: + //case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_CLIENT_ESTABLISHED: { //This happens when a client initally connects. We need to request the support event types. @@ -369,22 +369,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket manager->expectedMessageFrames = 0; } - QJsonDocument doc; + DebugOut(7) << "data received: " << d.data() << endl; - if(doBinary) - doc = QJsonDocument::fromBinaryData(d); - else + int start = d.indexOf("{"); + + if(manager->incompleteMessage.isEmpty() && start > 0) { - doc = QJsonDocument::fromJson(d); - DebugOut(7)<<d.data()<<endl; + DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl; + d = d.right(start-1); + } + + + int end = d.lastIndexOf("}"); + + if(end == -1) + { + manager->incompleteMessage += d; + break; } + QByteArray tryMessage = manager->incompleteMessage + d.left(end+1); + + DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl; + + QJsonDocument doc; + + QJsonParseError parseError; + + doc = QJsonDocument::fromJson(tryMessage, &parseError); + if(doc.isNull()) { - DebugOut(DebugOut::Warning)<<"Invalid message"<<endl; + DebugOut(7) << "Invalid or incomplete message" << endl; + DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl; + manager->incompleteMessage += d; break; } + manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end); + QVariantMap call = doc.toVariant().toMap(); string type = call["type"].toString().toStdString(); @@ -457,6 +480,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl; + double serverTime = call["systemTime"].toDouble(); + + DebugOut() << "Server time is: " << serverTime << endl; + + if(serverTime) + source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime; Q_FOREACH(QVariant p, supported) { @@ -484,7 +513,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket std::string name = obj["property"].toString().toStdString(); std::string value = obj["value"].toString().toStdString(); - double timestamp = obj["timestamp"].toDouble(); + double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset; int sequence = obj["sequence"].toInt(); AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value); @@ -620,7 +649,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket break; } - return 0; + return 0; } } void WebSocketSource::updateSupported() @@ -635,7 +664,8 @@ void WebSocketSource::updateSupported() m_re->updateSupported(list, PropertyList(), this); } -WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0) +WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0), + serverTimeOffset(0) { m_sslEnabled = false; clientConnected = false; @@ -738,8 +768,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) replyvar["type"] = "method"; replyvar["name"] = "getRanged"; replyvar["transactionid"] = uuid.c_str(); - replyvar["timeBegin"] = reply->timeBegin; - replyvar["timeEnd"] = reply->timeEnd; + replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset; + replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset; replyvar["sequenceBegin"] = reply->sequenceBegin; replyvar["sequenceEnd"] = reply->sequenceEnd; |