summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimo Lotterbach <timo.lotterbach@bmw-carit.de>2012-09-10 06:40:22 -0700
committerTimo Lotterbach <timo.lotterbach@bmw-carit.de>2012-10-02 05:08:55 -0700
commit7c6324e390394ec7955e15eb1d134e47e2ea2c27 (patch)
treeb819ba242e45b961b7ceae37780ca512e19d5370
parent6be7fba3b1bf2cce8b17c1e6eda915523b53b612 (diff)
downloadlayer_management-7c6324e390394ec7955e15eb1d134e47e2ea2c27.tar.gz
TcpIpcModule: updated to new IpcModule API, added notification support
Added support for creating and sending notifications. Improved handling of incoming message data structure. removed destroyMessage: was only used for DBusIpcModule, but lifecycle of received messages is now handled internally. The external call is not required any more.
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h17
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h3
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c55
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c45
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c98
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c257
-rw-r--r--LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp2
7 files changed, 333 insertions, 144 deletions
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h
index f2000d9..459a996 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h
@@ -28,17 +28,12 @@
#define SOCKET_MAX_MESSAGE_SIZE 1024
#define SOCKET_MAX_PENDING_CONNECTIONS 10
-#define SOCKET_MESSAGE_BUFFER_COUNT 2
-
-#define SOCKET_MESSAGE_TYPE_NORMAL 'n'
-#define SOCKET_MESSAGE_TYPE_ERROR 'e'
-
-#define SOCKET_MESSAGE_TYPE_INT 'i'
-#define SOCKET_MESSAGE_TYPE_UINT 'u'
-#define SOCKET_MESSAGE_TYPE_BOOL 'b'
-#define SOCKET_MESSAGE_TYPE_DOUBLE 'd'
-#define SOCKET_MESSAGE_TYPE_STRING 's'
-#define SOCKET_MESSAGE_TYPE_ARRAY 'a'
+#define SOCKET_MESSAGE_TYPE_INT 'i'
+#define SOCKET_MESSAGE_TYPE_UINT 'u'
+#define SOCKET_MESSAGE_TYPE_BOOL 'b'
+#define SOCKET_MESSAGE_TYPE_DOUBLE 'd'
+#define SOCKET_MESSAGE_TYPE_STRING 's'
+#define SOCKET_MESSAGE_TYPE_ARRAY 'a'
#define ENV_TCP_HOST "LM_TCP_HOST"
#define ENV_TCP_PORT "LM_TCP_PORT"
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h
index cea8c1e..8b3b094 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h
@@ -56,9 +56,6 @@ struct State
struct sockaddr_in clientAddrIn;
fd_set monitoredSockets;
int monitoredSocketMax;
- struct SocketMessage outgoingMessage;
- struct SocketMessage incomingMessage[SOCKET_MESSAGE_BUFFER_COUNT];
- t_ilm_int incomingQueueIndex;
};
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c
index a554436..47f4fa7 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c
@@ -23,10 +23,8 @@
// append simple data types
//-----------------------------------------------------------------------------
-t_ilm_bool appendGenericValue(const char protocolType, const char size, const void* value)
+t_ilm_bool appendGenericValue(struct SocketMessage* msg, const char protocolType, const char size, const void* value)
{
- struct SocketMessage* msg = &gState.outgoingMessage;
-
// size check: is message size reached
if (sizeof(msg->paket) - sizeof(msg->paket.data) // header
+ msg->index + size // + data
@@ -51,41 +49,44 @@ t_ilm_bool appendGenericValue(const char protocolType, const char size, const vo
return ILM_TRUE;
}
-t_ilm_bool appendUint(const t_ilm_uint value)
+t_ilm_bool appendUint(t_ilm_message message, const t_ilm_uint value)
{
- return appendGenericValue(SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), &value);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), &value);
}
-t_ilm_bool appendInt(const t_ilm_int value)
+t_ilm_bool appendInt(t_ilm_message message, const t_ilm_int value)
{
- return appendGenericValue(SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), &value);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), &value);
}
-t_ilm_bool appendBool(const t_ilm_bool value)
+t_ilm_bool appendBool(t_ilm_message message, const t_ilm_bool value)
{
- return appendGenericValue(SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), &value);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), &value);
}
-t_ilm_bool appendDouble(const t_ilm_float value)
+t_ilm_bool appendDouble(t_ilm_message message, const t_ilm_float value)
{
- return appendGenericValue(SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), &value);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), &value);
}
-t_ilm_bool appendString(t_ilm_const_string value)
+t_ilm_bool appendString(t_ilm_message message, t_ilm_const_string value)
{
- return appendGenericValue(SOCKET_MESSAGE_TYPE_STRING, strlen(value), value);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_STRING, strlen(value), value);
}
//-----------------------------------------------------------------------------
// append array data types
//-----------------------------------------------------------------------------
-t_ilm_bool appendGenericArray(const char arraySize, const char protocolType, const char size, const void* value)
+t_ilm_bool appendGenericArray(struct SocketMessage* msg, const char arraySize, const char protocolType, const char size, const void* value)
{
t_ilm_bool result = ILM_TRUE;
- struct SocketMessage* msg = &gState.outgoingMessage;
-
// TODO: size check: is message size reached?
// append array type
@@ -100,30 +101,34 @@ t_ilm_bool appendGenericArray(const char arraySize, const char protocolType, con
char i = 0;
for (i = 0; i < arraySize; ++i)
{
- result &= appendGenericValue(protocolType, size, value + i * size);
+ result &= appendGenericValue(msg, protocolType, size, value + i * size);
}
return result;
}
-t_ilm_bool appendUintArray(const t_ilm_uint* valueArray, t_ilm_int arraySize)
+t_ilm_bool appendUintArray(t_ilm_message message, const t_ilm_uint* valueArray, t_ilm_int arraySize)
{
- return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), valueArray);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), valueArray);
}
-t_ilm_bool appendIntArray(const t_ilm_int* valueArray, t_ilm_int arraySize)
+t_ilm_bool appendIntArray(t_ilm_message message, const t_ilm_int* valueArray, t_ilm_int arraySize)
{
- return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), valueArray);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), valueArray);
}
-t_ilm_bool appendBoolArray(const t_ilm_bool* valueArray, t_ilm_int arraySize)
+t_ilm_bool appendBoolArray(t_ilm_message message, const t_ilm_bool* valueArray, t_ilm_int arraySize)
{
- return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), valueArray);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), valueArray);
}
-t_ilm_bool appendDoubleArray(const t_ilm_float* valueArray, t_ilm_int arraySize)
+t_ilm_bool appendDoubleArray(t_ilm_message message, const t_ilm_float* valueArray, t_ilm_int arraySize)
{
- return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), valueArray);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), valueArray);
}
// TODO appendStringArray()
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c
index 1917c10..6cfa56a 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c
@@ -25,12 +25,10 @@
// get simple data types
//-----------------------------------------------------------------------------
-t_ilm_bool getGenericValue(void* value, const char protocolType, const char expectedSize)
+t_ilm_bool getGenericValue(struct SocketMessage* msg, void* value, const char protocolType, const char expectedSize)
{
t_ilm_bool result = ILM_FALSE;
- struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex];
-
// get protocol value from message
char readType = msg->paket.data[msg->index];
msg->index += sizeof(readType);
@@ -75,41 +73,44 @@ t_ilm_bool getGenericValue(void* value, const char protocolType, const char expe
return ILM_TRUE;
}
-t_ilm_bool getUint(t_ilm_uint* value)
+t_ilm_bool getUint(t_ilm_message message, t_ilm_uint* value)
{
- return getGenericValue(value, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint));
}
-t_ilm_bool getInt(t_ilm_int* value)
+t_ilm_bool getInt(t_ilm_message message, t_ilm_int* value)
{
- return getGenericValue(value, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int));
}
-t_ilm_bool getBool(t_ilm_bool* value)
+t_ilm_bool getBool(t_ilm_message message, t_ilm_bool* value)
{
- return getGenericValue(value, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool));
}
-t_ilm_bool getDouble(t_ilm_float* value)
+t_ilm_bool getDouble(t_ilm_message message, t_ilm_float* value)
{
- return getGenericValue(value, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float));
}
-t_ilm_bool getString(char* value)
+t_ilm_bool getString(t_ilm_message message, char* value)
{
- return getGenericValue(value, SOCKET_MESSAGE_TYPE_STRING, sizeof(t_ilm_const_string));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_STRING, sizeof(t_ilm_const_string));
}
//-----------------------------------------------------------------------------
// get array data types
//-----------------------------------------------------------------------------
-t_ilm_bool getGenericArray(t_ilm_int* arraySize, void** value, const char protocolType, const char expectedSize)
+t_ilm_bool getGenericArray(struct SocketMessage* msg, t_ilm_int* arraySize, void** value, const char protocolType, const char expectedSize)
{
t_ilm_bool result = ILM_TRUE;
- struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex];
-
// get protocol value from message
char readType = msg->paket.data[msg->index];
msg->index += sizeof(readType);
@@ -134,19 +135,21 @@ t_ilm_bool getGenericArray(t_ilm_int* arraySize, void** value, const char protoc
char i = 0;
for (i = 0; i < *arraySize; ++i)
{
- result &= getGenericValue(((*value) + expectedSize * i), protocolType, expectedSize);
+ result &= getGenericValue(msg, ((*value) + expectedSize * i), protocolType, expectedSize);
}
return result;
}
-t_ilm_bool getIntArray(t_ilm_int** valueArray, t_ilm_int* arraySize)
+t_ilm_bool getIntArray(t_ilm_message message, t_ilm_int** valueArray, t_ilm_int* arraySize)
{
- return getGenericArray(arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericArray(msg, arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int));
}
-t_ilm_bool getUintArray(t_ilm_uint** valueArray, t_ilm_int* arraySize)
+t_ilm_bool getUintArray(t_ilm_message message, t_ilm_uint** valueArray, t_ilm_int* arraySize)
{
- return getGenericArray(arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint));
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return getGenericArray(msg, arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint));
}
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c
index e7c3f96..eb531c2 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c
@@ -24,14 +24,108 @@
#include <signal.h>
-t_ilm_bool init(t_ilm_bool isClient)
+t_ilm_bool initServiceMode()
{
// ignore broken pipe, if clients disconnect, handled in receive()
signal(SIGPIPE, SIG_IGN);
+ t_ilm_bool isClient = ILM_FALSE;
+
t_ilm_bool result = ILM_TRUE;
- gState.incomingQueueIndex = SOCKET_MESSAGE_BUFFER_COUNT - 1;
+ gState.isClient = isClient;
+
+ gState.socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+
+ if (gState.socket < 0)
+ {
+ printf("TcpIpcModule: socket()...failed\n");
+ result = ILM_FALSE;
+ }
+
+ const char* portString = getenv(ENV_TCP_PORT);
+ int port = SOCKET_TCP_PORT;
+ if (portString)
+ {
+ port = atoi(portString);
+ }
+
+ gState.serverAddrIn.sin_family = AF_INET;
+ gState.serverAddrIn.sin_port = htons(port);
+ memset(&(gState.serverAddrIn.sin_zero), '\0', 8);
+
+ if (gState.isClient) // Client
+ {
+ const char* hostname = getenv(ENV_TCP_HOST);
+ if (!hostname)
+ {
+ hostname = SOCKET_TCP_HOST;
+ }
+
+ struct hostent* server;
+ server = gethostbyname(hostname);
+ if (!server)
+ {
+ printf("TcpIpcModule: error: could not resolve host '%s'.\n", hostname);
+ result = ILM_FALSE;
+ }
+ else
+ {
+ memcpy(&gState.serverAddrIn.sin_addr.s_addr, server->h_addr, server->h_length);
+ }
+
+ if (0 != connect(gState.socket,
+ (struct sockaddr *) &gState.serverAddrIn,
+ sizeof(gState.serverAddrIn)))
+ {
+ result = ILM_FALSE;
+ }
+
+ printf("TcpIpcModule: connection to %s:%d %s.\n",
+ hostname, port,
+ (ILM_TRUE == result) ? "established" : "failed");
+
+ FD_SET(gState.socket, &gState.monitoredSockets);
+ gState.monitoredSocketMax = gState.socket;
+ }
+ else // LayerManagerService
+ {
+ int on = 1;
+ setsockopt(gState.socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+
+ gState.serverAddrIn.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (0 > bind(gState.socket,
+ (struct sockaddr *) &gState.serverAddrIn,
+ sizeof(gState.serverAddrIn)))
+ {
+ printf("TcpIpcModule: bind()...failed\n");
+ result = ILM_FALSE;
+ }
+
+ if (listen(gState.socket, SOCKET_MAX_PENDING_CONNECTIONS) < 0)
+ {
+ printf("TcpIpcModule: listen()...failed\n");
+ result = ILM_FALSE;
+ }
+
+ FD_SET(gState.socket, &gState.monitoredSockets);
+ gState.monitoredSocketMax = gState.socket;
+
+ printf("TcpIpcModule: listening to TCP port: %d\n", port);
+ }
+
+ return result;
+}
+
+t_ilm_bool initClientMode()
+{
+ // ignore broken pipe, if clients disconnect, handled in receive()
+ signal(SIGPIPE, SIG_IGN);
+
+ t_ilm_bool isClient = ILM_TRUE;
+
+ t_ilm_bool result = ILM_TRUE;
gState.isClient = isClient;
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c
index ffdc86d..ffa26fa 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c
@@ -21,102 +21,164 @@
#include <errno.h>
#include <stdio.h>
#include <sys/select.h>
+#include <stdlib.h>
//=============================================================================
// prototypes
//=============================================================================
t_ilm_bool acceptClientConnection();
-void receiveFromSocket(int socketNumber);
-
+t_ilm_bool sendToSocket(struct SocketMessage* msg, int socketNumber);
+void receiveFromSocket(struct SocketMessage* msg, int socketNumber);
//=============================================================================
-// message handling
+// incoming queue handling (one select may return more than one active
+// descriptor, but receive must only return one message at a time.
+// all messages are first received and added to this queue, so no
+// messages get lost
//=============================================================================
-t_ilm_bool createMessage(t_ilm_const_string name)
+struct QueueElement
{
- memset(&gState.outgoingMessage, 0, sizeof(gState.outgoingMessage));
+ struct SocketMessage* data;
+ struct QueueElement* next;
+};
- gState.outgoingMessage.paket.type = SOCKET_MESSAGE_TYPE_NORMAL;
- gState.outgoingMessage.index = 0;
+static struct QueueElement* oldest = NULL;
+static struct QueueElement* latest = NULL;
- return appendString(name);
+void addToIncomingQueue(struct SocketMessage* data)
+{
+ struct QueueElement* newMessage = (struct QueueElement*)malloc(sizeof(struct QueueElement));
+ newMessage->data = data;
+ newMessage->next = NULL;
+ if (!oldest)
+ {
+ oldest = latest = newMessage;
+ }
+ else
+ {
+ latest->next = newMessage;
+ latest = latest->next;
+ }
}
-t_ilm_bool destroyMessage()
+struct SocketMessage* getFromIncomingQueue()
{
- t_ilm_bool returnValue = ILM_TRUE;
-
- /* to be implemented if needed */
-
- return returnValue;
+ struct SocketMessage* data = NULL;
+ if (oldest)
+ {
+ data = oldest->data;
+ struct QueueElement* delPtr = oldest;
+ oldest = oldest->next;
+ free(delPtr);
+ }
+ return data;
}
-t_ilm_bool sendMessage()
+//=============================================================================
+// message handling
+//=============================================================================
+t_ilm_message createMessage(t_ilm_const_string name)
{
- int activesocket = 0;
- int sentBytes = 0;
- int retVal = 0;
+ struct SocketMessage* newMessage = (struct SocketMessage*)malloc(sizeof(struct SocketMessage));
+ newMessage->paket.type = IpcMessageTypeCommand;
+ newMessage->index = 0;
+ appendString(newMessage, name);
+ return (t_ilm_message)newMessage;
+}
- int headerSize = sizeof(gState.outgoingMessage.paket) - sizeof(gState.outgoingMessage.paket.data);
- gState.outgoingMessage.paket.size = gState.outgoingMessage.index + headerSize;
+t_ilm_message createResponse(t_ilm_message receivedMessage)
+{
+ struct SocketMessage* newResponse = (struct SocketMessage*)malloc(sizeof(struct SocketMessage));
+ newResponse->paket.type = IpcMessageTypeCommand;
+ newResponse->index = 0;
+ appendString(newResponse, getMessageName(receivedMessage));
+ return (t_ilm_message)newResponse;
+}
- if (gState.isClient)
+t_ilm_message createErrorResponse(t_ilm_message receivedMessage)
+{
+ struct SocketMessage* newErrorResponse = (struct SocketMessage*)malloc(sizeof(struct SocketMessage));
+ newErrorResponse->paket.type = IpcMessageTypeError;
+ newErrorResponse->index = 0;
+ appendString(newErrorResponse, getMessageName(receivedMessage));
+ return (t_ilm_message)newErrorResponse;
+}
+
+t_ilm_message createNotification(t_ilm_const_string name)
+{
+ struct SocketMessage* newNotification = (struct SocketMessage*)malloc(sizeof(struct SocketMessage));
+ newNotification->paket.type = IpcMessageTypeNotification;
+ newNotification->index = 0;
+ appendString(newNotification, name);
+ return (t_ilm_message)newNotification;
+}
+
+t_ilm_bool destroyMessage(t_ilm_message message)
+{
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ if (msg)
{
- activesocket = gState.socket;
+ free(msg);
}
- else
+ return ILM_TRUE;
+}
+
+t_ilm_bool sendToClients(t_ilm_message message, t_ilm_client_handle* receiverList, int receiverCount)
+{
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ if (gState.isClient)
{
- activesocket = gState.incomingMessage[gState.incomingQueueIndex].sender;
+ return ILM_FALSE;
}
- int sendSize = gState.outgoingMessage.paket.size;
+ t_ilm_bool result = ILM_TRUE;
+ int i = 0;
- do
+ for (i = 0; i < receiverCount; ++i)
{
- retVal += send(activesocket,
- &gState.outgoingMessage.paket + sentBytes,
- sendSize - sentBytes,
- 0);
- sentBytes += retVal;
- } while (retVal > 0 && sentBytes < sendSize);
-
- //printf(" --> TcpIpcModule: %d bytes sent to socket %d\n", sentBytes, activesocket);
-
- return (sentBytes == sendSize) ? ILM_TRUE : ILM_FALSE;
+ int sock = (int)receiverList[i];
+ result &= sendToSocket(msg, sock);
+ }
+ return result;
}
-t_ilm_bool sendError(t_ilm_const_string desc)
+t_ilm_bool sendToService(t_ilm_message message)
{
- gState.outgoingMessage.paket.type = SOCKET_MESSAGE_TYPE_ERROR;
-
- // reset content to error description
- gState.outgoingMessage.index = 0;
- appendString(desc);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ if (!gState.isClient)
+ {
+ return ILM_FALSE;
+ }
- return sendMessage();
+ return sendToSocket(msg, gState.socket);
}
-enum IpcMessageType receiveMessage(t_ilm_int timeoutInMs)
+t_ilm_message receive(t_ilm_int timeoutInMs)
{
- enum IpcMessageType result = IpcMessageTypeNone;
-
- // switch to next receive buffer
- gState.incomingQueueIndex = ++gState.incomingQueueIndex % SOCKET_MESSAGE_BUFFER_COUNT;
-
- struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex];
-
- msg->index = 0;
+ struct SocketMessage* queuedMessage = getFromIncomingQueue();
+ if (queuedMessage)
+ {
+ return queuedMessage;
+ }
fd_set readFds = gState.monitoredSockets;
- struct timeval timeoutValue;
- timeoutValue.tv_sec = timeoutInMs / 1000;
- timeoutValue.tv_usec = (timeoutInMs % 1000) * 1000;
+ int numberOfFdsReady = 0;
- int numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, &timeoutValue);
+ if (timeoutInMs < 0)
+ {
+ numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, NULL);
+ }
+ else
+ {
+ struct timeval timeoutValue;
+ timeoutValue.tv_sec = timeoutInMs / 1000;
+ timeoutValue.tv_usec = (timeoutInMs % 1000) * 1000;
+ numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, &timeoutValue);
+ }
- if (-1 == numberOfFdsReady)
+ if (-1 == numberOfFdsReady)
{
printf("TcpIpcModule: select() failed\n");
}
@@ -127,73 +189,86 @@ enum IpcMessageType receiveMessage(t_ilm_int timeoutInMs)
{
if (FD_ISSET(socketNumber, &readFds))
{
+ struct SocketMessage* msg = (struct SocketMessage*)malloc(sizeof(struct SocketMessage));
+ msg->paket.type = IpcMessageTypeNone;
+ msg->sender = socketNumber;
+ msg->index = 0;
+ addToIncomingQueue(msg);
+
if (!gState.isClient)
{
if (gState.socket == socketNumber)
{
// New client connected
+ msg->paket.type = IpcMessageTypeConnect;
acceptClientConnection();
- result = IpcMessageTypeNone; // no data received
continue;
}
// receive data from socket
- receiveFromSocket(socketNumber);
+ receiveFromSocket(msg, socketNumber);
if(msg->paket.size > 0)
{
// new message from client
- getString(msg->name);
- result = IpcMessageTypeCommand;
+ getString(msg, msg->name);
continue;
}
if(msg->paket.size == 0)
{
// client disconnected
+ msg->paket.type = IpcMessageTypeDisconnect;
close(socketNumber);
FD_CLR(socketNumber, &gState.monitoredSockets);
- result = IpcMessageTypeDisconnect;
continue;
}
// error
+ msg->paket.type = IpcMessageTypeError;
const char* errorMsg = (char*)strerror(errno);
- printf(" --> TcpIpcModule: Error receiving data from socket %d (%s)\n",
- msg->sender, errorMsg);
- result = IpcMessageTypeError;
+ printf("TcpIpcModule: receive error socket %d (%s)\n", msg->sender, errorMsg);
}
else
{
- // receive LayerManager response
- receiveFromSocket(socketNumber);
- getString(msg->name);
- result = IpcMessageTypeCommand;
+ // receive LayerManager response or notification
+ receiveFromSocket(msg, socketNumber);
+ getString(msg, msg->name);
}
}
}
}
- return result;
+ return getFromIncomingQueue();
}
-t_ilm_const_string getMessageName()
+t_ilm_const_string getMessageName(t_ilm_message message)
{
- return gState.incomingMessage[gState.incomingQueueIndex].name;
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return msg ? msg->name : NULL;
}
-t_ilm_bool isErrorMessage()
+t_ilm_message_type getMessageType(t_ilm_message message)
{
- return (SOCKET_MESSAGE_TYPE_ERROR == gState.incomingMessage[gState.incomingQueueIndex].paket.type);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return msg ? msg->paket.type : IpcMessageTypeNone;
}
-t_ilm_const_string getSenderName()
+t_ilm_const_string getSenderName(t_ilm_message message)
{
- char buffer[16];
- sprintf(buffer, "socket %d", gState.incomingMessage[gState.incomingQueueIndex].sender);
- return strdup(buffer);
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ char name[] = "socket XXXXXXXXXXXXXXXXXX";
+ sprintf(name, "socket %d", (msg ? msg->sender : -1));
+ return strdup(name);
}
+t_ilm_client_handle getSenderHandle(t_ilm_message message)
+{
+ struct SocketMessage* msg = (struct SocketMessage*)message;
+ return msg ? (t_ilm_client_handle)msg->sender : (t_ilm_client_handle)0;
+}
+
+
//=============================================================================
//private
//=============================================================================
@@ -216,12 +291,32 @@ t_ilm_bool acceptClientConnection()
return result;
}
-void receiveFromSocket(int socketNumber)
+t_ilm_bool sendToSocket(struct SocketMessage* msg, int socketNumber)
{
- int receivedBytes = 0;
+ int sentBytes = 0;
int retVal = 0;
- struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex];
+ int headerSize = sizeof(msg->paket) - sizeof(msg->paket.data);
+ msg->paket.size = msg->index + headerSize;
+
+ int sendSize = msg->paket.size;
+
+ do
+ {
+ retVal += send(socketNumber,
+ &msg->paket + sentBytes,
+ sendSize - sentBytes,
+ 0);
+ sentBytes += retVal;
+ } while (retVal > 0 && sentBytes < sendSize);
+
+ return (sentBytes == sendSize) ? ILM_TRUE : ILM_FALSE;
+}
+
+void receiveFromSocket(struct SocketMessage* msg, int socketNumber)
+{
+ int receivedBytes = 0;
+ int retVal = 0;
msg->sender = socketNumber;
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp b/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp
index 2de0bf0..308b3ec 100644
--- a/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp
+++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp
@@ -56,7 +56,7 @@ protected:
TEST_F(Loopback, DISABLE_lifecycle)
{
- ASSERT_TRUE(mService.init(ILM_FALSE));
+ ASSERT_TRUE(mService.initClientMode());
//ASSERT_TRUE(mClient.init(ILM_TRUE));
//ASSERT_TRUE(mClient.destroy());