summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevron Rees <kevron.m.rees@intel.com>2015-01-14 15:39:56 -0800
committerKevron Rees <kevron.m.rees@intel.com>2015-01-14 15:41:09 -0800
commit089cb07fa2e4920a3e2c04df537ce004fd6a529e (patch)
treef1b05f8b21797bb3f2fb7bc7589aaa74580326ec
parent40c7d508692ec158927df3f5e2e67800f9556370 (diff)
downloadautomotive-message-broker-089cb07fa2e4920a3e2c04df537ce004fd6a529e.tar.gz
synchronize time between server and client
-rw-r--r--plugins/websocket/protocol.idl5
-rw-r--r--plugins/websocket/websocketsinkmanager.cpp1
-rw-r--r--plugins/websocket/websocketsource.cpp80
-rw-r--r--plugins/websocket/websocketsource.h6
4 files changed, 67 insertions, 25 deletions
diff --git a/plugins/websocket/protocol.idl b/plugins/websocket/protocol.idl
index cb24cf1e..18b46a3e 100644
--- a/plugins/websocket/protocol.idl
+++ b/plugins/websocket/protocol.idl
@@ -216,6 +216,11 @@ interface GetSupportedReply : BaseMessage {
const DOMString name = "getSupported";
/*!
+ * \brief systemTime of the other system. Used to synchronize time
+ */
+ attribute double systemTime;
+
+ /*!
* \brief data - array of properties supported by the system
*/
attribute Property[] data;
diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp
index a371f273..c8e8d8ad 100644
--- a/plugins/websocket/websocketsinkmanager.cpp
+++ b/plugins/websocket/websocketsinkmanager.cpp
@@ -562,6 +562,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
reply["name"] = "getSupported";
reply["transactionid"] = id.c_str();
reply["data"] = list;
+ reply["systemTime"] = amb::Timestamp::instance()->epochTime();
lwsWriteVariant(wsi, reply);
}
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp
index f4b2ff9e..a52cdb18 100644
--- a/plugins/websocket/websocketsource.cpp
+++ b/plugins/websocket/websocketsource.cpp
@@ -150,18 +150,18 @@ UniquePropertyCache properties;
static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
static struct libwebsocket_protocols protocols[] = {
- {
- "http-only",
- callback_http_only,
- 0,
- 128,
- },
- { /* end of list */
- NULL,
- NULL,
- 0,
- 0
- }
+{
+ "http-only",
+ callback_http_only,
+ 0,
+ 128,
+},
+{ /* end of list */
+ NULL,
+ NULL,
+ 0,
+ 0
+}
};
//Called when a client connects, subscribes, or unsubscribes.
@@ -333,7 +333,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
//printf("Connection closed!\n");
break;
- //case LWS_CALLBACK_PROTOCOL_INIT:
+ //case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
//This happens when a client initally connects. We need to request the support event types.
@@ -369,22 +369,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
manager->expectedMessageFrames = 0;
}
- QJsonDocument doc;
+ DebugOut(7) << "data received: " << d.data() << endl;
- if(doBinary)
- doc = QJsonDocument::fromBinaryData(d);
- else
+ int start = d.indexOf("{");
+
+ if(manager->incompleteMessage.isEmpty() && start > 0)
{
- doc = QJsonDocument::fromJson(d);
- DebugOut(7)<<d.data()<<endl;
+ DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl;
+ d = d.right(start-1);
+ }
+
+
+ int end = d.lastIndexOf("}");
+
+ if(end == -1)
+ {
+ manager->incompleteMessage += d;
+ break;
}
+ QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+ DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+ QJsonDocument doc;
+
+ QJsonParseError parseError;
+
+ doc = QJsonDocument::fromJson(tryMessage, &parseError);
+
if(doc.isNull())
{
- DebugOut(DebugOut::Warning)<<"Invalid message"<<endl;
+ DebugOut(7) << "Invalid or incomplete message" << endl;
+ DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+ manager->incompleteMessage += d;
break;
}
+ manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
QVariantMap call = doc.toVariant().toMap();
string type = call["type"].toString().toStdString();
@@ -457,6 +480,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
+ double serverTime = call["systemTime"].toDouble();
+
+ DebugOut() << "Server time is: " << serverTime << endl;
+
+ if(serverTime)
+ source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
Q_FOREACH(QVariant p, supported)
{
@@ -484,7 +513,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
std::string name = obj["property"].toString().toStdString();
std::string value = obj["value"].toString().toStdString();
- double timestamp = obj["timestamp"].toDouble();
+ double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
int sequence = obj["sequence"].toInt();
AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
@@ -620,7 +649,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
break;
}
- return 0;
+ return 0;
}
}
void WebSocketSource::updateSupported()
@@ -635,7 +664,8 @@ void WebSocketSource::updateSupported()
m_re->updateSupported(list, PropertyList(), this);
}
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+ serverTimeOffset(0)
{
m_sslEnabled = false;
clientConnected = false;
@@ -738,8 +768,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
replyvar["type"] = "method";
replyvar["name"] = "getRanged";
replyvar["transactionid"] = uuid.c_str();
- replyvar["timeBegin"] = reply->timeBegin;
- replyvar["timeEnd"] = reply->timeEnd;
+ replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+ replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
replyvar["sequenceBegin"] = reply->sequenceBegin;
replyvar["sequenceEnd"] = reply->sequenceEnd;
diff --git a/plugins/websocket/websocketsource.h b/plugins/websocket/websocketsource.h
index 4af23ba2..966fee3d 100644
--- a/plugins/websocket/websocketsource.h
+++ b/plugins/websocket/websocketsource.h
@@ -62,6 +62,12 @@ public:
int expectedMessageFrames;
PropertyInfo getPropertyInfo(const VehicleProperty::Property & property);
+
+ /*!
+ * \brief serverTimeOffset offset between server time and local time
+ */
+ double serverTimeOffset;
+private:
};
#endif // WEBSOCKETSOURCE_H