diff options
author | Dmitriy Klimenko <dklimenko@luxoft.com> | 2014-09-11 04:23:19 -0700 |
---|---|---|
committer | Dmitriy Klimenko <dklimenko@luxoft.com> | 2014-09-11 04:23:48 -0700 |
commit | 542f4a9f7c5ae8334c75f48e5bf936028e80531a (patch) | |
tree | 9a5299008feb3daedcf3715ebe04f902ec8d11b7 | |
parent | a02de081a1e36965b5d201b59c85a47384859039 (diff) | |
download | sdl_core-542f4a9f7c5ae8334c75f48e5bf936028e80531a.tar.gz |
message broker MQClient Shared memmory integration
4 files changed, 111 insertions, 43 deletions
diff --git a/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp b/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp index 7b4b7f363b..7ea42ae40c 100644 --- a/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp +++ b/customer-specific/pasa/src/3rd_party-static/MessageBroker/include/mb_mqclient.hpp @@ -14,10 +14,10 @@ #include "mb_client.hpp"
-#define MAX_QUEUE_NAME_SIZE 24
-#define MAX_QUEUE_MSG_SIZE 4095
-#define MSGQ_MAX_MESSAGES 128
-#define MSGQ_MESSAGE_SIZE MAX_QUEUE_MSG_SIZE
+#define MAX_QUEUE_NAME_SIZE 24
+#define MAX_QUEUE_MSG_SIZE 4095
+#define MSGQ_MAX_MESSAGES 128
+#define MSGQ_MESSAGE_SIZE MAX_QUEUE_MSG_SIZE
/**
* \namespace NsMessageBroker
@@ -25,6 +25,12 @@ */
namespace NsMessageBroker
{
+#ifdef CUSTOMER_PASA
+ typedef struct {
+ int size;
+ char text[30000];
+ } shmem_t;
+#endif
/**
* \class Client
@@ -68,6 +74,12 @@ namespace NsMessageBroker */
MqClient();
+#ifdef CUSTOMER_PASA
+ int fd;
+ shmem_t *ptr;
+ int fd2;
+ shmem_t *ptr2;
+#endif
private:
diff --git a/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp b/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp index edb093ba28..ad122ac29f 100644 --- a/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp +++ b/customer-specific/pasa/src/3rd_party-static/MessageBroker/src/client/mb_mqclient.cpp @@ -45,49 +45,70 @@ namespace NsMessageBroker bool MqClient::MqOpen()
{
- struct mq_attr attributes;
- attributes.mq_maxmsg = MSGQ_MAX_MESSAGES;
- attributes.mq_msgsize = MAX_QUEUE_MSG_SIZE;
- attributes.mq_flags = 0;
+ struct mq_attr attributes;
+ attributes.mq_maxmsg = MSGQ_MAX_MESSAGES;
+ attributes.mq_msgsize = MAX_QUEUE_MSG_SIZE;
+ attributes.mq_flags = 0;
- m_sndHandle = ::mq_open(m_send.c_str(), O_WRONLY|O_CREAT|O_NONBLOCK, 0666, &attributes);
- m_rcvHandle = ::mq_open(m_recv.c_str(), O_RDONLY|O_CREAT, 0666, &attributes);
+ m_sndHandle = ::mq_open(m_send.c_str(), O_WRONLY|O_CREAT|O_NONBLOCK, 0666, &attributes);
+ m_rcvHandle = ::mq_open(m_recv.c_str(), O_RDONLY|O_CREAT, 0666, &attributes);
- return ((m_sndHandle != -1) && (m_rcvHandle != -1)) ? true : false;
+ return ((m_sndHandle != -1) && (m_rcvHandle != -1)) ? true : false;
}
-
ssize_t MqClient::Send(const std::string& data)
{
-
- std::string rep = data;
- const char* ptrBuffer = rep.c_str();
-
- if(rep.empty())
- {
- return -1;
- }
-
- int retVal = ::mq_send(m_sndHandle, ptrBuffer, rep.length(), 0);
-
- return (retVal == -1) ? retVal : rep.length();
-
+ std::string rep = data;
+ int retVal = 0;
+ const char* ptrBuffer = rep.c_str();
+ std::string shm_data="SHARED_MEMORY";
+
+ if (rep.empty())
+ {
+ return -1;
+ }
+
+ if (rep.length() > MAX_QUEUE_MSG_SIZE)
+ {
+ int len = rep.length();
+ ptr2->size = len;
+ memset(ptr2->text, 0, sizeof(ptr2->size));
+ memcpy(ptr2->text, rep.c_str(), len); /* write to the shared memory */
+
+ int retVal = ::mq_send(m_sndHandle, &shm_data[0], shm_data.length(), 0);
+ DBG_MSG(("Sending length %d", len));
+ return (retVal == -1) ? retVal :(shm_data.length());
+ }
+ else
+ {
+ int retVal = ::mq_send(m_sndHandle, ptrBuffer, rep.length(), 0);
+ DBG_MSG(("MqClient::Send:Buffer %s", ptrBuffer));
+ return (retVal == -1) ? retVal : rep.length();
+ }
}
-
ssize_t MqClient::Recv(std::string& data)
{
- char buf[MAX_QUEUE_MSG_SIZE];
- ssize_t length=0;
-
- if ((length = ::mq_receive(m_rcvHandle, buf, sizeof(buf), 0)) == -1)
- {
- return -1;
- }
-
- data = std::string(&buf[1], length);
-
- return length;
+ char buf[MAX_QUEUE_MSG_SIZE] = {'\0'};
+ ssize_t length=0;
+
+ if ((length = ::mq_receive(m_rcvHandle, buf, sizeof(buf), 0)) == -1)
+ {
+ return -1;
+ }
+
+ if (strcmp(buf+1,"SHARED_MEMORY") != 0)
+ {
+ data = std::string(&buf[1], length);
+ DBG_MSG(("MqClient::Received Data at mq_client %s",data.c_str()));
+ return length;
+ } else {
+ DBG_MSG(("Shared Memory::Received Length %d",ptr->size));
+ data = std::string(ptr->text ,ptr->size);
+ length = ptr->size;
+
+ return length;
+ }
}
} /* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/MessageBroker/include/mb_controller.hpp b/src/3rd_party-static/MessageBroker/include/mb_controller.hpp index 13be9f3348..705c53d0d0 100644 --- a/src/3rd_party-static/MessageBroker/include/mb_controller.hpp +++ b/src/3rd_party-static/MessageBroker/include/mb_controller.hpp @@ -14,7 +14,8 @@ #include "mb_tcpclient.hpp" #ifdef CUSTOMER_PASA #include "mb_mqclient.hpp" - +#include <sys/mman.h> +#include <sys/stat.h> #endif #include "utils/lock.h" diff --git a/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp b/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp index 2d7ebb36af..7326ff7cc3 100644 --- a/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp +++ b/src/3rd_party-static/MessageBroker/src/client/mb_controller.cpp @@ -34,8 +34,27 @@ namespace NsMessageBroker mControllersIdStart(-1),
mControllersIdCurrent(0)
{
- mControllersName = name;
- mClientType = MQUEUE;
+ mControllersName = name; + mClientType = MQUEUE; + + /* Create the shared memory object */ + fd = shm_open("/SHNAME_SDLQUEUE", O_RDWR | O_CREAT , 0777); + //DBG_MSG(("MqClient::Send:A")); + ftruncate(fd, sizeof(shmem_t)); + /* Set the size of the shared memory object */ + + // DBG_MSG(("MqClient::Send::B")); + /* Get a pointer to the shared memory, map it into + * our address space */ + ptr = ( shmem_t *)mmap(0, sizeof(shmem_t), PROT_READ, MAP_SHARED, fd, 0); + // DBG_MSG(("MqClient::Send:B")); + + fd2 = shm_open("/SHNAME_SDLQUEUE2", O_RDWR | O_CREAT, 0777); + DBG_MSG(("A\n")); + ftruncate(fd2, sizeof(shmem_t)); + /* Get a pointer to the shared memory object */ + ptr2 =(shmem_t *) mmap(0, sizeof(shmem_t), + PROT_WRITE, MAP_SHARED, fd2, 0); } #endif @@ -46,6 +65,14 @@ namespace NsMessageBroker CMessageBrokerController::~CMessageBrokerController() { +#ifdef CUSTOMER_PASA + ftruncate(fd, 0); + munmap(ptr, sizeof(shmem_t)); + close(fd); + ftruncate(fd2, 0); + munmap(ptr2, sizeof(shmem_t)); + close(fd2); +#endif } #ifdef CUSTOMER_PASA
@@ -62,11 +89,11 @@ namespace NsMessageBroker {
switch(mClientType)
{
- case TCP: return TcpClient::Send(data);
- case MQUEUE: return MqClient::Send(data);
+ case TCP: return TcpClient::Send(data);
+ case MQUEUE: return MqClient::Send(data); + default : return -1;
}
} - #endif ssize_t CMessageBrokerController::Recv(std::string& data) { @@ -81,6 +108,13 @@ namespace NsMessageBroker while (!stop) { Json::Value root; +#ifdef CUSTOMER_PASA + if (m_receivingBuffer.length() > sizeof(shmem_t)) + { + m_receivingBuffer.clear(); + return 0; + } +#endif if (!m_reader.parse(m_receivingBuffer, root)) { DBG_MSG(("Received not JSON string! %s\n", m_receivingBuffer.c_str())); |