summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Carpenter <malcom2073@gmail.com>2012-08-28 11:08:04 -0400
committerMichael Carpenter <malcom2073@gmail.com>2012-08-28 13:48:28 -0400
commit8e23d92eb5fb77054c9f1b6ef81d5b12cf1908ae (patch)
tree80cec5d25c56866a92a8a76126d3cacf92364e70
parent2cdaf8803d25b4693a9bf8d145e37ffeac295794 (diff)
downloadautomotive-message-broker-8e23d92eb5fb77054c9f1b6ef81d5b12cf1908ae.tar.gz
Fixes for issues concerning subscriptions and websocketsource
-rw-r--r--ambd/core.cpp1
-rw-r--r--plugins/exampleplugin.cpp1
-rw-r--r--plugins/examplesink.cpp2
-rw-r--r--plugins/websocketsourceplugin/websocketsource.cpp169
-rw-r--r--plugins/websocketsourceplugin/websocketsource.h13
5 files changed, 87 insertions, 99 deletions
diff --git a/ambd/core.cpp b/ambd/core.cpp
index 24fd0794..23131dc1 100644
--- a/ambd/core.cpp
+++ b/ambd/core.cpp
@@ -182,6 +182,7 @@ void Core::setProperty(VehicleProperty::Property property, boost::any value)
void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self)
{
+ printf("Subscribing\n");
if(!ListPlusPlus<VehicleProperty::Property>(&mMasterPropertyList).contains((property)))
{
DebugOut()<<__FUNCTION__<<"(): property not supported: "<<property<<endl;
diff --git a/plugins/exampleplugin.cpp b/plugins/exampleplugin.cpp
index 6563214b..f5e815ef 100644
--- a/plugins/exampleplugin.cpp
+++ b/plugins/exampleplugin.cpp
@@ -45,7 +45,6 @@ ExampleSourcePlugin::ExampleSourcePlugin(AbstractRoutingEngine* re)
:AbstractSource(re), velocity(0), engineSpeed(0)
{
re->setSupported(supported(), this);
-
debugOut("setting timeout");
g_timeout_add(1000, timeoutCallback, this );
diff --git a/plugins/examplesink.cpp b/plugins/examplesink.cpp
index b127ab2b..950c839a 100644
--- a/plugins/examplesink.cpp
+++ b/plugins/examplesink.cpp
@@ -36,6 +36,7 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine): AbstractSink(engine)
velocityRequest.completed = [](AsyncPropertyReply* reply) { DebugOut()<<"Velocity Async request completed: "<<boost::any_cast<uint16_t>(reply->value)<<endl; };
AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
+ routingEngine->registerSink(this);
}
@@ -46,6 +47,7 @@ PropertyList ExampleSink::subscriptions()
void ExampleSink::supportedChanged(PropertyList supportedProperties)
{
+ printf("Support changed!\n");
routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
}
diff --git a/plugins/websocketsourceplugin/websocketsource.cpp b/plugins/websocketsourceplugin/websocketsource.cpp
index f81d14c0..cae3010a 100644
--- a/plugins/websocketsourceplugin/websocketsource.cpp
+++ b/plugins/websocketsourceplugin/websocketsource.cpp
@@ -18,17 +18,46 @@
#include "websocketsource.h"
-#include <libwebsockets.h>
#include <iostream>
#include <boost/assert.hpp>
#include <boost/lexical_cast.hpp>
#include <glib.h>
#include <sstream>
#include <json-glib/json-glib.h>
+#include <listplusplus.h>
#include "debugout.h"
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
libwebsocket_context *context;
+WebSocketSource *source;
AbstractRoutingEngine *m_re;
+
+
+
+void WebSocketSource::checkSubscriptions()
+{
+ PropertyList notSupportedList;
+ while (queuedRequests.size() > 0)
+ {
+ VehicleProperty::Property prop = queuedRequests.front();
+ queuedRequests.pop_front();
+ if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+ {
+ return;
+ }
+ activeRequests.push_back(prop);
+ stringstream s;
+ s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+
+ string replystr = s.str();
+ printf("Reply: %s\n",replystr.c_str());
+
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ }
+}
+
bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
{
//This is the polling function. If it return false, glib will stop polling this FD.
@@ -58,7 +87,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 +
LWS_SEND_BUFFER_POST_PADDING];
int l;
- printf("Switch: %i\n",reason);
+ //printf("Switch: %i\n",reason);
switch (reason) {
case LWS_CALLBACK_CLOSED:
@@ -69,9 +98,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
+ source->clientConnected = true;
+ source->checkSubscriptions();
printf("Incoming connection!\n");
+
stringstream s;
- s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << "VehicleSpeed" << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+ s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
string replystr = s.str();
printf("Reply: %s\n",replystr.c_str());
@@ -193,6 +225,22 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
}
}
}
+ else if (type == "methodReply")
+ {
+ if (name == "getSupportedEventTypes")
+ {
+ PropertyList props;
+ while (data.size() > 0)
+ {
+ string val = data.front();
+ data.pop_front();
+ props.push_back(val);
+
+ }
+ source->setSupported(props);
+ //m_re->updateSupported(m_supportedProperties,PropertyList());
+ }
+ }
break;
}
case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
@@ -212,6 +260,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
return 0;
}
}
+void WebSocketSource::setSupported(PropertyList list)
+{
+ m_supportedProperties = list;
+ m_re->updateSupported(list,PropertyList());
+}
+
static struct libwebsocket_protocols protocols[] = {
{
"http-only",
@@ -226,6 +280,8 @@ static struct libwebsocket_protocols protocols[] = {
};
WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
{
+ clientConnected = false;
+ source = this;
m_re = re;
context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
@@ -243,6 +299,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
{
g_error_free(error);
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON";
+ return;
}
}
}
@@ -282,47 +339,19 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
int port = json_reader_get_int_value(reader);
json_reader_end_member(reader);
printf("Connecting to %s on port %i\n",ip.c_str(),port);
- libwebsocket *wsi_dumb = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
+ clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
json_reader_end_element(reader);
}
}
- else
- {
- //string path = json_reader_get_string_value(reader);
- //if (path != "")
- //{
- // data.push_back(path);
- //}
- }
json_reader_end_member(reader);
- /*string type;
- json_reader_read_member(reader,"type");
- type = json_reader_get_string_value(reader);
- json_reader_end_member(reader);
-
- string name;
- json_reader_read_member(reader,"name");
- name = json_reader_get_string_value(reader);
- json_reader_end_member(reader);
- */
-
re->setSupported(supported(), this);
}
PropertyList WebSocketSource::supported()
{
- PropertyList props;
- props.push_back(VehicleProperty::EngineSpeed);
- props.push_back(VehicleProperty::VehicleSpeed);
- props.push_back(VehicleProperty::AccelerationX);
- props.push_back(VehicleProperty::TransmissionShiftPosition);
- props.push_back(VehicleProperty::SteeringWheelAngle);
- props.push_back(VehicleProperty::ThrottlePosition);
- props.push_back(VehicleProperty::EngineCoolantTemperature);
-
- return props;
+ return m_supportedProperties;
}
extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine)
{
@@ -346,16 +375,25 @@ boost::any WebSocketSource::getProperty(VehicleProperty::Property property)
}
void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
{
- printf("Subscribed to property: %s\n",property.c_str());
- mRequests.push_back(property);
+ //printf("Subscribed to property: %s\n",property.c_str());
+ queuedRequests.push_back(property);
+ if (clientConnected)
+ {
+ checkSubscriptions();
+ }
}
void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
{
- mRequests.remove(property);
+ removeRequests.push_back(property);
+ if (clientConnected)
+ {
+ checkSubscriptions();
+ }
}
+
void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
{
/*if(reply->property == VehicleProperty::VehicleSpeed)
@@ -389,62 +427,3 @@ void WebSocketSource::setProperty(VehicleProperty::Property , boost::any )
{
}
-
-/*void ExampleSourcePlugin::getPropertyAsync(AsyncPropertyReply *reply)
-{
- if(reply->property == VehicleProperty::VehicleSpeed)
- {
- reply->value = velocity;
- reply->completed(reply);
- }
- else if(reply->property == VehicleProperty::EngineSpeed)
- {
- reply->value = engineSpeed;
- reply->completed(reply);
- }
- else if(reply->property == VehicleProperty::AccelerationX)
- {
- reply->value = accelerationX;
- reply->completed(reply);
- }
- else if(reply->property == VehicleProperty::TransmissionShiftPosition)
- {
- reply->value = transmissionShiftPostion;
- reply->completed(reply);
- }
- else if(reply->property == VehicleProperty::SteeringWheelAngle)
- {
- reply->value = steeringWheelAngle;
- reply->completed(reply);
- }
-}
-
-void ExampleSourcePlugin::setProperty(VehicleProperty::Property , boost::any )
-{
-
-}
-
-
-
-void ExampleSourcePlugin::randomizeProperties()
-{
- velocity = 1 + (255.00 * (rand() / (RAND_MAX + 1.0)));
- engineSpeed = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0)));
- accelerationX = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0)));
- transmissionShiftPostion = 1 + (6.00 * (rand() / (RAND_MAX + 1.0)));
- steeringWheelAngle = 1 + (359.00 * (rand() / (RAND_MAX + 1.0)));
- throttlePos = 1 + (100.00 * (rand() / (RAND_MAX + 1.0)));
- engineCoolant = 1 + (40.00 * (rand() / (RAND_MAX + 140.0)));
-
- DebugOut()<<"setting velocity to: "<<velocity<<endl;
- DebugOut()<<"setting enginespeed to: "<<engineSpeed<<endl;
-
- routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
- routingEngine->updateProperty(VehicleProperty::EngineSpeed, engineSpeed);
- routingEngine->updateProperty(VehicleProperty::AccelerationX, accelerationX);
- routingEngine->updateProperty(VehicleProperty::SteeringWheelAngle, steeringWheelAngle);
- routingEngine->updateProperty(VehicleProperty::TransmissionShiftPosition, transmissionShiftPostion);
- routingEngine->updateProperty(VehicleProperty::ThrottlePosition, throttlePos);
- routingEngine->updateProperty(VehicleProperty::EngineCoolantTemperature, engineCoolant);
-}
-*/ \ No newline at end of file
diff --git a/plugins/websocketsourceplugin/websocketsource.h b/plugins/websocketsourceplugin/websocketsource.h
index 791f15a6..b7d19676 100644
--- a/plugins/websocketsourceplugin/websocketsource.h
+++ b/plugins/websocketsourceplugin/websocketsource.h
@@ -25,6 +25,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#include <abstractsource.h>
#include <string>
+#include <libwebsockets.h>
class WebSocketSource : public AbstractSource
@@ -39,14 +40,20 @@ public:
void subscribeToPropertyChanges(VehicleProperty::Property property);
void unsubscribeToPropertyChanges(VehicleProperty::Property property);
PropertyList supported();
-
+ libwebsocket *clientsocket;
+ PropertyList queuedRequests;
+ bool clientConnected;
+ void checkSubscriptions();
+ PropertyList activeRequests;
+ PropertyList removeRequests;
+ void setSupported(PropertyList list);
void propertyChanged(VehicleProperty::Property property, boost::any value, string uuid) {}
void supportedChanged(PropertyList) {}
//void randomizeProperties();
private:
-
-PropertyList mRequests;
+ PropertyList m_supportedProperties;
+
};
#endif // WEBSOCKETSOURCE_H