summaryrefslogtreecommitdiff
path: root/plugins/websocket/websocketsource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websocket/websocketsource.cpp')
-rw-r--r--plugins/websocket/websocketsource.cpp80
1 files changed, 55 insertions, 25 deletions
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp
index f4b2ff9e..a52cdb18 100644
--- a/plugins/websocket/websocketsource.cpp
+++ b/plugins/websocket/websocketsource.cpp
@@ -150,18 +150,18 @@ UniquePropertyCache properties;
static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
static struct libwebsocket_protocols protocols[] = {
- {
- "http-only",
- callback_http_only,
- 0,
- 128,
- },
- { /* end of list */
- NULL,
- NULL,
- 0,
- 0
- }
+{
+ "http-only",
+ callback_http_only,
+ 0,
+ 128,
+},
+{ /* end of list */
+ NULL,
+ NULL,
+ 0,
+ 0
+}
};
//Called when a client connects, subscribes, or unsubscribes.
@@ -333,7 +333,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
//printf("Connection closed!\n");
break;
- //case LWS_CALLBACK_PROTOCOL_INIT:
+ //case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
//This happens when a client initally connects. We need to request the support event types.
@@ -369,22 +369,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
manager->expectedMessageFrames = 0;
}
- QJsonDocument doc;
+ DebugOut(7) << "data received: " << d.data() << endl;
- if(doBinary)
- doc = QJsonDocument::fromBinaryData(d);
- else
+ int start = d.indexOf("{");
+
+ if(manager->incompleteMessage.isEmpty() && start > 0)
{
- doc = QJsonDocument::fromJson(d);
- DebugOut(7)<<d.data()<<endl;
+ DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl;
+ d = d.right(start-1);
+ }
+
+
+ int end = d.lastIndexOf("}");
+
+ if(end == -1)
+ {
+ manager->incompleteMessage += d;
+ break;
}
+ QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+ DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+ QJsonDocument doc;
+
+ QJsonParseError parseError;
+
+ doc = QJsonDocument::fromJson(tryMessage, &parseError);
+
if(doc.isNull())
{
- DebugOut(DebugOut::Warning)<<"Invalid message"<<endl;
+ DebugOut(7) << "Invalid or incomplete message" << endl;
+ DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+ manager->incompleteMessage += d;
break;
}
+ manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
QVariantMap call = doc.toVariant().toMap();
string type = call["type"].toString().toStdString();
@@ -457,6 +480,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
+ double serverTime = call["systemTime"].toDouble();
+
+ DebugOut() << "Server time is: " << serverTime << endl;
+
+ if(serverTime)
+ source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
Q_FOREACH(QVariant p, supported)
{
@@ -484,7 +513,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
std::string name = obj["property"].toString().toStdString();
std::string value = obj["value"].toString().toStdString();
- double timestamp = obj["timestamp"].toDouble();
+ double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
int sequence = obj["sequence"].toInt();
AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
@@ -620,7 +649,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
break;
}
- return 0;
+ return 0;
}
}
void WebSocketSource::updateSupported()
@@ -635,7 +664,8 @@ void WebSocketSource::updateSupported()
m_re->updateSupported(list, PropertyList(), this);
}
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+ serverTimeOffset(0)
{
m_sslEnabled = false;
clientConnected = false;
@@ -738,8 +768,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
replyvar["type"] = "method";
replyvar["name"] = "getRanged";
replyvar["transactionid"] = uuid.c_str();
- replyvar["timeBegin"] = reply->timeBegin;
- replyvar["timeEnd"] = reply->timeEnd;
+ replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+ replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
replyvar["sequenceBegin"] = reply->sequenceBegin;
replyvar["sequenceEnd"] = reply->sequenceEnd;