diff options
author | Michael Carpenter <malcom2073@gmail.com> | 2012-08-28 11:08:04 -0400 |
---|---|---|
committer | Michael Carpenter <malcom2073@gmail.com> | 2012-08-28 13:48:28 -0400 |
commit | 8e23d92eb5fb77054c9f1b6ef81d5b12cf1908ae (patch) | |
tree | 80cec5d25c56866a92a8a76126d3cacf92364e70 | |
parent | 2cdaf8803d25b4693a9bf8d145e37ffeac295794 (diff) | |
download | automotive-message-broker-8e23d92eb5fb77054c9f1b6ef81d5b12cf1908ae.tar.gz |
Fixes for issues concerning subscriptions and websocketsource
-rw-r--r-- | ambd/core.cpp | 1 | ||||
-rw-r--r-- | plugins/exampleplugin.cpp | 1 | ||||
-rw-r--r-- | plugins/examplesink.cpp | 2 | ||||
-rw-r--r-- | plugins/websocketsourceplugin/websocketsource.cpp | 169 | ||||
-rw-r--r-- | plugins/websocketsourceplugin/websocketsource.h | 13 |
5 files changed, 87 insertions, 99 deletions
diff --git a/ambd/core.cpp b/ambd/core.cpp index 24fd0794..23131dc1 100644 --- a/ambd/core.cpp +++ b/ambd/core.cpp @@ -182,6 +182,7 @@ void Core::setProperty(VehicleProperty::Property property, boost::any value) void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self) { + printf("Subscribing\n"); if(!ListPlusPlus<VehicleProperty::Property>(&mMasterPropertyList).contains((property))) { DebugOut()<<__FUNCTION__<<"(): property not supported: "<<property<<endl; diff --git a/plugins/exampleplugin.cpp b/plugins/exampleplugin.cpp index 6563214b..f5e815ef 100644 --- a/plugins/exampleplugin.cpp +++ b/plugins/exampleplugin.cpp @@ -45,7 +45,6 @@ ExampleSourcePlugin::ExampleSourcePlugin(AbstractRoutingEngine* re) :AbstractSource(re), velocity(0), engineSpeed(0) { re->setSupported(supported(), this); - debugOut("setting timeout"); g_timeout_add(1000, timeoutCallback, this ); diff --git a/plugins/examplesink.cpp b/plugins/examplesink.cpp index b127ab2b..950c839a 100644 --- a/plugins/examplesink.cpp +++ b/plugins/examplesink.cpp @@ -36,6 +36,7 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine): AbstractSink(engine) velocityRequest.completed = [](AsyncPropertyReply* reply) { DebugOut()<<"Velocity Async request completed: "<<boost::any_cast<uint16_t>(reply->value)<<endl; }; AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest); + routingEngine->registerSink(this); } @@ -46,6 +47,7 @@ PropertyList ExampleSink::subscriptions() void ExampleSink::supportedChanged(PropertyList supportedProperties) { + printf("Support changed!\n"); routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this); routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this); } diff --git a/plugins/websocketsourceplugin/websocketsource.cpp b/plugins/websocketsourceplugin/websocketsource.cpp index f81d14c0..cae3010a 100644 --- a/plugins/websocketsourceplugin/websocketsource.cpp +++ b/plugins/websocketsourceplugin/websocketsource.cpp @@ -18,17 +18,46 @@ #include "websocketsource.h" -#include <libwebsockets.h> #include <iostream> #include <boost/assert.hpp> #include <boost/lexical_cast.hpp> #include <glib.h> #include <sstream> #include <json-glib/json-glib.h> +#include <listplusplus.h> #include "debugout.h" #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) libwebsocket_context *context; +WebSocketSource *source; AbstractRoutingEngine *m_re; + + + +void WebSocketSource::checkSubscriptions() +{ + PropertyList notSupportedList; + while (queuedRequests.size() > 0) + { + VehicleProperty::Property prop = queuedRequests.front(); + queuedRequests.pop_front(); + if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop)) + { + return; + } + activeRequests.push_back(prop); + stringstream s; + s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + + string replystr = s.str(); + printf("Reply: %s\n",replystr.c_str()); + + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); +libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + } +} + bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data) { //This is the polling function. If it return false, glib will stop polling this FD. @@ -58,7 +87,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING]; int l; - printf("Switch: %i\n",reason); + //printf("Switch: %i\n",reason); switch (reason) { case LWS_CALLBACK_CLOSED: @@ -69,9 +98,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket case LWS_CALLBACK_CLIENT_ESTABLISHED: { + source->clientConnected = true; + source->checkSubscriptions(); printf("Incoming connection!\n"); + stringstream s; - s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << "VehicleSpeed" << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; string replystr = s.str(); printf("Reply: %s\n",replystr.c_str()); @@ -193,6 +225,22 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket } } } + else if (type == "methodReply") + { + if (name == "getSupportedEventTypes") + { + PropertyList props; + while (data.size() > 0) + { + string val = data.front(); + data.pop_front(); + props.push_back(val); + + } + source->setSupported(props); + //m_re->updateSupported(m_supportedProperties,PropertyList()); + } + } break; } case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED: @@ -212,6 +260,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket return 0; } } +void WebSocketSource::setSupported(PropertyList list) +{ + m_supportedProperties = list; + m_re->updateSupported(list,PropertyList()); +} + static struct libwebsocket_protocols protocols[] = { { "http-only", @@ -226,6 +280,8 @@ static struct libwebsocket_protocols protocols[] = { }; WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re) { + clientConnected = false; + source = this; m_re = re; context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0); @@ -243,6 +299,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re) { g_error_free(error); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON"; + return; } } } @@ -282,47 +339,19 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re) int port = json_reader_get_int_value(reader); json_reader_end_member(reader); printf("Connecting to %s on port %i\n",ip.c_str(),port); - libwebsocket *wsi_dumb = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1); + clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1); json_reader_end_element(reader); } } - else - { - //string path = json_reader_get_string_value(reader); - //if (path != "") - //{ - // data.push_back(path); - //} - } json_reader_end_member(reader); - /*string type; - json_reader_read_member(reader,"type"); - type = json_reader_get_string_value(reader); - json_reader_end_member(reader); - - string name; - json_reader_read_member(reader,"name"); - name = json_reader_get_string_value(reader); - json_reader_end_member(reader); - */ - re->setSupported(supported(), this); } PropertyList WebSocketSource::supported() { - PropertyList props; - props.push_back(VehicleProperty::EngineSpeed); - props.push_back(VehicleProperty::VehicleSpeed); - props.push_back(VehicleProperty::AccelerationX); - props.push_back(VehicleProperty::TransmissionShiftPosition); - props.push_back(VehicleProperty::SteeringWheelAngle); - props.push_back(VehicleProperty::ThrottlePosition); - props.push_back(VehicleProperty::EngineCoolantTemperature); - - return props; + return m_supportedProperties; } extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine) { @@ -346,16 +375,25 @@ boost::any WebSocketSource::getProperty(VehicleProperty::Property property) } void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property) { - printf("Subscribed to property: %s\n",property.c_str()); - mRequests.push_back(property); + //printf("Subscribed to property: %s\n",property.c_str()); + queuedRequests.push_back(property); + if (clientConnected) + { + checkSubscriptions(); + } } void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property) { - mRequests.remove(property); + removeRequests.push_back(property); + if (clientConnected) + { + checkSubscriptions(); + } } + void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) { /*if(reply->property == VehicleProperty::VehicleSpeed) @@ -389,62 +427,3 @@ void WebSocketSource::setProperty(VehicleProperty::Property , boost::any ) { } - -/*void ExampleSourcePlugin::getPropertyAsync(AsyncPropertyReply *reply) -{ - if(reply->property == VehicleProperty::VehicleSpeed) - { - reply->value = velocity; - reply->completed(reply); - } - else if(reply->property == VehicleProperty::EngineSpeed) - { - reply->value = engineSpeed; - reply->completed(reply); - } - else if(reply->property == VehicleProperty::AccelerationX) - { - reply->value = accelerationX; - reply->completed(reply); - } - else if(reply->property == VehicleProperty::TransmissionShiftPosition) - { - reply->value = transmissionShiftPostion; - reply->completed(reply); - } - else if(reply->property == VehicleProperty::SteeringWheelAngle) - { - reply->value = steeringWheelAngle; - reply->completed(reply); - } -} - -void ExampleSourcePlugin::setProperty(VehicleProperty::Property , boost::any ) -{ - -} - - - -void ExampleSourcePlugin::randomizeProperties() -{ - velocity = 1 + (255.00 * (rand() / (RAND_MAX + 1.0))); - engineSpeed = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0))); - accelerationX = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0))); - transmissionShiftPostion = 1 + (6.00 * (rand() / (RAND_MAX + 1.0))); - steeringWheelAngle = 1 + (359.00 * (rand() / (RAND_MAX + 1.0))); - throttlePos = 1 + (100.00 * (rand() / (RAND_MAX + 1.0))); - engineCoolant = 1 + (40.00 * (rand() / (RAND_MAX + 140.0))); - - DebugOut()<<"setting velocity to: "<<velocity<<endl; - DebugOut()<<"setting enginespeed to: "<<engineSpeed<<endl; - - routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity); - routingEngine->updateProperty(VehicleProperty::EngineSpeed, engineSpeed); - routingEngine->updateProperty(VehicleProperty::AccelerationX, accelerationX); - routingEngine->updateProperty(VehicleProperty::SteeringWheelAngle, steeringWheelAngle); - routingEngine->updateProperty(VehicleProperty::TransmissionShiftPosition, transmissionShiftPostion); - routingEngine->updateProperty(VehicleProperty::ThrottlePosition, throttlePos); - routingEngine->updateProperty(VehicleProperty::EngineCoolantTemperature, engineCoolant); -} -*/
\ No newline at end of file diff --git a/plugins/websocketsourceplugin/websocketsource.h b/plugins/websocketsourceplugin/websocketsource.h index 791f15a6..b7d19676 100644 --- a/plugins/websocketsourceplugin/websocketsource.h +++ b/plugins/websocketsourceplugin/websocketsource.h @@ -25,6 +25,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #include <abstractsource.h> #include <string> +#include <libwebsockets.h> class WebSocketSource : public AbstractSource @@ -39,14 +40,20 @@ public: void subscribeToPropertyChanges(VehicleProperty::Property property); void unsubscribeToPropertyChanges(VehicleProperty::Property property); PropertyList supported(); - + libwebsocket *clientsocket; + PropertyList queuedRequests; + bool clientConnected; + void checkSubscriptions(); + PropertyList activeRequests; + PropertyList removeRequests; + void setSupported(PropertyList list); void propertyChanged(VehicleProperty::Property property, boost::any value, string uuid) {} void supportedChanged(PropertyList) {} //void randomizeProperties(); private: - -PropertyList mRequests; + PropertyList m_supportedProperties; + }; #endif // WEBSOCKETSOURCE_H |