summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitriy Klimenko <dklimenko@luxoft.com>2014-09-11 04:23:19 -0700
committerDmitriy Klimenko <dklimenko@luxoft.com>2014-09-11 04:23:48 -0700
commit542f4a9f7c5ae8334c75f48e5bf936028e80531a (patch)
tree9a5299008feb3daedcf3715ebe04f902ec8d11b7
parenta02de081a1e36965b5d201b59c85a47384859039 (diff)
downloadsdl_core-542f4a9f7c5ae8334c75f48e5bf936028e80531a.tar.gz
message broker MQClient Shared memmory integration
-rw-r--r--customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp20
-rw-r--r--customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp87
-rw-r--r--src/3rd_party-static/MessageBroker/include/mb_controller.hpp3
-rw-r--r--src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp44
4 files changed, 111 insertions, 43 deletions
diff --git a/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp b/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp
index 7b4b7f363b..7ea42ae40c 100644
--- a/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp
+++ b/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp
@@ -14,10 +14,10 @@
#include "mb_client.hpp"
-#define MAX_QUEUE_NAME_SIZE 24
-#define MAX_QUEUE_MSG_SIZE 4095
-#define MSGQ_MAX_MESSAGES 128
-#define MSGQ_MESSAGE_SIZE MAX_QUEUE_MSG_SIZE
+#define MAX_QUEUE_NAME_SIZE 24
+#define MAX_QUEUE_MSG_SIZE 4095
+#define MSGQ_MAX_MESSAGES 128
+#define MSGQ_MESSAGE_SIZE MAX_QUEUE_MSG_SIZE
/**
* \namespace NsMessageBroker
@@ -25,6 +25,12 @@
*/
namespace NsMessageBroker
{
+#ifdef CUSTOMER_PASA
+ typedef struct {
+ int size;
+ char text[30000];
+ } shmem_t;
+#endif
/**
* \class Client
@@ -68,6 +74,12 @@ namespace NsMessageBroker
*/
MqClient();
+#ifdef CUSTOMER_PASA
+ int fd;
+ shmem_t *ptr;
+ int fd2;
+ shmem_t *ptr2;
+#endif
private:
diff --git a/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp b/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp
index edb093ba28..ad122ac29f 100644
--- a/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp
+++ b/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp
@@ -45,49 +45,70 @@ namespace NsMessageBroker
bool MqClient::MqOpen()
{
- struct mq_attr attributes;
- attributes.mq_maxmsg = MSGQ_MAX_MESSAGES;
- attributes.mq_msgsize = MAX_QUEUE_MSG_SIZE;
- attributes.mq_flags = 0;
+ struct mq_attr attributes;
+ attributes.mq_maxmsg = MSGQ_MAX_MESSAGES;
+ attributes.mq_msgsize = MAX_QUEUE_MSG_SIZE;
+ attributes.mq_flags = 0;
- m_sndHandle = ::mq_open(m_send.c_str(), O_WRONLY|O_CREAT|O_NONBLOCK, 0666, &attributes);
- m_rcvHandle = ::mq_open(m_recv.c_str(), O_RDONLY|O_CREAT, 0666, &attributes);
+ m_sndHandle = ::mq_open(m_send.c_str(), O_WRONLY|O_CREAT|O_NONBLOCK, 0666, &attributes);
+ m_rcvHandle = ::mq_open(m_recv.c_str(), O_RDONLY|O_CREAT, 0666, &attributes);
- return ((m_sndHandle != -1) && (m_rcvHandle != -1)) ? true : false;
+ return ((m_sndHandle != -1) && (m_rcvHandle != -1)) ? true : false;
}
-
ssize_t MqClient::Send(const std::string& data)
{
-
- std::string rep = data;
- const char* ptrBuffer = rep.c_str();
-
- if(rep.empty())
- {
- return -1;
- }
-
- int retVal = ::mq_send(m_sndHandle, ptrBuffer, rep.length(), 0);
-
- return (retVal == -1) ? retVal : rep.length();
-
+ std::string rep = data;
+ int retVal = 0;
+ const char* ptrBuffer = rep.c_str();
+ std::string shm_data="SHARED_MEMORY";
+
+ if (rep.empty())
+ {
+ return -1;
+ }
+
+ if (rep.length() > MAX_QUEUE_MSG_SIZE)
+ {
+ int len = rep.length();
+ ptr2->size = len;
+ memset(ptr2->text, 0, sizeof(ptr2->size));
+ memcpy(ptr2->text, rep.c_str(), len); /* write to the shared memory */
+
+ int retVal = ::mq_send(m_sndHandle, &shm_data[0], shm_data.length(), 0);
+ DBG_MSG(("Sending length %d", len));
+ return (retVal == -1) ? retVal :(shm_data.length());
+ }
+ else
+ {
+ int retVal = ::mq_send(m_sndHandle, ptrBuffer, rep.length(), 0);
+ DBG_MSG(("MqClient::Send:Buffer %s", ptrBuffer));
+ return (retVal == -1) ? retVal : rep.length();
+ }
}
-
ssize_t MqClient::Recv(std::string& data)
{
- char buf[MAX_QUEUE_MSG_SIZE];
- ssize_t length=0;
-
- if ((length = ::mq_receive(m_rcvHandle, buf, sizeof(buf), 0)) == -1)
- {
- return -1;
- }
-
- data = std::string(&buf[1], length);
-
- return length;
+ char buf[MAX_QUEUE_MSG_SIZE] = {'\0'};
+ ssize_t length=0;
+
+ if ((length = ::mq_receive(m_rcvHandle, buf, sizeof(buf), 0)) == -1)
+ {
+ return -1;
+ }
+
+ if (strcmp(buf+1,"SHARED_MEMORY") != 0)
+ {
+ data = std::string(&buf[1], length);
+ DBG_MSG(("MqClient::Received Data at mq_client %s",data.c_str()));
+ return length;
+ } else {
+ DBG_MSG(("Shared Memory::Received Length %d",ptr->size));
+ data = std::string(ptr->text ,ptr->size);
+ length = ptr->size;
+
+ return length;
+ }
}
} /* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/MessageBroker/include/mb_controller.hpp b/src/3rd_party-static/MessageBroker/include/mb_controller.hpp
index 13be9f3348..705c53d0d0 100644
--- a/src/3rd_party-static/MessageBroker/include/mb_controller.hpp
+++ b/src/3rd_party-static/MessageBroker/include/mb_controller.hpp
@@ -14,7 +14,8 @@
#include "mb_tcpclient.hpp"
#ifdef CUSTOMER_PASA
#include "mb_mqclient.hpp"
-
+#include <sys/mman.h>
+#include <sys/stat.h>
#endif
#include "utils/lock.h"
diff --git a/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp b/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp
index 2d7ebb36af..7326ff7cc3 100644
--- a/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp
+++ b/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp
@@ -34,8 +34,27 @@ namespace NsMessageBroker
mControllersIdStart(-1),
mControllersIdCurrent(0)
{
- mControllersName = name;
- mClientType = MQUEUE;
+ mControllersName = name;
+ mClientType = MQUEUE;
+
+ /* Create the shared memory object */
+ fd = shm_open("/SHNAME_SDLQUEUE", O_RDWR | O_CREAT , 0777);
+ //DBG_MSG(("MqClient::Send:A"));
+ ftruncate(fd, sizeof(shmem_t));
+ /* Set the size of the shared memory object */
+
+ // DBG_MSG(("MqClient::Send::B"));
+ /* Get a pointer to the shared memory, map it into
+ * our address space */
+ ptr = ( shmem_t *)mmap(0, sizeof(shmem_t), PROT_READ, MAP_SHARED, fd, 0);
+ // DBG_MSG(("MqClient::Send:B"));
+
+ fd2 = shm_open("/SHNAME_SDLQUEUE2", O_RDWR | O_CREAT, 0777);
+ DBG_MSG(("A\n"));
+ ftruncate(fd2, sizeof(shmem_t));
+ /* Get a pointer to the shared memory object */
+ ptr2 =(shmem_t *) mmap(0, sizeof(shmem_t),
+ PROT_WRITE, MAP_SHARED, fd2, 0);
}
#endif
@@ -46,6 +65,14 @@ namespace NsMessageBroker
CMessageBrokerController::~CMessageBrokerController()
{
+#ifdef CUSTOMER_PASA
+ ftruncate(fd, 0);
+ munmap(ptr, sizeof(shmem_t));
+ close(fd);
+ ftruncate(fd2, 0);
+ munmap(ptr2, sizeof(shmem_t));
+ close(fd2);
+#endif
}
#ifdef CUSTOMER_PASA
@@ -62,11 +89,11 @@ namespace NsMessageBroker
{
switch(mClientType)
{
- case TCP: return TcpClient::Send(data);
- case MQUEUE: return MqClient::Send(data);
+ case TCP: return TcpClient::Send(data);
+ case MQUEUE: return MqClient::Send(data);
+ default : return -1;
}
}
-
#endif
ssize_t CMessageBrokerController::Recv(std::string& data)
{
@@ -81,6 +108,13 @@ namespace NsMessageBroker
while (!stop)
{
Json::Value root;
+#ifdef CUSTOMER_PASA
+ if (m_receivingBuffer.length() > sizeof(shmem_t))
+ {
+ m_receivingBuffer.clear();
+ return 0;
+ }
+#endif
if (!m_reader.parse(m_receivingBuffer, root))
{
DBG_MSG(("Received not JSON string! %s\n", m_receivingBuffer.c_str()));