diff options
author | Timo Lotterbach <timo.lotterbach@bmw-carit.de> | 2012-09-10 06:40:22 -0700 |
---|---|---|
committer | Timo Lotterbach <timo.lotterbach@bmw-carit.de> | 2012-10-02 05:08:55 -0700 |
commit | 7c6324e390394ec7955e15eb1d134e47e2ea2c27 (patch) | |
tree | b819ba242e45b961b7ceae37780ca512e19d5370 | |
parent | 6be7fba3b1bf2cce8b17c1e6eda915523b53b612 (diff) | |
download | layer_management-7c6324e390394ec7955e15eb1d134e47e2ea2c27.tar.gz |
TcpIpcModule: updated to new IpcModule API, added notification support
Added support for creating and sending notifications.
Improved handling of incoming message data structure.
removed destroyMessage:
was only used for DBusIpcModule, but lifecycle of received messages
is now handled internally. The external call is not required any more.
7 files changed, 333 insertions, 144 deletions
diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h index f2000d9..459a996 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketConfiguration.h @@ -28,17 +28,12 @@ #define SOCKET_MAX_MESSAGE_SIZE 1024 #define SOCKET_MAX_PENDING_CONNECTIONS 10 -#define SOCKET_MESSAGE_BUFFER_COUNT 2 - -#define SOCKET_MESSAGE_TYPE_NORMAL 'n' -#define SOCKET_MESSAGE_TYPE_ERROR 'e' - -#define SOCKET_MESSAGE_TYPE_INT 'i' -#define SOCKET_MESSAGE_TYPE_UINT 'u' -#define SOCKET_MESSAGE_TYPE_BOOL 'b' -#define SOCKET_MESSAGE_TYPE_DOUBLE 'd' -#define SOCKET_MESSAGE_TYPE_STRING 's' -#define SOCKET_MESSAGE_TYPE_ARRAY 'a' +#define SOCKET_MESSAGE_TYPE_INT 'i' +#define SOCKET_MESSAGE_TYPE_UINT 'u' +#define SOCKET_MESSAGE_TYPE_BOOL 'b' +#define SOCKET_MESSAGE_TYPE_DOUBLE 'd' +#define SOCKET_MESSAGE_TYPE_STRING 's' +#define SOCKET_MESSAGE_TYPE_ARRAY 'a' #define ENV_TCP_HOST "LM_TCP_HOST" #define ENV_TCP_PORT "LM_TCP_PORT" diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h index cea8c1e..8b3b094 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/include/socketShared.h @@ -56,9 +56,6 @@ struct State struct sockaddr_in clientAddrIn; fd_set monitoredSockets; int monitoredSocketMax; - struct SocketMessage outgoingMessage; - struct SocketMessage incomingMessage[SOCKET_MESSAGE_BUFFER_COUNT]; - t_ilm_int incomingQueueIndex; }; diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c index a554436..47f4fa7 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/append.c @@ -23,10 +23,8 @@ // append simple data types //----------------------------------------------------------------------------- -t_ilm_bool appendGenericValue(const char protocolType, const char size, const void* value) +t_ilm_bool appendGenericValue(struct SocketMessage* msg, const char protocolType, const char size, const void* value) { - struct SocketMessage* msg = &gState.outgoingMessage; - // size check: is message size reached if (sizeof(msg->paket) - sizeof(msg->paket.data) // header + msg->index + size // + data @@ -51,41 +49,44 @@ t_ilm_bool appendGenericValue(const char protocolType, const char size, const vo return ILM_TRUE; } -t_ilm_bool appendUint(const t_ilm_uint value) +t_ilm_bool appendUint(t_ilm_message message, const t_ilm_uint value) { - return appendGenericValue(SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), &value); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), &value); } -t_ilm_bool appendInt(const t_ilm_int value) +t_ilm_bool appendInt(t_ilm_message message, const t_ilm_int value) { - return appendGenericValue(SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), &value); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), &value); } -t_ilm_bool appendBool(const t_ilm_bool value) +t_ilm_bool appendBool(t_ilm_message message, const t_ilm_bool value) { - return appendGenericValue(SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), &value); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), &value); } -t_ilm_bool appendDouble(const t_ilm_float value) +t_ilm_bool appendDouble(t_ilm_message message, const t_ilm_float value) { - return appendGenericValue(SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), &value); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), &value); } -t_ilm_bool appendString(t_ilm_const_string value) +t_ilm_bool appendString(t_ilm_message message, t_ilm_const_string value) { - return appendGenericValue(SOCKET_MESSAGE_TYPE_STRING, strlen(value), value); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericValue(msg, SOCKET_MESSAGE_TYPE_STRING, strlen(value), value); } //----------------------------------------------------------------------------- // append array data types //----------------------------------------------------------------------------- -t_ilm_bool appendGenericArray(const char arraySize, const char protocolType, const char size, const void* value) +t_ilm_bool appendGenericArray(struct SocketMessage* msg, const char arraySize, const char protocolType, const char size, const void* value) { t_ilm_bool result = ILM_TRUE; - struct SocketMessage* msg = &gState.outgoingMessage; - // TODO: size check: is message size reached? // append array type @@ -100,30 +101,34 @@ t_ilm_bool appendGenericArray(const char arraySize, const char protocolType, con char i = 0; for (i = 0; i < arraySize; ++i) { - result &= appendGenericValue(protocolType, size, value + i * size); + result &= appendGenericValue(msg, protocolType, size, value + i * size); } return result; } -t_ilm_bool appendUintArray(const t_ilm_uint* valueArray, t_ilm_int arraySize) +t_ilm_bool appendUintArray(t_ilm_message message, const t_ilm_uint* valueArray, t_ilm_int arraySize) { - return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), valueArray); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint), valueArray); } -t_ilm_bool appendIntArray(const t_ilm_int* valueArray, t_ilm_int arraySize) +t_ilm_bool appendIntArray(t_ilm_message message, const t_ilm_int* valueArray, t_ilm_int arraySize) { - return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), valueArray); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int), valueArray); } -t_ilm_bool appendBoolArray(const t_ilm_bool* valueArray, t_ilm_int arraySize) +t_ilm_bool appendBoolArray(t_ilm_message message, const t_ilm_bool* valueArray, t_ilm_int arraySize) { - return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), valueArray); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool), valueArray); } -t_ilm_bool appendDoubleArray(const t_ilm_float* valueArray, t_ilm_int arraySize) +t_ilm_bool appendDoubleArray(t_ilm_message message, const t_ilm_float* valueArray, t_ilm_int arraySize) { - return appendGenericArray(arraySize, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), valueArray); + struct SocketMessage* msg = (struct SocketMessage*)message; + return appendGenericArray(msg, arraySize, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float), valueArray); } // TODO appendStringArray() diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c index 1917c10..6cfa56a 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/get.c @@ -25,12 +25,10 @@ // get simple data types //----------------------------------------------------------------------------- -t_ilm_bool getGenericValue(void* value, const char protocolType, const char expectedSize) +t_ilm_bool getGenericValue(struct SocketMessage* msg, void* value, const char protocolType, const char expectedSize) { t_ilm_bool result = ILM_FALSE; - struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex]; - // get protocol value from message char readType = msg->paket.data[msg->index]; msg->index += sizeof(readType); @@ -75,41 +73,44 @@ t_ilm_bool getGenericValue(void* value, const char protocolType, const char expe return ILM_TRUE; } -t_ilm_bool getUint(t_ilm_uint* value) +t_ilm_bool getUint(t_ilm_message message, t_ilm_uint* value) { - return getGenericValue(value, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint)); } -t_ilm_bool getInt(t_ilm_int* value) +t_ilm_bool getInt(t_ilm_message message, t_ilm_int* value) { - return getGenericValue(value, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int)); } -t_ilm_bool getBool(t_ilm_bool* value) +t_ilm_bool getBool(t_ilm_message message, t_ilm_bool* value) { - return getGenericValue(value, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_BOOL, sizeof(t_ilm_bool)); } -t_ilm_bool getDouble(t_ilm_float* value) +t_ilm_bool getDouble(t_ilm_message message, t_ilm_float* value) { - return getGenericValue(value, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_DOUBLE, sizeof(t_ilm_float)); } -t_ilm_bool getString(char* value) +t_ilm_bool getString(t_ilm_message message, char* value) { - return getGenericValue(value, SOCKET_MESSAGE_TYPE_STRING, sizeof(t_ilm_const_string)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericValue(msg, value, SOCKET_MESSAGE_TYPE_STRING, sizeof(t_ilm_const_string)); } //----------------------------------------------------------------------------- // get array data types //----------------------------------------------------------------------------- -t_ilm_bool getGenericArray(t_ilm_int* arraySize, void** value, const char protocolType, const char expectedSize) +t_ilm_bool getGenericArray(struct SocketMessage* msg, t_ilm_int* arraySize, void** value, const char protocolType, const char expectedSize) { t_ilm_bool result = ILM_TRUE; - struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex]; - // get protocol value from message char readType = msg->paket.data[msg->index]; msg->index += sizeof(readType); @@ -134,19 +135,21 @@ t_ilm_bool getGenericArray(t_ilm_int* arraySize, void** value, const char protoc char i = 0; for (i = 0; i < *arraySize; ++i) { - result &= getGenericValue(((*value) + expectedSize * i), protocolType, expectedSize); + result &= getGenericValue(msg, ((*value) + expectedSize * i), protocolType, expectedSize); } return result; } -t_ilm_bool getIntArray(t_ilm_int** valueArray, t_ilm_int* arraySize) +t_ilm_bool getIntArray(t_ilm_message message, t_ilm_int** valueArray, t_ilm_int* arraySize) { - return getGenericArray(arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericArray(msg, arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_INT, sizeof(t_ilm_int)); } -t_ilm_bool getUintArray(t_ilm_uint** valueArray, t_ilm_int* arraySize) +t_ilm_bool getUintArray(t_ilm_message message, t_ilm_uint** valueArray, t_ilm_int* arraySize) { - return getGenericArray(arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint)); + struct SocketMessage* msg = (struct SocketMessage*)message; + return getGenericArray(msg, arraySize, (void**)valueArray, SOCKET_MESSAGE_TYPE_UINT, sizeof(t_ilm_uint)); } diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c index e7c3f96..eb531c2 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/initialization.c @@ -24,14 +24,108 @@ #include <signal.h> -t_ilm_bool init(t_ilm_bool isClient) +t_ilm_bool initServiceMode() { // ignore broken pipe, if clients disconnect, handled in receive() signal(SIGPIPE, SIG_IGN); + t_ilm_bool isClient = ILM_FALSE; + t_ilm_bool result = ILM_TRUE; - gState.incomingQueueIndex = SOCKET_MESSAGE_BUFFER_COUNT - 1; + gState.isClient = isClient; + + gState.socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + + if (gState.socket < 0) + { + printf("TcpIpcModule: socket()...failed\n"); + result = ILM_FALSE; + } + + const char* portString = getenv(ENV_TCP_PORT); + int port = SOCKET_TCP_PORT; + if (portString) + { + port = atoi(portString); + } + + gState.serverAddrIn.sin_family = AF_INET; + gState.serverAddrIn.sin_port = htons(port); + memset(&(gState.serverAddrIn.sin_zero), '\0', 8); + + if (gState.isClient) // Client + { + const char* hostname = getenv(ENV_TCP_HOST); + if (!hostname) + { + hostname = SOCKET_TCP_HOST; + } + + struct hostent* server; + server = gethostbyname(hostname); + if (!server) + { + printf("TcpIpcModule: error: could not resolve host '%s'.\n", hostname); + result = ILM_FALSE; + } + else + { + memcpy(&gState.serverAddrIn.sin_addr.s_addr, server->h_addr, server->h_length); + } + + if (0 != connect(gState.socket, + (struct sockaddr *) &gState.serverAddrIn, + sizeof(gState.serverAddrIn))) + { + result = ILM_FALSE; + } + + printf("TcpIpcModule: connection to %s:%d %s.\n", + hostname, port, + (ILM_TRUE == result) ? "established" : "failed"); + + FD_SET(gState.socket, &gState.monitoredSockets); + gState.monitoredSocketMax = gState.socket; + } + else // LayerManagerService + { + int on = 1; + setsockopt(gState.socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + + gState.serverAddrIn.sin_addr.s_addr = htonl(INADDR_ANY); + + if (0 > bind(gState.socket, + (struct sockaddr *) &gState.serverAddrIn, + sizeof(gState.serverAddrIn))) + { + printf("TcpIpcModule: bind()...failed\n"); + result = ILM_FALSE; + } + + if (listen(gState.socket, SOCKET_MAX_PENDING_CONNECTIONS) < 0) + { + printf("TcpIpcModule: listen()...failed\n"); + result = ILM_FALSE; + } + + FD_SET(gState.socket, &gState.monitoredSockets); + gState.monitoredSocketMax = gState.socket; + + printf("TcpIpcModule: listening to TCP port: %d\n", port); + } + + return result; +} + +t_ilm_bool initClientMode() +{ + // ignore broken pipe, if clients disconnect, handled in receive() + signal(SIGPIPE, SIG_IGN); + + t_ilm_bool isClient = ILM_TRUE; + + t_ilm_bool result = ILM_TRUE; gState.isClient = isClient; diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c index ffdc86d..ffa26fa 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/src/message.c @@ -21,102 +21,164 @@ #include <errno.h> #include <stdio.h> #include <sys/select.h> +#include <stdlib.h> //============================================================================= // prototypes //============================================================================= t_ilm_bool acceptClientConnection(); -void receiveFromSocket(int socketNumber); - +t_ilm_bool sendToSocket(struct SocketMessage* msg, int socketNumber); +void receiveFromSocket(struct SocketMessage* msg, int socketNumber); //============================================================================= -// message handling +// incoming queue handling (one select may return more than one active +// descriptor, but receive must only return one message at a time. +// all messages are first received and added to this queue, so no +// messages get lost //============================================================================= -t_ilm_bool createMessage(t_ilm_const_string name) +struct QueueElement { - memset(&gState.outgoingMessage, 0, sizeof(gState.outgoingMessage)); + struct SocketMessage* data; + struct QueueElement* next; +}; - gState.outgoingMessage.paket.type = SOCKET_MESSAGE_TYPE_NORMAL; - gState.outgoingMessage.index = 0; +static struct QueueElement* oldest = NULL; +static struct QueueElement* latest = NULL; - return appendString(name); +void addToIncomingQueue(struct SocketMessage* data) +{ + struct QueueElement* newMessage = (struct QueueElement*)malloc(sizeof(struct QueueElement)); + newMessage->data = data; + newMessage->next = NULL; + if (!oldest) + { + oldest = latest = newMessage; + } + else + { + latest->next = newMessage; + latest = latest->next; + } } -t_ilm_bool destroyMessage() +struct SocketMessage* getFromIncomingQueue() { - t_ilm_bool returnValue = ILM_TRUE; - - /* to be implemented if needed */ - - return returnValue; + struct SocketMessage* data = NULL; + if (oldest) + { + data = oldest->data; + struct QueueElement* delPtr = oldest; + oldest = oldest->next; + free(delPtr); + } + return data; } -t_ilm_bool sendMessage() +//============================================================================= +// message handling +//============================================================================= +t_ilm_message createMessage(t_ilm_const_string name) { - int activesocket = 0; - int sentBytes = 0; - int retVal = 0; + struct SocketMessage* newMessage = (struct SocketMessage*)malloc(sizeof(struct SocketMessage)); + newMessage->paket.type = IpcMessageTypeCommand; + newMessage->index = 0; + appendString(newMessage, name); + return (t_ilm_message)newMessage; +} - int headerSize = sizeof(gState.outgoingMessage.paket) - sizeof(gState.outgoingMessage.paket.data); - gState.outgoingMessage.paket.size = gState.outgoingMessage.index + headerSize; +t_ilm_message createResponse(t_ilm_message receivedMessage) +{ + struct SocketMessage* newResponse = (struct SocketMessage*)malloc(sizeof(struct SocketMessage)); + newResponse->paket.type = IpcMessageTypeCommand; + newResponse->index = 0; + appendString(newResponse, getMessageName(receivedMessage)); + return (t_ilm_message)newResponse; +} - if (gState.isClient) +t_ilm_message createErrorResponse(t_ilm_message receivedMessage) +{ + struct SocketMessage* newErrorResponse = (struct SocketMessage*)malloc(sizeof(struct SocketMessage)); + newErrorResponse->paket.type = IpcMessageTypeError; + newErrorResponse->index = 0; + appendString(newErrorResponse, getMessageName(receivedMessage)); + return (t_ilm_message)newErrorResponse; +} + +t_ilm_message createNotification(t_ilm_const_string name) +{ + struct SocketMessage* newNotification = (struct SocketMessage*)malloc(sizeof(struct SocketMessage)); + newNotification->paket.type = IpcMessageTypeNotification; + newNotification->index = 0; + appendString(newNotification, name); + return (t_ilm_message)newNotification; +} + +t_ilm_bool destroyMessage(t_ilm_message message) +{ + struct SocketMessage* msg = (struct SocketMessage*)message; + if (msg) { - activesocket = gState.socket; + free(msg); } - else + return ILM_TRUE; +} + +t_ilm_bool sendToClients(t_ilm_message message, t_ilm_client_handle* receiverList, int receiverCount) +{ + struct SocketMessage* msg = (struct SocketMessage*)message; + if (gState.isClient) { - activesocket = gState.incomingMessage[gState.incomingQueueIndex].sender; + return ILM_FALSE; } - int sendSize = gState.outgoingMessage.paket.size; + t_ilm_bool result = ILM_TRUE; + int i = 0; - do + for (i = 0; i < receiverCount; ++i) { - retVal += send(activesocket, - &gState.outgoingMessage.paket + sentBytes, - sendSize - sentBytes, - 0); - sentBytes += retVal; - } while (retVal > 0 && sentBytes < sendSize); - - //printf(" --> TcpIpcModule: %d bytes sent to socket %d\n", sentBytes, activesocket); - - return (sentBytes == sendSize) ? ILM_TRUE : ILM_FALSE; + int sock = (int)receiverList[i]; + result &= sendToSocket(msg, sock); + } + return result; } -t_ilm_bool sendError(t_ilm_const_string desc) +t_ilm_bool sendToService(t_ilm_message message) { - gState.outgoingMessage.paket.type = SOCKET_MESSAGE_TYPE_ERROR; - - // reset content to error description - gState.outgoingMessage.index = 0; - appendString(desc); + struct SocketMessage* msg = (struct SocketMessage*)message; + if (!gState.isClient) + { + return ILM_FALSE; + } - return sendMessage(); + return sendToSocket(msg, gState.socket); } -enum IpcMessageType receiveMessage(t_ilm_int timeoutInMs) +t_ilm_message receive(t_ilm_int timeoutInMs) { - enum IpcMessageType result = IpcMessageTypeNone; - - // switch to next receive buffer - gState.incomingQueueIndex = ++gState.incomingQueueIndex % SOCKET_MESSAGE_BUFFER_COUNT; - - struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex]; - - msg->index = 0; + struct SocketMessage* queuedMessage = getFromIncomingQueue(); + if (queuedMessage) + { + return queuedMessage; + } fd_set readFds = gState.monitoredSockets; - struct timeval timeoutValue; - timeoutValue.tv_sec = timeoutInMs / 1000; - timeoutValue.tv_usec = (timeoutInMs % 1000) * 1000; + int numberOfFdsReady = 0; - int numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, &timeoutValue); + if (timeoutInMs < 0) + { + numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, NULL); + } + else + { + struct timeval timeoutValue; + timeoutValue.tv_sec = timeoutInMs / 1000; + timeoutValue.tv_usec = (timeoutInMs % 1000) * 1000; + numberOfFdsReady = select(gState.monitoredSocketMax + 1, &readFds, 0, 0, &timeoutValue); + } - if (-1 == numberOfFdsReady) + if (-1 == numberOfFdsReady) { printf("TcpIpcModule: select() failed\n"); } @@ -127,73 +189,86 @@ enum IpcMessageType receiveMessage(t_ilm_int timeoutInMs) { if (FD_ISSET(socketNumber, &readFds)) { + struct SocketMessage* msg = (struct SocketMessage*)malloc(sizeof(struct SocketMessage)); + msg->paket.type = IpcMessageTypeNone; + msg->sender = socketNumber; + msg->index = 0; + addToIncomingQueue(msg); + if (!gState.isClient) { if (gState.socket == socketNumber) { // New client connected + msg->paket.type = IpcMessageTypeConnect; acceptClientConnection(); - result = IpcMessageTypeNone; // no data received continue; } // receive data from socket - receiveFromSocket(socketNumber); + receiveFromSocket(msg, socketNumber); if(msg->paket.size > 0) { // new message from client - getString(msg->name); - result = IpcMessageTypeCommand; + getString(msg, msg->name); continue; } if(msg->paket.size == 0) { // client disconnected + msg->paket.type = IpcMessageTypeDisconnect; close(socketNumber); FD_CLR(socketNumber, &gState.monitoredSockets); - result = IpcMessageTypeDisconnect; continue; } // error + msg->paket.type = IpcMessageTypeError; const char* errorMsg = (char*)strerror(errno); - printf(" --> TcpIpcModule: Error receiving data from socket %d (%s)\n", - msg->sender, errorMsg); - result = IpcMessageTypeError; + printf("TcpIpcModule: receive error socket %d (%s)\n", msg->sender, errorMsg); } else { - // receive LayerManager response - receiveFromSocket(socketNumber); - getString(msg->name); - result = IpcMessageTypeCommand; + // receive LayerManager response or notification + receiveFromSocket(msg, socketNumber); + getString(msg, msg->name); } } } } - return result; + return getFromIncomingQueue(); } -t_ilm_const_string getMessageName() +t_ilm_const_string getMessageName(t_ilm_message message) { - return gState.incomingMessage[gState.incomingQueueIndex].name; + struct SocketMessage* msg = (struct SocketMessage*)message; + return msg ? msg->name : NULL; } -t_ilm_bool isErrorMessage() +t_ilm_message_type getMessageType(t_ilm_message message) { - return (SOCKET_MESSAGE_TYPE_ERROR == gState.incomingMessage[gState.incomingQueueIndex].paket.type); + struct SocketMessage* msg = (struct SocketMessage*)message; + return msg ? msg->paket.type : IpcMessageTypeNone; } -t_ilm_const_string getSenderName() +t_ilm_const_string getSenderName(t_ilm_message message) { - char buffer[16]; - sprintf(buffer, "socket %d", gState.incomingMessage[gState.incomingQueueIndex].sender); - return strdup(buffer); + struct SocketMessage* msg = (struct SocketMessage*)message; + char name[] = "socket XXXXXXXXXXXXXXXXXX"; + sprintf(name, "socket %d", (msg ? msg->sender : -1)); + return strdup(name); } +t_ilm_client_handle getSenderHandle(t_ilm_message message) +{ + struct SocketMessage* msg = (struct SocketMessage*)message; + return msg ? (t_ilm_client_handle)msg->sender : (t_ilm_client_handle)0; +} + + //============================================================================= //private //============================================================================= @@ -216,12 +291,32 @@ t_ilm_bool acceptClientConnection() return result; } -void receiveFromSocket(int socketNumber) +t_ilm_bool sendToSocket(struct SocketMessage* msg, int socketNumber) { - int receivedBytes = 0; + int sentBytes = 0; int retVal = 0; - struct SocketMessage* msg = &gState.incomingMessage[gState.incomingQueueIndex]; + int headerSize = sizeof(msg->paket) - sizeof(msg->paket.data); + msg->paket.size = msg->index + headerSize; + + int sendSize = msg->paket.size; + + do + { + retVal += send(socketNumber, + &msg->paket + sentBytes, + sendSize - sentBytes, + 0); + sentBytes += retVal; + } while (retVal > 0 && sentBytes < sendSize); + + return (sentBytes == sendSize) ? ILM_TRUE : ILM_FALSE; +} + +void receiveFromSocket(struct SocketMessage* msg, int socketNumber) +{ + int receivedBytes = 0; + int retVal = 0; msg->sender = socketNumber; diff --git a/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp b/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp index 2de0bf0..308b3ec 100644 --- a/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp +++ b/LayerManagerPlugins/IpcModules/TcpIpcModule/tests/LoopbackTest.cpp @@ -56,7 +56,7 @@ protected: TEST_F(Loopback, DISABLE_lifecycle) { - ASSERT_TRUE(mService.init(ILM_FALSE)); + ASSERT_TRUE(mService.initClientMode()); //ASSERT_TRUE(mClient.init(ILM_TRUE)); //ASSERT_TRUE(mClient.destroy()); |