summaryrefslogtreecommitdiff
path: root/plugins/cangenplugin/websockets.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/cangenplugin/websockets.cpp')
-rw-r--r--plugins/cangenplugin/websockets.cpp251
1 files changed, 251 insertions, 0 deletions
diff --git a/plugins/cangenplugin/websockets.cpp b/plugins/cangenplugin/websockets.cpp
new file mode 100644
index 00000000..bafbe16d
--- /dev/null
+++ b/plugins/cangenplugin/websockets.cpp
@@ -0,0 +1,251 @@
+/*
+Copyright (C) 2012 Intel Corporation
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "websockets.h"
+#include <json/json.h>
+#include <json/json_object.h>
+#include <json/json_tokener.h>
+#include <vehicleproperty.h>
+#include <listplusplus.h>
+
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+
+static int websocket_callback(
+ libwebsocket_context *context, libwebsocket *wsi, libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
+{
+ WebSockets *ws = static_cast<WebSockets*>(libwebsocket_context_user (context));
+ WebSocketsObserver* observer = &ws->getObserver();
+ DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ <<
+ "websocket_callback:" << reason << " ,user:" << reinterpret_cast<uint64_t>(user) << endl;
+ if(!observer)
+ return 0;
+
+ switch (reason)
+ {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ {
+ //Connection has been established.
+ //printf("Connection established\n");
+ break;
+ }
+ case LWS_CALLBACK_CLOSED:
+ {
+ //Connection is closed, we need to remove all related sinks
+ //sinkManager->disconnectAll(wsi);
+ break;
+ }
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ {
+ //printf("Client writable\n");
+ break;
+ }
+ case LWS_CALLBACK_SERVER_WRITEABLE:
+ {
+ //printf("Server writable\n");
+ break;
+ }
+
+ case LWS_CALLBACK_RECEIVE:
+ {
+ //printf("Data Received: %s\n",(char*)in);
+ //The lack of a break; here is intentional.
+ }
+ case LWS_CALLBACK_HTTP:
+ {
+ //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
+ const char *data = static_cast<const char*>(in);
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << data << "\n";
+ observer->dataReceived(wsi, data, len);
+ break;
+ }
+ case LWS_CALLBACK_ADD_POLL_FD:
+ {
+ //printf("Adding poll %i\n",sinkManager);
+ DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+ ws->addPoll(libwebsocket_get_socket_fd(wsi));
+ break;
+ }
+ case LWS_CALLBACK_DEL_POLL_FD:
+ {
+ ws->removePoll(libwebsocket_get_socket_fd(wsi));
+ break;
+ }
+ case LWS_CALLBACK_SET_MODE_POLL_FD:
+ {
+ //Set the poll mode
+ break;
+ }
+ case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
+ {
+ //Don't handle this yet.
+ break;
+ }
+ default:
+ {
+ //printf("Unhandled callback: %i\n",reason);
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
+ break;
+ }
+ }
+ return 0;
+}
+
+static bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ //DebugOut(5) << "Polling..." << condition << endl;
+ WebSockets* ws = static_cast<WebSockets*>(data);
+
+ if(condition & G_IO_ERR)
+ {
+ DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
+ }
+
+ if (condition & G_IO_HUP)
+ {
+ //Hang up. Returning false closes out the GIOChannel.
+ //printf("Callback on G_IO_HUP\n");
+ DebugOut(0)<<"socket hangup event..."<<endl;
+ if(ws)
+ ws->removePoll(g_io_channel_unix_get_fd(source));
+ return false;
+ }
+
+ //This is the polling function. If it return false, glib will stop polling this FD.
+ //printf("Polling...%i\n",condition);
+
+ lws_tokens token;
+ struct pollfd pollstruct;
+ int newfd = g_io_channel_unix_get_fd(source);
+ pollstruct.fd = newfd;
+ pollstruct.events = condition;
+ pollstruct.revents = condition;
+ //libwebsocket_context* context = sinkManager->lwscontext();
+ libwebsocket_service_fd(static_cast<WebSockets*>(data)->getContext(),&pollstruct);
+
+ if(pollstruct.revents & G_IO_ERR)
+ {
+ DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
+ }
+
+ if (pollstruct.revents & G_IO_HUP)
+ {
+ //Hang up. Returning false closes out the GIOChannel.
+ //printf("Callback on G_IO_HUP\n");
+ DebugOut(0)<<"socket hangup event..."<<endl;
+ if(ws)
+ ws->removePoll(g_io_channel_unix_get_fd(source));
+ return false;
+ }
+ return true;
+}
+
+WebSockets::WebSockets(WebSocketsObserver& observer, Type t, int port, string ip) :
+ observer(observer),
+ protocollist({ { "http-only", websocket_callback, 0 }, { NULL, NULL, 0 } }),
+ context(nullptr, &libwebsocket_context_destroy)
+{
+ //Protocol list for libwebsockets.
+ //protocollist[0] = { "http-only", websocket_callback, 0 };
+ //protocollist[1] = { NULL, NULL, 0 };
+
+ struct lws_context_creation_info info;
+ memset(&info, 0, sizeof info);
+ info.iface = "any";
+ info.protocols = protocollist;
+ info.extensions = libwebsocket_get_internal_extensions();
+ info.gid = -1;
+ info.uid = -1;
+ info.options = 0;
+ info.port = port;
+ info.user = this;
+ //context = libwebsocket_create_context(&info);
+ context = lwsContextPtr(libwebsocket_create_context(&info), &libwebsocket_context_destroy);
+
+ if(t == WebSockets::Client)
+ {
+ libwebsocket_client_connect(context.get(), ip.c_str(), port, false, "/", "localhost", "websocket", protocollist[0].name, -1);
+ }
+}
+
+WebSockets::~WebSockets()
+{
+ scoped_lock<interprocess_recursive_mutex> lock(mutex);
+ context.reset();
+ for (auto it = m_ioChannelMap.begin(); it != m_ioChannelMap.end(); ++it)
+ {
+ int fd = g_io_channel_unix_get_fd(it->second);
+ g_io_channel_shutdown(it->second, false, 0);
+ auto sourceIt = m_ioSourceMap.find(fd);
+ if( sourceIt != m_ioSourceMap.end() ) {
+ g_source_remove(sourceIt->second); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+ m_ioSourceMap.erase(sourceIt);
+ }
+ }
+ m_ioChannelMap.clear();
+ assert(m_ioSourceMap.empty() == true);
+}
+
+int WebSockets::Write(struct libwebsocket *lws, const std::string& strToWrite)
+{
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite.c_str());
+
+ //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+ return lws ? libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT) : 0;
+}
+
+void WebSockets::addPoll(int fd)
+{
+ scoped_lock<interprocess_recursive_mutex> lock(mutex);
+ GIOChannel *chan = g_io_channel_unix_new(fd);
+ stringstream ss;
+ ss << chan;
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding chanel" << ss.str() << endl;
+ guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc, this);
+ ss.str(std::string());
+ ss << sourceid;
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Source is" << ss.str() << endl;
+ g_io_channel_set_close_on_unref(chan,true);
+ g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+ m_ioChannelMap[fd] = chan;
+ m_ioSourceMap[fd] = sourceid;
+}
+
+void WebSockets::removePoll(int fd)
+{
+ scoped_lock<interprocess_recursive_mutex> lock(mutex);
+ auto channelIt = m_ioChannelMap.find(fd);
+ if( channelIt != m_ioChannelMap.end() ) {
+ stringstream ss;
+ ss << channelIt->second;
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel" << ss.str() << endl;
+ g_io_channel_shutdown(channelIt->second,false,0);
+ m_ioChannelMap.erase(channelIt);
+
+ }
+ auto sourceIt = m_ioSourceMap.find(fd);
+ if( sourceIt != m_ioSourceMap.end() ) {
+ stringstream ss;
+ ss << sourceIt->second;
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source" << ss.str() << endl;
+ g_source_remove(sourceIt->second); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+ m_ioSourceMap.erase(sourceIt);
+ }
+}