diff options
author | Kevron Rees <kevron.m.rees@intel.com> | 2014-01-02 16:18:47 -0800 |
---|---|---|
committer | Kevron Rees <kevron.m.rees@intel.com> | 2014-01-02 16:18:47 -0800 |
commit | ff5290afca78664da71444582b9d8bff9965f185 (patch) | |
tree | 109da86642e5bee132adbb7c2becc70dce196d82 | |
parent | 2884fe08bf62b414d4c74727fc282e2a82f94fb9 (diff) | |
download | automotive-message-broker-binarywebsockets.tar.gz |
add option to use json or binary protocolbinarywebsockets
-rw-r--r-- | TODO | 3 | ||||
-rw-r--r-- | examples/configwebsocketsink | 3 | ||||
-rw-r--r-- | examples/configwebsocketsource | 3 | ||||
-rw-r--r-- | plugins/opencvlux/CMakeLists.txt | 3 | ||||
-rw-r--r-- | plugins/websocket/test/test.js | 25 | ||||
-rw-r--r-- | plugins/websocket/websocketsink.cpp | 34 | ||||
-rw-r--r-- | plugins/websocket/websocketsink.h | 3 | ||||
-rw-r--r-- | plugins/websocket/websocketsinkmanager.cpp | 77 | ||||
-rw-r--r-- | plugins/websocket/websocketsource.cpp | 90 |
9 files changed, 180 insertions, 61 deletions
@@ -1,3 +1,6 @@ +For 0.12 + +- Rewrite websocket test html For 0.11 diff --git a/examples/configwebsocketsink b/examples/configwebsocketsink index 9f55d203..2f95e479 100644 --- a/examples/configwebsocketsink +++ b/examples/configwebsocketsink @@ -12,7 +12,8 @@ "path" : "/usr/lib/automotive-message-broker/websocketsinkplugin.so", "interface" : "lo", "ssl" : "false", - "port" : "23000" + "port" : "23000", + "binaryProtocol" : "true" } ] } diff --git a/examples/configwebsocketsource b/examples/configwebsocketsource index 5ead5d17..25a80886 100644 --- a/examples/configwebsocketsource +++ b/examples/configwebsocketsource @@ -5,7 +5,8 @@ "path" : "/usr/lib/automotive-message-broker/websocketsourceplugin.so", "port" : "23000", "ssl" : "false", - "ip" : "127.0.0.1" + "ip" : "127.0.0.1", + "binaryProtocol" : "true" } ], "sinks": [ diff --git a/plugins/opencvlux/CMakeLists.txt b/plugins/opencvlux/CMakeLists.txt index 64a8d656..1d93e6c6 100644 --- a/plugins/opencvlux/CMakeLists.txt +++ b/plugins/opencvlux/CMakeLists.txt @@ -8,9 +8,6 @@ else(OpenCV_LIBS) message(FATAL_ERROR "opencv missing. please install opencv") endif(OpenCV_LIBS) -find_package(Boost COMPONENTS thread REQUIRED) -find_package(Threads REQUIRED) - #find opencv ocl headers: find_path(ocl ocl.hpp PATH_SUFFIXES opencv/ocl opencv2/ocl DOC "opencv ocl headers") diff --git a/plugins/websocket/test/test.js b/plugins/websocket/test/test.js index 90106f21..9d54cc66 100644 --- a/plugins/websocket/test/test.js +++ b/plugins/websocket/test/test.js @@ -172,9 +172,9 @@ function eventListener(e) { } } -function subscribe(eventlist) { - var zoneList = getZone(eventlist); - window.vehicle.subscribe(eventlist, zoneList, +function subscribe(event) { + var zoneList = getZone(event); + window.vehicle.subscribe(event, zoneList, function(data) { PRINT.pass("Subscribe success for: " + data); for (var i = 0; i < data.length; i++) { @@ -191,14 +191,13 @@ function subscribe(eventlist) { ); } -function unsubscribe(eventlist, zoneList) { - zoneList = getZone(eventlist); +function unsubscribe(event, zoneList) { + zoneList = getZone(event); /* kill the handers first, so even if the service fails to acknowledge */ /* we've stopped listening */ - for (var i = 0; i < eventlist.length; i++) { - document.removeEventListener(eventlist[i], eventListener, false); - } - window.vehicle.unsubscribe(eventlist, zoneList, + document.removeEventListener(event, eventListener, false); + + window.vehicle.unsubscribe(event, zoneList, function(data) { PRINT.pass("Unsubscribe success for: " + data); for (var i = 0; i < data.length; i++) { @@ -254,12 +253,12 @@ function select(elem) { return; var idx = selected.indexOf(name); - if (elem.className == "propinfo") { + if (elem.className === "propinfo") { if (idx < 0) { selected[selected.length] = name; } elem.className = "propinfo select"; - } else if (elem.className == "propinfo select") { + } else if (elem.className === "propinfo select") { if (idx >= 0) { selected.splice(idx, 1); } @@ -279,8 +278,8 @@ function start(msg) { var part = ['<div class="proptest"><div class="propinfo" onclick=select(this)>', '</div><div class="buttons"><div class="testbutton types" onclick=getTypes("', '")></div><div id="', - '_subscribe" class="testbutton subscribe" onclick=subscribe(["', - '"])></div><div id="', + '_subscribe" class="testbutton subscribe" onclick=subscribe("', + '")></div><div id="', '_unsubscribe" class="testbutton unsubscribe" onclick=unsubscribe(["', '"])></div><div class="testbutton get" onclick=getValue(["', '"])></div><div class="testbutton set" onclick=setValue(["', diff --git a/plugins/websocket/websocketsink.cpp b/plugins/websocket/websocketsink.cpp index 4a7ff288..37f88e39 100644 --- a/plugins/websocket/websocketsink.cpp +++ b/plugins/websocket/websocketsink.cpp @@ -33,15 +33,27 @@ #include <QJsonDocument> #include <QVariantMap> +bool WebSocketSink::doBinary = false; + +// libwebsocket_write helper function static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) { - /*std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); - - char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; - strcpy(buf, strToWrite.c_str()); -*/ - //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used - return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); + int retval = -1; + + if(WebSocketSink::doBinary) + { + retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); + } + else + { + std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]); + char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; + strcpy(buf, strToWrite); + + retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT); + } + + return retval; } @@ -75,7 +87,11 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value) reply["name"]=property.c_str(); reply["transactionid"]=m_uuid.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + QByteArray replystr; + if(WebSocketSink::doBinary) + replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(reply).toJson(); lwsWrite(m_wsi, replystr.data(),replystr.length()); } @@ -91,3 +107,5 @@ PropertyList WebSocketSink::subscriptions() return PropertyList(); } +/// 6% and 4% cpu with json +/// 5% and 4% with binary diff --git a/plugins/websocket/websocketsink.h b/plugins/websocket/websocketsink.h index 94d4b695..68d5cf1a 100644 --- a/plugins/websocket/websocketsink.h +++ b/plugins/websocket/websocketsink.h @@ -34,6 +34,9 @@ public: void supportedChanged(PropertyList supportedProperties); PropertyList subscriptions(); libwebsocket *socket() { return m_wsi; } + + static bool doBinary; + private: char *webSocketBuffer; string m_amdbproperty; diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp index 1fbe4b6b..1cb7bacb 100644 --- a/plugins/websocket/websocketsinkmanager.cpp +++ b/plugins/websocket/websocketsinkmanager.cpp @@ -41,16 +41,27 @@ WebSocketSinkManager *sinkManager; static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len); bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data); +static bool doBinary = false; + // libwebsocket_write helper function static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) { - /*std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); + int retval = -1; - char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; - strcpy(buf, strToWrite.c_str()); -*/ - //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used - return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); + if(doBinary) + { + retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); + } + else + { + std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]); + char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; + strcpy(buf, strToWrite); + + retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT); + } + + return retval; } WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config) @@ -58,6 +69,12 @@ WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<st m_engine = engine; + if(config.find("binaryProtocol") != config.end()) + { + doBinary = config["binaryProtocol"] == "true"; + WebSocketSink::doBinary = doBinary; + } + //Create a listening socket on port 23000 on localhost. @@ -177,7 +194,12 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper replyvar["data"]= data; replyvar["transactionid"]=id.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(socket, replystr.data(), replystr.length()); @@ -217,7 +239,12 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert replyvar["data"]=list; replyvar["transactionid"]=id.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(socket, replystr.data(), replystr.length()); @@ -246,7 +273,12 @@ void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Prop reply["data"]=property.c_str(); reply["transactionid"]= uuid.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(reply).toJson(); lwsWrite(socket, replystr.data(), replystr.length()); } @@ -272,7 +304,12 @@ void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Proper replyvar["data"]= data; replyvar["transactionid"]=uuid.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(socket, replystr.data(), replystr.length()); @@ -300,7 +337,12 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper reply["data"] = property.c_str(); reply["transactionid"] = uuid.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(reply).toJson(); lwsWrite(socket, replystr.data(), replystr.length()); @@ -429,7 +471,11 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb QByteArray d((char*)in,len); - QJsonDocument doc = QJsonDocument::fromBinaryData(d); + QJsonDocument doc; + if(doBinary) + doc = QJsonDocument::fromBinaryData(d); + else + doc = QJsonDocument::fromJson(d); if(doc.isNull()) { @@ -520,7 +566,12 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb reply["transactionid"] = id.c_str(); reply["data"] = list; - QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(reply).toJson(); lwsWrite(wsi, replystr.data(), replystr.length()); } diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp index 31562015..21de55a4 100644 --- a/plugins/websocket/websocketsource.cpp +++ b/plugins/websocket/websocketsource.cpp @@ -42,28 +42,28 @@ double totalTime=0; double numUpdates=0; double averageLatency=0; +static bool doBinary = false; + static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) { - //std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]); - - //char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; - //strcpy(buf, strToWrite); - - return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); -} + int retval = -1; -static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite) -{ - std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); + if(doBinary) + { + retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); + } + else + { + std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]); + char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; + strcpy(buf, strToWrite); - char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; - strcpy(buf, strToWrite.c_str()); + retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT); + } - //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used - return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT); + return retval; } - static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len); static struct libwebsocket_protocols protocols[] = { { @@ -100,7 +100,12 @@ void WebSocketSource::checkSubscriptions() reply["data"] = prop.c_str(); reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66"; - QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(reply).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(reply).toJson(); lwsWrite(clientsocket, replystr.data(), replystr.length()); } @@ -111,6 +116,12 @@ void WebSocketSource::setConfiguration(map<string, string> config) std::string ip; int port; configuration = config; + + if(config.find("binaryProtocol") != config.end()) + { + doBinary = config["binaryProtocol"] == "true"; + } + for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++) { DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n"; @@ -253,16 +264,36 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket toSend["name"] = "getSupportedEventTypes"; toSend["transactionid"] = amb::createUuid().c_str(); - QByteArray data = QJsonDocument::fromVariant(toSend).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(toSend).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(toSend).toJson(); - lwsWrite(wsi,data.data(),data.length()); + lwsWrite(wsi,replystr.data(),replystr.length()); break; } case LWS_CALLBACK_CLIENT_RECEIVE: { QByteArray d((char*)in,len); - QJsonDocument doc = QJsonDocument::fromBinaryData(d); + QJsonDocument doc; + + if(doBinary) + doc = QJsonDocument::fromBinaryData(d); + else + { + doc = QJsonDocument::fromJson(d); + DebugOut(7)<<d.data()<<endl; + } + + if(doc.isNull()) + { + DebugOut(DebugOut::Error)<<"Invalid message"<<endl; + break; + } + QVariantMap call = doc.toVariant().toMap(); string type = call["type"].toString().toStdString(); @@ -516,7 +547,12 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) replyvar["data"] = data; replyvar["transactionid"] = uuid.c_str(); - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(clientsocket, replystr.data(), replystr.length()); } @@ -550,7 +586,12 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) replyvar["data"] = properties; - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(clientsocket, replystr.data(), replystr.length()); } @@ -571,7 +612,12 @@ AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest reque replyvar["data"] = data; replyvar["transactionid"] = amb::createUuid().c_str(); - QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + QByteArray replystr; + + if(doBinary) + replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + else + replystr = QJsonDocument::fromVariant(replyvar).toJson(); lwsWrite(clientsocket, replystr.data(), replystr.length()); |