summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevron Rees <tripzero.kev@gmail.com>2015-01-06 00:24:35 -0800
committerKevron Rees <tripzero.kev@gmail.com>2015-01-06 01:02:48 -0800
commit4d43929911190385dc30dce1c3929c70f8ffbb96 (patch)
tree5ddd3757bf5586a6befc9aef70a129cf48006bc1
parentee74d7ca9a131a159f19dd47352f8e216b9e8215 (diff)
downloadautomotive-message-broker-4d43929911190385dc30dce1c3929c70f8ffbb96.tar.gz
[websocket] fixed getRanged requests
-rw-r--r--CMakeLists.txt5
-rw-r--r--lib/asyncqueue.hpp5
-rw-r--r--plugins/database/databasesink.cpp7
-rw-r--r--plugins/database/databasesink.h4
-rw-r--r--plugins/exampleplugin.cpp2
-rw-r--r--plugins/examplesink.cpp3
-rw-r--r--plugins/opencvlux/CMakeLists.txt3
-rw-r--r--plugins/websocket/CMakeLists.txt3
-rw-r--r--plugins/websocket/protocol5
-rw-r--r--plugins/websocket/websocketsinkmanager.cpp31
-rw-r--r--plugins/websocket/websocketsource.cpp8
11 files changed, 42 insertions, 34 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e95bab52..43d79db7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,11 +47,6 @@ set(XWALK_EXTENSION_PATH "/automotive-message-broker/xwalk" CACHE PATH "director
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fpie -pie -std=c++1y")
-if(opencvlux_plugin)
- message(STATUS "OpenCV Lux plugin enabled")
-
-endif(opencvlux_plugin)
-
include (CMakeForceCompiler)
if (enable_icecc)
diff --git a/lib/asyncqueue.hpp b/lib/asyncqueue.hpp
index ed0c69b4..6557958b 100644
--- a/lib/asyncqueue.hpp
+++ b/lib/asyncqueue.hpp
@@ -56,7 +56,7 @@ public:
if(mBlocking)
{
- while(!mQueue.size())
+ if(!mQueue.size())
{
cond.wait(lock);
}
@@ -78,13 +78,12 @@ public:
{
{
std::lock_guard<std::mutex> lock(mutex);
-
mQueue.insert(item);
}
if(mBlocking)
{
- cond.notify_one();
+ cond.notify_all();
}
}
diff --git a/plugins/database/databasesink.cpp b/plugins/database/databasesink.cpp
index 4a9c8b37..8881ae53 100644
--- a/plugins/database/databasesink.cpp
+++ b/plugins/database/databasesink.cpp
@@ -47,7 +47,7 @@ static void * cbFunc(Shared* shared)
NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj.zone));
NameValuePair<string> four("time", boost::lexical_cast<string>(obj.time));
NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj.sequence));
- NameValuePair<string> six("tripId", boost::lexical_cast<string>(shared->tripId));
+ NameValuePair<string> six("tripId", shared->tripId);
dict.push_back(one);
dict.push_back(two);
@@ -76,9 +76,8 @@ static void * cbFunc(Shared* shared)
/// final flush of whatever is still in the queue:
shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
- for(int i=0; i< insertList.size(); i++)
+ for(auto d : insertList)
{
- DictionaryList<string> d = insertList[i];
shared->db->insert(d);
}
shared->db->exec("END TRANSACTION");
@@ -380,6 +379,8 @@ void DatabaseSink::propertyChanged(AbstractPropertyType *value)
{
VehicleProperty::Property property = value->name;
+ DebugOut() << "Received property change for " << property << endl;
+
if(!shared)
return;
diff --git a/plugins/database/databasesink.h b/plugins/database/databasesink.h
index 8976f416..459ec928 100644
--- a/plugins/database/databasesink.h
+++ b/plugins/database/databasesink.h
@@ -52,7 +52,7 @@ public:
bool quit;
- bool operator ==(const DBObject & other) const
+ bool operator == (const DBObject & other) const
{
return (key == other.key && source == other.source && zone == other.zone &&
value == other.value && sequence == other.sequence && time == other.time);
@@ -88,7 +88,7 @@ namespace std {
{
size_t operator()(const DBObject & x) const
{
- return x.key.length();
+ return x.key.length() * x.value.length() + x.time;
}
};
}
diff --git a/plugins/exampleplugin.cpp b/plugins/exampleplugin.cpp
index 32901731..4b643c67 100644
--- a/plugins/exampleplugin.cpp
+++ b/plugins/exampleplugin.cpp
@@ -365,8 +365,10 @@ void ExampleSourcePlugin::randomizeProperties()
DebugOut()<<"setting enginespeed to: "<<engineSpeed<<endl;
vel.setValue(velocity);
+ vel.sequence++;
vel.priority = AbstractPropertyType::High;
es.setValue(engineSpeed);
+ es.sequence++;
es.priority = AbstractPropertyType::Low;
ac.setValue(accelerationX);
swa.setValue(steeringWheelAngle);
diff --git a/plugins/examplesink.cpp b/plugins/examplesink.cpp
index 85f3d39f..bd0afef4 100644
--- a/plugins/examplesink.cpp
+++ b/plugins/examplesink.cpp
@@ -183,7 +183,8 @@ void ExampleSink::supportedChanged(const PropertyList & supportedProperties)
for(auto itr = values.begin(); itr != values.end(); itr++)
{
auto val = *itr;
- DebugOut(1)<<"Value from past: ("<<val->name<<"): "<<val->toString()<<" time: "<<val->timestamp<<endl;
+ DebugOut(1) <<"Value from past: (" << val->name << "): " << val->toString()
+ <<" time: " << val->timestamp << " sequence: " << val->sequence << endl;
}
delete reply;
diff --git a/plugins/opencvlux/CMakeLists.txt b/plugins/opencvlux/CMakeLists.txt
index de3645c3..da0b8eb2 100644
--- a/plugins/opencvlux/CMakeLists.txt
+++ b/plugins/opencvlux/CMakeLists.txt
@@ -32,14 +32,11 @@ endif(cuda)
find_package(Qt5Core REQUIRED)
if(Qt5Core_FOUND)
- message(STATUS "using Qt5")
-
set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
add_definitions(${Qt5Core_DEFINITIONS})
add_definitions(-DQT_NO_KEYWORDS)
-
endif(Qt5Core_FOUND)
set(CMAKE_AUTOMOC ON)
diff --git a/plugins/websocket/CMakeLists.txt b/plugins/websocket/CMakeLists.txt
index 12ebe299..a8a0fafa 100644
--- a/plugins/websocket/CMakeLists.txt
+++ b/plugins/websocket/CMakeLists.txt
@@ -8,8 +8,6 @@ include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
find_package(Qt5Core REQUIRED)
if(Qt5Core_FOUND)
- message(STATUS "using Qt5")
-
set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
@@ -40,6 +38,7 @@ target_link_libraries(websocketsource amb ${websockets_LIBRARIES} -L${CMAKE_CURR
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/vehicle.js ${CMAKE_CURRENT_SOURCE_DIR}/test/vehicle.js)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/test.js ${CMAKE_CURRENT_SOURCE_DIR}/test/test.js)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/events.js ${CMAKE_CURRENT_SOURCE_DIR}/test/events.js)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocol ${CMAKE_CURRENT_SOURCE_DIR}/protocol)
install(TARGETS websocketsource LIBRARY DESTINATION ${PLUGIN_INSTALL_PATH})
diff --git a/plugins/websocket/protocol b/plugins/websocket/protocol
index 2d723a0f..0a4eec93 100644
--- a/plugins/websocket/protocol
+++ b/plugins/websocket/protocol
@@ -28,7 +28,10 @@ Unsubscribe to data:
{"type":"method", "name":"unsubscribe", "property":"EngineSpeed", "transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
Get History request:
-{"type":"method","name":"getRange","data": {"property":"VehicleSpeed", "timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}
+{"type" : "method", "name" : "getRange", "data" : ["VehicleSpeed"], "timeBegin" : 1368825008.35948, "timeEnd" : 1368825018.35948, "sequenceBegin" : -1, "sequenceEnd" : -1, "transactionid" : "b07589ba-417c-4604-80c6-01c0dcbd524d"}
+
+Get History reply:
+{"data" : [{"name" : "EngineSpeed", "sequence":-1, "timestamp" : 143706.443, "value" : "13789"}], "name" : "getRanged", "transactionid" : "fe4a803e-d587-4fa0-bd5a-9cf689097d88", "type" : "methodReply"}
Set property request:
{ "type" : "method", "name" : "set", "data" : { "property" : "MachineGunTurretStatus", "value" : "true", "zone" : 0 }, "transactionid" : "4123123123" }
diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp
index b75db03a..b82c6b66 100644
--- a/plugins/websocket/websocketsinkmanager.cpp
+++ b/plugins/websocket/websocketsinkmanager.cpp
@@ -191,19 +191,21 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert
rangedRequest.timeEnd = end;
rangedRequest.sequenceBegin = seqstart;
rangedRequest.sequenceEnd = seqend;
+ rangedRequest.properties = properties;
- rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+ rangedRequest.completed = [socket, id](AsyncRangePropertyReply* reply)
{
QVariantMap replyvar;
QVariantList list;
std::list<AbstractPropertyType*> values = reply->values;
- for(auto itr = values.begin(); itr != values.end(); itr++)
+ for(auto value : values)
{
QVariantMap obj;
- obj["value"]= (*itr)->toString().c_str();
- obj["timestamp"] = (*itr)->timestamp;
- obj["sequence"] = (*itr)->sequence;
+ obj["name"] = value->name.c_str();
+ obj["value"] = value->toString().c_str();
+ obj["timestamp"] = value->timestamp;
+ obj["sequence"] = value->sequence;
list.append(obj);
}
@@ -452,16 +454,21 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
{
if(name == "getRanged")
{
- QVariantMap data = call["data"].toMap();
+ QVariant dataVariant = call["data"];
+
+ QVariantList data = dataVariant.toList();
PropertyList propertyList;
- propertyList.push_back(data["property"].toString().toStdString());
+ Q_FOREACH(QVariant v, data)
+ {
+ propertyList.push_back(v.toString().toStdString());
+ }
- double timeBegin = data["timeBegin"].toDouble();
- double timeEnd = data["timeEnd"].toDouble();
- double sequenceBegin = data["sequenceBegin"].toInt();
- double sequenceEnd = data["sequenceEnd"].toInt();
+ double timeBegin = call["timeBegin"].toDouble();
+ double timeEnd = call["timeEnd"].toDouble();
+ int sequenceBegin = call["sequenceBegin"].toInt();
+ int sequenceEnd = call["sequenceEnd"].toInt();
if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
{
@@ -473,7 +480,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
}
else
{
- sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+ sinkManager->addSingleShotRangedSink(wsi, propertyList, timeBegin, timeEnd, sequenceBegin, sequenceEnd, id);
}
}
else if (name == "get")
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp
index eb0211a5..97a6496f 100644
--- a/plugins/websocket/websocketsource.cpp
+++ b/plugins/websocket/websocketsource.cpp
@@ -377,7 +377,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
{
doc = QJsonDocument::fromJson(d);
DebugOut(7)<<d.data()<<endl;
- }
+ }
if(doc.isNull())
{
@@ -490,6 +490,11 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
int sequence = obj["sequence"].toInt();
AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
+ if(!type)
+ {
+ DebugOut() << "TODO: support custom types here: " << endl;
+ continue;
+ }
type->timestamp = timestamp;
type->sequence = sequence;
@@ -698,7 +703,6 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
replyvar["sequenceBegin"] = reply->sequenceBegin;
replyvar["sequenceEnd"] = reply->sequenceEnd;
-
QStringList properties;
for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)