diff options
author | Kevron Rees <kevron.m.rees@intel.com> | 2015-01-14 15:39:56 -0800 |
---|---|---|
committer | Kevron Rees <kevron.m.rees@intel.com> | 2015-01-14 15:41:09 -0800 |
commit | 089cb07fa2e4920a3e2c04df537ce004fd6a529e (patch) | |
tree | f1b05f8b21797bb3f2fb7bc7589aaa74580326ec | |
parent | 40c7d508692ec158927df3f5e2e67800f9556370 (diff) | |
download | automotive-message-broker-089cb07fa2e4920a3e2c04df537ce004fd6a529e.tar.gz |
synchronize time between server and client
-rw-r--r-- | plugins/websocket/protocol.idl | 5 | ||||
-rw-r--r-- | plugins/websocket/websocketsinkmanager.cpp | 1 | ||||
-rw-r--r-- | plugins/websocket/websocketsource.cpp | 80 | ||||
-rw-r--r-- | plugins/websocket/websocketsource.h | 6 |
4 files changed, 67 insertions, 25 deletions
diff --git a/plugins/websocket/protocol.idl b/plugins/websocket/protocol.idl index cb24cf1e..18b46a3e 100644 --- a/plugins/websocket/protocol.idl +++ b/plugins/websocket/protocol.idl @@ -216,6 +216,11 @@ interface GetSupportedReply : BaseMessage { const DOMString name = "getSupported"; /*! + * \brief systemTime of the other system. Used to synchronize time + */ + attribute double systemTime; + + /*! * \brief data - array of properties supported by the system */ attribute Property[] data; diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp index a371f273..c8e8d8ad 100644 --- a/plugins/websocket/websocketsinkmanager.cpp +++ b/plugins/websocket/websocketsinkmanager.cpp @@ -562,6 +562,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb reply["name"] = "getSupported"; reply["transactionid"] = id.c_str(); reply["data"] = list; + reply["systemTime"] = amb::Timestamp::instance()->epochTime(); lwsWriteVariant(wsi, reply); } diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp index f4b2ff9e..a52cdb18 100644 --- a/plugins/websocket/websocketsource.cpp +++ b/plugins/websocket/websocketsource.cpp @@ -150,18 +150,18 @@ UniquePropertyCache properties; static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len); static struct libwebsocket_protocols protocols[] = { - { - "http-only", - callback_http_only, - 0, - 128, - }, - { /* end of list */ - NULL, - NULL, - 0, - 0 - } +{ + "http-only", + callback_http_only, + 0, + 128, +}, +{ /* end of list */ + NULL, + NULL, + 0, + 0 +} }; //Called when a client connects, subscribes, or unsubscribes. @@ -333,7 +333,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket //printf("Connection closed!\n"); break; - //case LWS_CALLBACK_PROTOCOL_INIT: + //case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_CLIENT_ESTABLISHED: { //This happens when a client initally connects. We need to request the support event types. @@ -369,22 +369,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket manager->expectedMessageFrames = 0; } - QJsonDocument doc; + DebugOut(7) << "data received: " << d.data() << endl; - if(doBinary) - doc = QJsonDocument::fromBinaryData(d); - else + int start = d.indexOf("{"); + + if(manager->incompleteMessage.isEmpty() && start > 0) { - doc = QJsonDocument::fromJson(d); - DebugOut(7)<<d.data()<<endl; + DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl; + d = d.right(start-1); + } + + + int end = d.lastIndexOf("}"); + + if(end == -1) + { + manager->incompleteMessage += d; + break; } + QByteArray tryMessage = manager->incompleteMessage + d.left(end+1); + + DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl; + + QJsonDocument doc; + + QJsonParseError parseError; + + doc = QJsonDocument::fromJson(tryMessage, &parseError); + if(doc.isNull()) { - DebugOut(DebugOut::Warning)<<"Invalid message"<<endl; + DebugOut(7) << "Invalid or incomplete message" << endl; + DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl; + manager->incompleteMessage += d; break; } + manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end); + QVariantMap call = doc.toVariant().toMap(); string type = call["type"].toString().toStdString(); @@ -457,6 +480,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl; + double serverTime = call["systemTime"].toDouble(); + + DebugOut() << "Server time is: " << serverTime << endl; + + if(serverTime) + source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime; Q_FOREACH(QVariant p, supported) { @@ -484,7 +513,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket std::string name = obj["property"].toString().toStdString(); std::string value = obj["value"].toString().toStdString(); - double timestamp = obj["timestamp"].toDouble(); + double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset; int sequence = obj["sequence"].toInt(); AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value); @@ -620,7 +649,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket break; } - return 0; + return 0; } } void WebSocketSource::updateSupported() @@ -635,7 +664,8 @@ void WebSocketSource::updateSupported() m_re->updateSupported(list, PropertyList(), this); } -WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0) +WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0), + serverTimeOffset(0) { m_sslEnabled = false; clientConnected = false; @@ -738,8 +768,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) replyvar["type"] = "method"; replyvar["name"] = "getRanged"; replyvar["transactionid"] = uuid.c_str(); - replyvar["timeBegin"] = reply->timeBegin; - replyvar["timeEnd"] = reply->timeEnd; + replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset; + replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset; replyvar["sequenceBegin"] = reply->sequenceBegin; replyvar["sequenceEnd"] = reply->sequenceEnd; diff --git a/plugins/websocket/websocketsource.h b/plugins/websocket/websocketsource.h index 4af23ba2..966fee3d 100644 --- a/plugins/websocket/websocketsource.h +++ b/plugins/websocket/websocketsource.h @@ -62,6 +62,12 @@ public: int expectedMessageFrames; PropertyInfo getPropertyInfo(const VehicleProperty::Property & property); + + /*! + * \brief serverTimeOffset offset between server time and local time + */ + double serverTimeOffset; +private: }; #endif // WEBSOCKETSOURCE_H |