summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevron Rees <kevron.m.rees@intel.com>2014-01-02 16:18:47 -0800
committerKevron Rees <kevron.m.rees@intel.com>2014-01-02 16:18:47 -0800
commitff5290afca78664da71444582b9d8bff9965f185 (patch)
tree109da86642e5bee132adbb7c2becc70dce196d82
parent2884fe08bf62b414d4c74727fc282e2a82f94fb9 (diff)
downloadautomotive-message-broker-binarywebsockets.tar.gz
add option to use json or binary protocolbinarywebsockets
-rw-r--r--TODO3
-rw-r--r--examples/configwebsocketsink3
-rw-r--r--examples/configwebsocketsource3
-rw-r--r--plugins/opencvlux/CMakeLists.txt3
-rw-r--r--plugins/websocket/test/test.js25
-rw-r--r--plugins/websocket/websocketsink.cpp34
-rw-r--r--plugins/websocket/websocketsink.h3
-rw-r--r--plugins/websocket/websocketsinkmanager.cpp77
-rw-r--r--plugins/websocket/websocketsource.cpp90
9 files changed, 180 insertions, 61 deletions
diff --git a/TODO b/TODO
index f3b25d60..95f7c7f1 100644
--- a/TODO
+++ b/TODO
@@ -1,3 +1,6 @@
+For 0.12
+
+- Rewrite websocket test html
For 0.11
diff --git a/examples/configwebsocketsink b/examples/configwebsocketsink
index 9f55d203..2f95e479 100644
--- a/examples/configwebsocketsink
+++ b/examples/configwebsocketsink
@@ -12,7 +12,8 @@
"path" : "/usr/lib/automotive-message-broker/websocketsinkplugin.so",
"interface" : "lo",
"ssl" : "false",
- "port" : "23000"
+ "port" : "23000",
+ "binaryProtocol" : "true"
}
]
}
diff --git a/examples/configwebsocketsource b/examples/configwebsocketsource
index 5ead5d17..25a80886 100644
--- a/examples/configwebsocketsource
+++ b/examples/configwebsocketsource
@@ -5,7 +5,8 @@
"path" : "/usr/lib/automotive-message-broker/websocketsourceplugin.so",
"port" : "23000",
"ssl" : "false",
- "ip" : "127.0.0.1"
+ "ip" : "127.0.0.1",
+ "binaryProtocol" : "true"
}
],
"sinks": [
diff --git a/plugins/opencvlux/CMakeLists.txt b/plugins/opencvlux/CMakeLists.txt
index 64a8d656..1d93e6c6 100644
--- a/plugins/opencvlux/CMakeLists.txt
+++ b/plugins/opencvlux/CMakeLists.txt
@@ -8,9 +8,6 @@ else(OpenCV_LIBS)
message(FATAL_ERROR "opencv missing. please install opencv")
endif(OpenCV_LIBS)
-find_package(Boost COMPONENTS thread REQUIRED)
-find_package(Threads REQUIRED)
-
#find opencv ocl headers:
find_path(ocl ocl.hpp PATH_SUFFIXES opencv/ocl opencv2/ocl DOC "opencv ocl headers")
diff --git a/plugins/websocket/test/test.js b/plugins/websocket/test/test.js
index 90106f21..9d54cc66 100644
--- a/plugins/websocket/test/test.js
+++ b/plugins/websocket/test/test.js
@@ -172,9 +172,9 @@ function eventListener(e) {
}
}
-function subscribe(eventlist) {
- var zoneList = getZone(eventlist);
- window.vehicle.subscribe(eventlist, zoneList,
+function subscribe(event) {
+ var zoneList = getZone(event);
+ window.vehicle.subscribe(event, zoneList,
function(data) {
PRINT.pass("Subscribe success for: " + data);
for (var i = 0; i < data.length; i++) {
@@ -191,14 +191,13 @@ function subscribe(eventlist) {
);
}
-function unsubscribe(eventlist, zoneList) {
- zoneList = getZone(eventlist);
+function unsubscribe(event, zoneList) {
+ zoneList = getZone(event);
/* kill the handers first, so even if the service fails to acknowledge */
/* we've stopped listening */
- for (var i = 0; i < eventlist.length; i++) {
- document.removeEventListener(eventlist[i], eventListener, false);
- }
- window.vehicle.unsubscribe(eventlist, zoneList,
+ document.removeEventListener(event, eventListener, false);
+
+ window.vehicle.unsubscribe(event, zoneList,
function(data) {
PRINT.pass("Unsubscribe success for: " + data);
for (var i = 0; i < data.length; i++) {
@@ -254,12 +253,12 @@ function select(elem) {
return;
var idx = selected.indexOf(name);
- if (elem.className == "propinfo") {
+ if (elem.className === "propinfo") {
if (idx < 0) {
selected[selected.length] = name;
}
elem.className = "propinfo select";
- } else if (elem.className == "propinfo select") {
+ } else if (elem.className === "propinfo select") {
if (idx >= 0) {
selected.splice(idx, 1);
}
@@ -279,8 +278,8 @@ function start(msg) {
var part = ['<div class="proptest"><div class="propinfo" onclick=select(this)>',
'</div><div class="buttons"><div class="testbutton types" onclick=getTypes("',
'")></div><div id="',
- '_subscribe" class="testbutton subscribe" onclick=subscribe(["',
- '"])></div><div id="',
+ '_subscribe" class="testbutton subscribe" onclick=subscribe("',
+ '")></div><div id="',
'_unsubscribe" class="testbutton unsubscribe" onclick=unsubscribe(["',
'"])></div><div class="testbutton get" onclick=getValue(["',
'"])></div><div class="testbutton set" onclick=setValue(["',
diff --git a/plugins/websocket/websocketsink.cpp b/plugins/websocket/websocketsink.cpp
index 4a7ff288..37f88e39 100644
--- a/plugins/websocket/websocketsink.cpp
+++ b/plugins/websocket/websocketsink.cpp
@@ -33,15 +33,27 @@
#include <QJsonDocument>
#include <QVariantMap>
+bool WebSocketSink::doBinary = false;
+
+// libwebsocket_write helper function
static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
{
- /*std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
-
- char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(buf, strToWrite.c_str());
-*/
- //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
- return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+ int retval = -1;
+
+ if(WebSocketSink::doBinary)
+ {
+ retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+ }
+ else
+ {
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite);
+
+ retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
+ }
+
+ return retval;
}
@@ -75,7 +87,11 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value)
reply["name"]=property.c_str();
reply["transactionid"]=m_uuid.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ QByteArray replystr;
+ if(WebSocketSink::doBinary)
+ replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(reply).toJson();
lwsWrite(m_wsi, replystr.data(),replystr.length());
}
@@ -91,3 +107,5 @@ PropertyList WebSocketSink::subscriptions()
return PropertyList();
}
+/// 6% and 4% cpu with json
+/// 5% and 4% with binary
diff --git a/plugins/websocket/websocketsink.h b/plugins/websocket/websocketsink.h
index 94d4b695..68d5cf1a 100644
--- a/plugins/websocket/websocketsink.h
+++ b/plugins/websocket/websocketsink.h
@@ -34,6 +34,9 @@ public:
void supportedChanged(PropertyList supportedProperties);
PropertyList subscriptions();
libwebsocket *socket() { return m_wsi; }
+
+ static bool doBinary;
+
private:
char *webSocketBuffer;
string m_amdbproperty;
diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp
index 1fbe4b6b..1cb7bacb 100644
--- a/plugins/websocket/websocketsinkmanager.cpp
+++ b/plugins/websocket/websocketsinkmanager.cpp
@@ -41,16 +41,27 @@ WebSocketSinkManager *sinkManager;
static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
+static bool doBinary = false;
+
// libwebsocket_write helper function
static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
{
- /*std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+ int retval = -1;
- char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(buf, strToWrite.c_str());
-*/
- //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
- return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+ if(doBinary)
+ {
+ retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+ }
+ else
+ {
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite);
+
+ retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
+ }
+
+ return retval;
}
WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
@@ -58,6 +69,12 @@ WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<st
m_engine = engine;
+ if(config.find("binaryProtocol") != config.end())
+ {
+ doBinary = config["binaryProtocol"] == "true";
+ WebSocketSink::doBinary = doBinary;
+ }
+
//Create a listening socket on port 23000 on localhost.
@@ -177,7 +194,12 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
replyvar["data"]= data;
replyvar["transactionid"]=id.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(socket, replystr.data(), replystr.length());
@@ -217,7 +239,12 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert
replyvar["data"]=list;
replyvar["transactionid"]=id.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(socket, replystr.data(), replystr.length());
@@ -246,7 +273,12 @@ void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Prop
reply["data"]=property.c_str();
reply["transactionid"]= uuid.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(reply).toJson();
lwsWrite(socket, replystr.data(), replystr.length());
}
@@ -272,7 +304,12 @@ void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Proper
replyvar["data"]= data;
replyvar["transactionid"]=uuid.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(socket, replystr.data(), replystr.length());
@@ -300,7 +337,12 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper
reply["data"] = property.c_str();
reply["transactionid"] = uuid.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(reply).toJson();
lwsWrite(socket, replystr.data(), replystr.length());
@@ -429,7 +471,11 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
QByteArray d((char*)in,len);
- QJsonDocument doc = QJsonDocument::fromBinaryData(d);
+ QJsonDocument doc;
+ if(doBinary)
+ doc = QJsonDocument::fromBinaryData(d);
+ else
+ doc = QJsonDocument::fromJson(d);
if(doc.isNull())
{
@@ -520,7 +566,12 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
reply["transactionid"] = id.c_str();
reply["data"] = list;
- QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(reply).toJson();
lwsWrite(wsi, replystr.data(), replystr.length());
}
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp
index 31562015..21de55a4 100644
--- a/plugins/websocket/websocketsource.cpp
+++ b/plugins/websocket/websocketsource.cpp
@@ -42,28 +42,28 @@ double totalTime=0;
double numUpdates=0;
double averageLatency=0;
+static bool doBinary = false;
+
static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
{
- //std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
-
- //char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- //strcpy(buf, strToWrite);
-
- return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
-}
+ int retval = -1;
-static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
-{
- std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+ if(doBinary)
+ {
+ retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+ }
+ else
+ {
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite);
- char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(buf, strToWrite.c_str());
+ retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
+ }
- //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
- return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+ return retval;
}
-
static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
static struct libwebsocket_protocols protocols[] = {
{
@@ -100,7 +100,12 @@ void WebSocketSource::checkSubscriptions()
reply["data"] = prop.c_str();
reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
- QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(reply).toJson();
lwsWrite(clientsocket, replystr.data(), replystr.length());
}
@@ -111,6 +116,12 @@ void WebSocketSource::setConfiguration(map<string, string> config)
std::string ip;
int port;
configuration = config;
+
+ if(config.find("binaryProtocol") != config.end())
+ {
+ doBinary = config["binaryProtocol"] == "true";
+ }
+
for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
{
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
@@ -253,16 +264,36 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
toSend["name"] = "getSupportedEventTypes";
toSend["transactionid"] = amb::createUuid().c_str();
- QByteArray data = QJsonDocument::fromVariant(toSend).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(toSend).toJson();
- lwsWrite(wsi,data.data(),data.length());
+ lwsWrite(wsi,replystr.data(),replystr.length());
break;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
{
QByteArray d((char*)in,len);
- QJsonDocument doc = QJsonDocument::fromBinaryData(d);
+ QJsonDocument doc;
+
+ if(doBinary)
+ doc = QJsonDocument::fromBinaryData(d);
+ else
+ {
+ doc = QJsonDocument::fromJson(d);
+ DebugOut(7)<<d.data()<<endl;
+ }
+
+ if(doc.isNull())
+ {
+ DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
+ break;
+ }
+
QVariantMap call = doc.toVariant().toMap();
string type = call["type"].toString().toStdString();
@@ -516,7 +547,12 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
replyvar["data"] = data;
replyvar["transactionid"] = uuid.c_str();
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(clientsocket, replystr.data(), replystr.length());
}
@@ -550,7 +586,12 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
replyvar["data"] = properties;
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(clientsocket, replystr.data(), replystr.length());
}
@@ -571,7 +612,12 @@ AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest reque
replyvar["data"] = data;
replyvar["transactionid"] = amb::createUuid().c_str();
- QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ QByteArray replystr;
+
+ if(doBinary)
+ replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+ else
+ replystr = QJsonDocument::fromVariant(replyvar).toJson();
lwsWrite(clientsocket, replystr.data(), replystr.length());