summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Trampedach <tim@timtt.com>2012-10-13 16:29:54 -0700
committerKevron Rees <kevron_m_rees@linux.intel.com>2012-10-26 16:34:47 -0700
commit796e318cc8b0d72bc35a7ab5c3d5f7f650fbe31d (patch)
tree445e9c5ce04c73827d556495f3990e1f2dbc9345
parent47259dda1afd3ce0d22f09c58691cefd8a1b51ff (diff)
downloadautomotive-message-broker-796e318cc8b0d72bc35a7ab5c3d5f7f650fbe31d.tar.gz
merge
-rw-r--r--plugins/websocketsink/websocketsinkmanager.cpp75
1 files changed, 40 insertions, 35 deletions
diff --git a/plugins/websocketsink/websocketsinkmanager.cpp b/plugins/websocketsink/websocketsinkmanager.cpp
index d0b27f8c..38755014 100644
--- a/plugins/websocketsink/websocketsinkmanager.cpp
+++ b/plugins/websocketsink/websocketsinkmanager.cpp
@@ -37,10 +37,10 @@ bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
{
m_engine = engine;
-
-
+
+
//Create a listening socket on port 23000 on localhost.
-
+
}
void WebSocketSinkManager::init()
@@ -56,14 +56,14 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config)
{
// //Config has been passed, let's start stuff up.
configuration = config;
-
+
//Default values
int port = 23000;
std::string interface = "lo";
const char *ssl_cert_path = NULL;
const char *ssl_key_path = NULL;
int options = 0;
-
+
//Try to load config
for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
{
@@ -78,7 +78,7 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config)
port = boost::lexical_cast<int>((*i).second);
}
}
- context = libwebsocket_create_context(port, interface.c_str(), protocollist, ssl_cert_path, ssl_key_path, -1, -1, options);
+ context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
}
void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
{
@@ -111,19 +111,19 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
return;
}
-
+
}
velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
{
printf("Got property:%s\n",reply->value->toString().c_str());
//uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
stringstream s;
-
+
//TODO: Dirty hack hardcoded stuff, jsut to make it work.
string tmpstr = "";
tmpstr = property;
s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
-
+
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
@@ -132,11 +132,11 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
new_response+=LWS_SEND_BUFFER_PRE_PADDING;
strcpy(new_response,replystr.c_str());
libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
-
+
//TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
//delete new_response; <- Unneeded. Apparently libwebsocket free's it.
delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
-
+
};
AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
@@ -156,7 +156,7 @@ void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Prop
stringstream s;
s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
-
+
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
@@ -184,12 +184,12 @@ void WebSocketSinkManager::setValue(string property,string value)
m_engine->setProperty(request);
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
delete type;
-
+
}
void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
{
stringstream s;
-
+
//TODO: Dirty hack hardcoded stuff, jsut to make it work.
string tmpstr = "";
if (property == "running_status_speedometer")
@@ -220,10 +220,10 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper
//Invalid property requested.
return;
}
-
+
}
s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
-
+
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
@@ -314,7 +314,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
{
//printf("Switch: %i\n",reason);
-
+
switch (reason)
{
case LWS_CALLBACK_CLIENT_WRITEABLE:
@@ -341,6 +341,12 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//printf("Client writable\n");
break;
}
+ case LWS_CALLBACK_SERVER_WRITEABLE:
+ {
+ //printf("Server writable\n");
+ break;
+ }
+
case LWS_CALLBACK_RECEIVE:
{
//printf("Data Received: %s\n",(char*)in);
@@ -352,15 +358,15 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
GError* error = nullptr;
-
-
+
+
JsonParser* parser = json_parser_new();
if (!json_parser_load_from_data(parser,(char*)in,len,&error))
{
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
return 0;
}
-
+
JsonNode* node = json_parser_get_root(parser);
if(node == nullptr)
{
@@ -368,7 +374,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//throw std::runtime_error("Unable to get JSON root object");
return 0;
}
-
+
JsonReader* reader = json_reader_new(node);
if(reader == nullptr)
{
@@ -376,16 +382,16 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//throw std::runtime_error("Unable to create JSON reader");
return 0;
}
-
-
-
-
-
+
+
+
+
+
string type;
json_reader_read_member(reader,"type");
type = json_reader_get_string_value(reader);
json_reader_end_member(reader);
-
+
string name;
json_reader_read_member(reader,"name");
name = json_reader_get_string_value(reader);
@@ -405,7 +411,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//Raw string value
string path = json_reader_get_string_value(reader);
data.push_back(path);
-
+
}
else
{
@@ -431,7 +437,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
}
}
json_reader_end_member(reader);
-
+
string id;
json_reader_read_member(reader,"transactionid");
if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
@@ -447,15 +453,15 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
id = strstr.str();
}
json_reader_end_member(reader);
-
+
///TODO: this will probably explode:
//mlc: I agree with Kevron here, it does explode.
//if(error) g_error_free(error);
-
+
g_object_unref(reader);
g_object_unref(parser);
-
-
+
+
if (type == "method")
{
if (name == "get")
@@ -518,7 +524,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//(*d);
d++;
}
-
+
}
}
}
@@ -658,4 +664,3 @@ bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
libwebsocket_service_fd(context,&pollstruct);
return true;
}
-