diff options
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | lib/abstractroutingengine.h | 21 | ||||
-rw-r--r-- | lib/vehicleproperty.h | 1 | ||||
-rw-r--r-- | plugins/obd2plugin/obd2source.cpp | 200 | ||||
-rw-r--r-- | plugins/obd2plugin/obd2source.h | 1 | ||||
-rw-r--r-- | plugins/websocketsink/test/api.js | 30 | ||||
-rw-r--r-- | plugins/websocketsink/websocketsinkmanager.cpp | 134 | ||||
-rw-r--r-- | plugins/websocketsink/websocketsinkmanager.h | 1 | ||||
-rw-r--r-- | plugins/websocketsourceplugin/websocketsource.cpp | 119 | ||||
-rw-r--r-- | plugins/websocketsourceplugin/websocketsource.h | 1 |
10 files changed, 456 insertions, 54 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index e0d30983..0d4faae8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_BUILD_TYPE, Debug) include(FindPkgConfig) set(PROJECT_NAME "automotive-message-broker") -set(PROJECT_VERSION "0.4.0") +set(PROJECT_VERSION "0.5.0") add_definitions(-DPROJECT_VERSION="${PROJECT_VERSION}") add_definitions(-DPROJECT_NAME="${PROJECT_NAME}") diff --git a/lib/abstractroutingengine.h b/lib/abstractroutingengine.h index f39503e1..cca57b15 100644 --- a/lib/abstractroutingengine.h +++ b/lib/abstractroutingengine.h @@ -40,8 +40,13 @@ typedef std::function<void (AsyncRangePropertyReply*)> GetRangedPropertyComplete class PropertyValueTime { public: + ~PropertyValueTime() + { + delete value; + } + AbstractPropertyType* value; - time_t timestamp; + double timestamp; }; class AsyncPropertyRequest @@ -122,8 +127,8 @@ public: VehicleProperty::Property property; GetRangedPropertyCompletedSignal completed; - time_t begin; - time_t end; + double begin; + double end; }; class AsyncRangePropertyReply: public AsyncRangePropertyRequest @@ -135,6 +140,16 @@ public: } + ~AsyncRangePropertyReply() + { + for(auto itr = values.begin(); itr != values.end(); itr++) + { + delete (*itr); + } + + values.clear(); + } + std::list<PropertyValueTime*> values; bool success; }; diff --git a/lib/vehicleproperty.h b/lib/vehicleproperty.h index 5b4348b6..faeebf59 100644 --- a/lib/vehicleproperty.h +++ b/lib/vehicleproperty.h @@ -26,6 +26,7 @@ #include <set> #include <sstream> #include <map> +#include <functional> #include <abstractpropertytype.h> diff --git a/plugins/obd2plugin/obd2source.cpp b/plugins/obd2plugin/obd2source.cpp index 3fe54b03..f2b4446f 100644 --- a/plugins/obd2plugin/obd2source.cpp +++ b/plugins/obd2plugin/obd2source.cpp @@ -121,8 +121,13 @@ void threadLoop(gpointer data) ObdPid::ByteArray replyVector; std::string reply; std::string port; +<<<<<<< HEAD + int baud; + bool connected = false; +======= std::string baud; bool connected=false; +>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece while (true) { //gpointer query = g_async_queue_pop(privCommandQueue); @@ -154,6 +159,50 @@ void threadLoop(gpointer data) DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Command:" << req->req << endl; if (req->req == "connect") { +<<<<<<< HEAD + //printf("First: %s\nSecond: %s\n",req->arg.substr(0,req->arg.find(':')).c_str(),req->arg.substr(req->arg.find(':')+1).c_str()); + port = req->arg.substr(0,req->arg.find(':')); + baud = boost::lexical_cast<int>(req->arg.substr(req->arg.find(':')+1)); + obd->openPort(port.c_str(),baud); + + obd->sendObdRequestString("ATZ\r",4,&replyVector,500,3); + for (unsigned int i=0;i<replyVector.size();i++) + { + reply += replyVector[i]; + } + if (reply.find("ELM") == -1) + { + //No reply found + //printf("Error!\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM\n"; + } + else + { + //printf("Reply to reset: %s\n",reply.c_str()); + } + if (!sendElmCommand(obd,"ATSP0")) + { + //printf("Error sending echo\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error setting auto protocol"<<endl; + } + if (!sendElmCommand(obd,"ATE0")) + { + //printf("Error sending echo\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off echo"<<endl; + } + if (!sendElmCommand(obd,"ATH0")) + { + //printf("Error sending headers off\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off headers"<<endl; + } + if (!sendElmCommand(obd,"ATL0")) + { + //printf("Error turning linefeeds off\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off linefeeds"<<endl; + } + connected = true; + } +======= connect(obd,req->arglist[0],req->arglist[1]); connected = true; } @@ -162,6 +211,7 @@ void threadLoop(gpointer data) port = req->arglist[0]; baud = req->arglist[1]; } +>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece else if (req->req == "disconnect") { obd->closePort(); @@ -206,6 +256,63 @@ void threadLoop(gpointer data) { repeatReqList.push_back(*i); } +<<<<<<< HEAD + if (repeatReqList.size() == 0) + { + //Nothing in the queue, we should disconnect and sit idle. + if (connected) + { + ObdRequest *requ = new ObdRequest(); + requ->req = "disconnect"; + g_async_queue_push(privCommandQueue,requ); + } + } + else + { + if (!connected) + { + //Things in the request queue, but we aren't connected. Queue up a connect. + ObdRequest *requ = new ObdRequest(); + requ->req = "connect"; + requ->arg = port + ":" + baud; + g_async_queue_push(privCommandQueue,requ); + } + } + if (connected) + { + for (std::list<std::string>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++) + { + //printf("Req: %s\n",(*i).c_str()); + if ((*i) == "ATRV\r") + { + //printf("Requesting voltage...\n"); + if (!obd->sendObdRequestString((*i).c_str(),(*i).length(),&replyVector)) + { + //printf("Unable to request voltage!!!\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unable to request voltage!\n"; + continue; + } + std::string replystring = ""; + for (int j=0;j<replyVector.size();j++) + { + replystring += replyVector[j]; + } + //printf("Voltage reply: %s\n",replystring.c_str()); + replystring.substr(0,replystring.find("V")); + ObdReply *rep = new ObdReply(); + rep->req = "ATRV\r"; + rep->reply = replystring; + g_async_queue_push(privResponseQueue,rep); + } + if (!obd->sendObdRequest((*i).c_str(),(*i).length(),&replyVector)) + { + //printf("Error sending obd2 request\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error sending OBD2 request\n"; + continue; + } + //printf("Reply: %i %i\n",replyVector[0],replyVector[1]); + if (replyVector[0] == 0x41) +======= for (std::list<ObdPid*>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++) { if (!obd->sendObdRequestString((*i)->pid.c_str(),(*i)->pid.length(),&replyVector)) @@ -264,18 +371,89 @@ void threadLoop(gpointer data) //VIN number reply string vinstring; for (int j=0;j<replyVector.size();j++) +>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece { - if(replyVector[j] == 0x49 && replyVector[j+1] == 0x02) + if (replyVector[1] == 0x0C) { - //We're at a reply header - j+=3; + double rpm = ((replyVector[2] << 8) + replyVector[3]) / 4.0; + ObdReply *rep = new ObdReply(); + rep->req = "0C"; + rep->property = VehicleProperty::EngineSpeed; + rep->reply = boost::lexical_cast<string>(rpm); + g_async_queue_push(privResponseQueue,rep); + //printf("RPM: %f\n",rpm); } - if (replyVector[j] != 0x00) + else if (replyVector[1] == 0x0D) { - vinstring += (char)replyVector[j]; - //printf("VIN: %i %c\n",replyVector[j],replyVector[j]); + int mph = replyVector[2]; + ObdReply *rep = new ObdReply(); + rep->req = "0D"; + rep->property = VehicleProperty::VehicleSpeed; + rep->reply = boost::lexical_cast<string>(mph); + g_async_queue_push(privResponseQueue,rep); } + else if (replyVector[1] == 0x05) + { + int temp = replyVector[2] - 40; + ObdReply *rep = new ObdReply(); + rep->req = "05"; + rep->property = VehicleProperty::EngineCoolantTemperature; + rep->reply = boost::lexical_cast<string>(temp); + g_async_queue_push(privResponseQueue,rep); + } + else if (replyVector[1] == 0x10) + { + double maf = ((replyVector[2] << 8) + replyVector[3]) / 100.0; + ObdReply *rep = new ObdReply(); + rep->req = "10"; + rep->property = VehicleProperty::MassAirFlow; + rep->reply = boost::lexical_cast<string>(maf); + g_async_queue_push(privResponseQueue,rep); + } + else + { + //printf("Unknown response type: %i\n",replyVector[1]); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unknown response type" << replyVector[1] << endl; + } + } +<<<<<<< HEAD + else if (replyVector[0] == 0x49) + { + /* + 49 02 01 00 00 00 31 + 49 02 02 47 31 4A 43 + 49 02 03 35 34 34 34 + 49 02 04 52 37 32 35 + 49 02 05 32 33 36 37 + */ + //VIN number reply + string vinstring; + for (int j=0;j<replyVector.size();j++) + { + if(replyVector[j] == 0x49 && replyVector[j+1] == 0x02) + { + //We're at a reply header + j+=3; + } + if (replyVector[j] != 0x00) + { + vinstring += (char)replyVector[j]; + //printf("VIN: %i %c\n",replyVector[j],replyVector[j]); + } + } + ObdReply *rep = new ObdReply(); + rep->req = "0902"; + rep->reply = vinstring; + g_async_queue_push(privResponseQueue,rep); + //printf("VIN Number: %i %s\n",replyVector.size(),vinstring.c_str()); + } + + //DebugOut()<<"Reply: "<<replyVector[2]<<" "<<replyVector[3]<<endl; + } + } + else +======= /*ObdReply *rep = new ObdReply(); rep->req = "0902"; rep->reply = vinstring; @@ -284,6 +462,7 @@ void threadLoop(gpointer data) //DebugOut()<<"Reply: "<<replyVector[2]<<" "<<replyVector[3]<<endl; } if (!connected) +>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece { usleep(10000); } @@ -651,8 +830,17 @@ void OBD2Source::unsubscribeToPropertyChanges(VehicleProperty::Property property return; } +<<<<<<< HEAD + Obd2Amb obd2amb; + ObdRequest *requ = new ObdRequest(); + requ->property = property; + requ->req = obd2amb.propertyPidMap[property]; + g_async_queue_push(subscriptionRemoveQueue,requ); + +======= ObdPid *pid = obd2AmbInstance->createPidforProperty(property); g_async_queue_push(subscriptionRemoveQueue,pid); +>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece } diff --git a/plugins/obd2plugin/obd2source.h b/plugins/obd2plugin/obd2source.h index a37b694a..6c628706 100644 --- a/plugins/obd2plugin/obd2source.h +++ b/plugins/obd2plugin/obd2source.h @@ -142,6 +142,7 @@ public: std::string m_port; std::string m_baud; map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap; + list<VehicleProperty::Property> propertySubscriptionList; void updateProperty(VehicleProperty::Property property,AbstractPropertyType *value); obdLib * obd; diff --git a/plugins/websocketsink/test/api.js b/plugins/websocketsink/test/api.js index 3a6f3359..5c0d8362 100644 --- a/plugins/websocketsink/test/api.js +++ b/plugins/websocketsink/test/api.js @@ -44,6 +44,19 @@ * form of data[n].name/data[n].value * errorCB: error callback, called with error message string * +* Function name: getHistory(event, startTime, endTime, successCB, errorCB) +* Description: +* Retrieves a list of event/value pairs for a target list of event names +* Required arguments: +* event: event to read +* startTime: start date/time +* endTime: end date/time +* successCB: success callback, gets called with the event/value pair list +* for all requested events. The list is the in the +* form of data[n].name/data[n].value +* errorCB: error callback, called with error message string +* +* * Function name: set(eventlist, valuelist, successCB, errorCB) * Description: * Sets a gourp of event's values (triggers error on read-only events) @@ -245,6 +258,19 @@ Vehicle.prototype.get = function(namelist, successCB, errorCB) this.send(obj, successCB, errorCB); } +Vehicle.prototype.getHistory = function(event, startTime, endTime, successCB, errorCB) +{ + var obj = { + "type" : "method", + "name": "getHistory", + "transactionid" : this.generateTransactionId(), + "data" : [event, (startTime.getTime()/1000).toString(), (endTime.getTime()/1000).toString()] + }; + + this.send(obj, successCB, errorCB); + +} + Vehicle.prototype.set = function(namelist, valuelist, successCB, errorCB) { if((namelist.length != valuelist.length)||(namelist.length <= 0)) @@ -312,8 +338,8 @@ Vehicle.prototype.receive = function(msg) return; } - if((event == undefined)||(event.type == undefined)|| - (event.name == undefined)) + if((event === undefined)||(event.type === undefined)|| + (event.name === undefined)) { self.iErrorCB("BADLY FORMED MESSAGE: "+msg); return; diff --git a/plugins/websocketsink/websocketsinkmanager.cpp b/plugins/websocketsink/websocketsinkmanager.cpp index 38755014..a9a2d608 100644 --- a/plugins/websocketsink/websocketsinkmanager.cpp +++ b/plugins/websocketsink/websocketsinkmanager.cpp @@ -80,6 +80,7 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config) } context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options); } + void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id) { AsyncPropertyRequest velocityRequest; @@ -122,7 +123,7 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper //TODO: Dirty hack hardcoded stuff, jsut to make it work. string tmpstr = ""; tmpstr = property; - s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}"; + s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}"; string replystr = s.str(); //printf("Reply: %s\n",replystr.c_str()); @@ -136,11 +137,88 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true. //delete new_response; <- Unneeded. Apparently libwebsocket free's it. delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer. - + delete reply; }; AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest); } + +void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property, double start, double end, string id) +{ + AsyncRangePropertyRequest rangedRequest; + + rangedRequest.begin = start; + rangedRequest.end = end; + + if (property == "running_status_speedometer") + { + rangedRequest.property = VehicleProperty::VehicleSpeed; + } + else if (property == "running_status_engine_speed") + { + rangedRequest.property = VehicleProperty::EngineSpeed; + } + else if (property == "running_status_steering_wheel_angle") + { + rangedRequest.property = VehicleProperty::SteeringWheelAngle; + } + else if (property == "running_status_transmission_gear_status") + { + rangedRequest.property = VehicleProperty::TransmissionShiftPosition; + } + else + { + PropertyList foo = VehicleProperty::capabilities(); + if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property)) + { + rangedRequest.property = property; + } + else + { + DebugOut(0)<<"websocketsink: Invalid property requested: "<<property; + return; + } + + } + rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply) + { + stringstream s; + + //TODO: Dirty hack hardcoded stuff, jsut to make it work. + stringstream data ("["); + std::list<PropertyValueTime*> values = reply->values; + for(auto itr = values.begin(); itr != values.end(); itr++) + { + if(itr != values.begin()) + { + data<<","; + } + + data << "{ \"value\" : " << "\"" << (*itr)->value->toString() << "\", \"time\" : \"" << (*itr)->timestamp << "\" }"; + } + + data<<"]"; + + s << "{\"type\":\"methodReply\",\"name\":\"getHistory\",\"data\":"<<data<<",\"transactionid\":\"" << id << "\"}"; + + string replystr = s.str(); + //printf("Reply: %s\n",replystr.c_str()); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; + + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); + libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + + //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true. + //delete new_response; <- Unneeded. Apparently libwebsocket free's it. + delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer. + delete reply; + }; + + AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest); +} + void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid) { if (m_sinkMap.find(property) != m_sinkMap.end()) @@ -397,7 +475,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb name = json_reader_get_string_value(reader); json_reader_end_member(reader); - list<string> data; + vector<string> data; list<string> key; list<string> value; json_reader_read_member(reader,"data"); @@ -471,30 +549,6 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb //GetProperty is going to be a singleshot sink. //string arg = arguments.front(); sinkManager->addSingleShotSink(wsi,data.front(),id); - /*if (data.front()== "running_status_speedometer") - { - sinkManager->addSingleShotSink(wsi,VehicleProperty::VehicleSpeed,id); - } - else if (data.front() == "running_status_engine_speed") - { - sinkManager->addSingleShotSink(wsi,VehicleProperty::EngineSpeed,id); - } - else if (data.front() == "running_status_steering_wheel_angle") - { - sinkManager->addSingleShotSink(wsi,VehicleProperty::SteeringWheelAngle,id); - } - else if (data.front() == "running_status_transmission_gear_status") - { - sinkManager->addSingleShotSink(wsi,VehicleProperty::TransmissionShiftPosition,id); - } - else - { - PropertyList foo = VehicleProperty::capabilities(); - if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front())) - { - sinkManager->addSingleShotSink(wsi,data.front(),id); - } - }*/ } else { @@ -531,7 +585,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb else if (name == "subscribe") { //Websocket wants to subscribe to an event, data.front(); - for (list<string>::iterator i=data.begin();i!=data.end();i++) + for (auto i=data.begin();i!=data.end();i++) { sinkManager->addSink(wsi,(*i),id); } @@ -539,11 +593,29 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb else if (name == "unsubscribe") { //Websocket wants to unsubscribe to an event, data.front(); - for (list<string>::iterator i=data.begin();i!=data.end();i++) + for (auto i=data.begin();i!=data.end();i++) { sinkManager->removeSink(wsi,(*i),id); } } + else if (name == "getHistory") + { + if(data.size() == 3) + { + std::string property = data[0]; + std::string startStr = data[1]; + std::string endStr = data[2]; + + sinkManager->addSingleShotRangedSink(wsi,property, + boost::lexical_cast<double,std::string>(startStr), + boost::lexical_cast<double,std::string>(endStr), id); + } + + else + { + //TODO: error, "invalid arguments" should be sent in reply to this. + } + } else if (name == "getSupportedEventTypes") { //If data.front() dosen't contain a property name, return a list of properties supported. @@ -602,6 +674,10 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); } + else + { + DebugOut(0)<<"Unknown method called."<<endl; + } } break; } diff --git a/plugins/websocketsink/websocketsinkmanager.h b/plugins/websocketsink/websocketsinkmanager.h index 56a9163b..4a13caa5 100644 --- a/plugins/websocketsink/websocketsinkmanager.h +++ b/plugins/websocketsink/websocketsinkmanager.h @@ -35,6 +35,7 @@ class WebSocketSinkManager: public AbstractSinkManager public: WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config); void addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id); + void addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property,double start, double end, string id); void addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid); void disconnectAll(libwebsocket* socket); void removeSink(libwebsocket* socket,VehicleProperty::Property property,string uuid); diff --git a/plugins/websocketsourceplugin/websocketsource.cpp b/plugins/websocketsourceplugin/websocketsource.cpp index faec6456..2a564ab3 100644 --- a/plugins/websocketsourceplugin/websocketsource.cpp +++ b/plugins/websocketsourceplugin/websocketsource.cpp @@ -194,27 +194,71 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket json_reader_end_member(reader); list<string> data; - json_reader_read_member(reader,"data"); - if (json_reader_is_array(reader)) + list<pair<string,string> > pairdata; + if (name == "get") { - for(int i=0; i < json_reader_count_elements(reader); i++) + json_reader_read_member(reader,"data"); + if (json_reader_is_array(reader)) { - json_reader_read_element(reader,i); - string path = json_reader_get_string_value(reader); - data.push_back(path); - json_reader_end_element(reader); + for(int i=0; i < json_reader_count_elements(reader); i++) + { + + pair<string,string> pair; + json_reader_read_element(reader,i); + + json_reader_read_member(reader,"property"); + pair.first = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_read_member(reader,"value"); + pair.second = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_end_element(reader); + + pairdata.push_back(pair); + } + } + else + { + pair<string,string> pair; + + json_reader_read_member(reader,"property"); + pair.first = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_read_member(reader,"value"); + pair.second = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + pairdata.push_back(pair); } + json_reader_end_member(reader); } else { - string path = json_reader_get_string_value(reader); - if (path != "") + json_reader_read_member(reader,"data"); + if (json_reader_is_array(reader)) + { + for(int i=0; i < json_reader_count_elements(reader); i++) + { + json_reader_read_element(reader,i); + string path = json_reader_get_string_value(reader); + data.push_back(path); + json_reader_end_element(reader); + } + } + else { - data.push_back(path); + string path = json_reader_get_string_value(reader); + if (path != "") + { + data.push_back(path); + } } + json_reader_end_member(reader); } - json_reader_end_member(reader); - + string id; json_reader_read_member(reader,"transactionid"); if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0) @@ -282,6 +326,23 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket source->setSupported(props); //m_re->updateSupported(m_supportedProperties,PropertyList()); } + else if (name == "get") + { + + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size(); + while (pairdata.size() > 0) + { + pair<string,string> pair = pairdata.front(); + pairdata.pop_front(); + if (source->propertyReplyMap.find(pair.first) != source->propertyReplyMap.end()) + { + source->propertyReplyMap[pair.first]->value = VehicleProperty::getPropertyTypeForPropertyNameValue(source->propertyReplyMap[pair.first]->property,pair.second); + source->propertyReplyMap[pair.first]->completed(source->propertyReplyMap[pair.first]); + source->propertyReplyMap.erase(pair.first); + } + } + //data will contain a property/value map. + } } break; } @@ -355,6 +416,25 @@ void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property pro void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) { ///TODO: fill in + //s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + //m_re->getPropertyAsync(); + /*reply.value = 1; + reply->completed(reply); + reply->completed = [](AsyncPropertyReply* reply) { + DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; + delete reply; + };*/ + propertyReplyMap[reply->property] = reply; + stringstream s; + s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + string replystr = s.str(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; + //printf("Reply: %s\n",replystr.c_str()); + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); + libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); } void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) @@ -365,7 +445,20 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request ) { ///TODO: fill in - return NULL; + AsyncPropertyReply* reply = new AsyncPropertyReply(request); + reply->success = true; + stringstream s; + s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + string replystr = s.str(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; + //printf("Reply: %s\n",replystr.c_str()); + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); + libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); + reply->completed(reply); + return reply; } extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config) diff --git a/plugins/websocketsourceplugin/websocketsource.h b/plugins/websocketsourceplugin/websocketsource.h index 7854c806..3eaaaa58 100644 --- a/plugins/websocketsourceplugin/websocketsource.h +++ b/plugins/websocketsourceplugin/websocketsource.h @@ -50,6 +50,7 @@ public: void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {} void supportedChanged(PropertyList) {} void setConfiguration(map<string, string> config); + map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap; private: PropertyList m_supportedProperties; |