summaryrefslogtreecommitdiff
path: root/libs/interprocess/test/message_queue_test.cpp
blob: afb23016b63427627996441fdd468c8ae7964886 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/interprocess for documentation.
//
//////////////////////////////////////////////////////////////////////////////

#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/managed_heap_memory.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/set.hpp>
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <boost/interprocess/detail/os_thread_functions.hpp>
#include <vector>
#include <iostream>
#include <cstddef>
#include <limits>
#include <memory>
#include <string>
#include "get_process_id_name.hpp"

////////////////////////////////////////////////////////////////////////////////
//                                                                            //
//  This example tests the process shared message queue.                      //
//                                                                            //
////////////////////////////////////////////////////////////////////////////////

using namespace boost::interprocess;

//This test inserts messages with different priority and marks them with a
//time-stamp to check if receiver obtains highest priority messages first and
//messages with same priority are received in fifo order
bool test_priority_order()
{
   message_queue::remove(test::get_process_id_name());
   {
      message_queue mq1
         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
         mq2
         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));

      //We test that the queue is ordered by priority and in the
      //same priority, is a FIFO
      message_queue::size_type recvd = 0;
      unsigned int priority = 0;
      std::size_t tstamp;
      unsigned int priority_prev;
      std::size_t  tstamp_prev;

      //We will send 100 message with priority 0-9
      //The message will contain the timestamp of the message
      for(std::size_t i = 0; i < 100; ++i){
         tstamp = i;
         mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
      }

      priority_prev = (std::numeric_limits<unsigned int>::max)();
      tstamp_prev = 0;

      //Receive all messages and test those are ordered
      //by priority and by FIFO in the same priority
      for(std::size_t i = 0; i < 100; ++i){
         mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
         if(priority > priority_prev)
            return false;
         if(priority == priority_prev &&
            tstamp   <= tstamp_prev){
            return false;
         }
         priority_prev  = priority;
         tstamp_prev    = tstamp;
      }

      //Now retry it with different priority order
      for(std::size_t i = 0; i < 100; ++i){
         tstamp = i;
         mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
      }

      priority_prev = (std::numeric_limits<unsigned int>::max)();
      tstamp_prev = 0;

      //Receive all messages and test those are ordered
      //by priority and by FIFO in the same priority
      for(std::size_t i = 0; i < 100; ++i){
         mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
         if(priority > priority_prev)
            return false;
         if(priority == priority_prev &&
            tstamp   <= tstamp_prev){
            return false;
         }
         priority_prev  = priority;
         tstamp_prev    = tstamp;
      }
   }
   message_queue::remove(test::get_process_id_name());
   return true;
}

//[message_queue_test_test_serialize_db
//This test creates a in memory data-base using Interprocess machinery and
//serializes it through a message queue. Then rebuilds the data-base in
//another buffer and checks it against the original data-base
bool test_serialize_db()
{
   //Overview:
   // 1. Build a map inside a managed_heap_memory segment and fill it until
   //    the segment throws bad_alloc.
   // 2. Copy the raw bytes of that segment, fragment by fragment, through a
   //    message queue into a std::vector<char>.
   // 3. Re-open the copied bytes as a managed_external_buffer, look the map
   //    up by name and compare it with the original.
   //Returns true if the copy matches the original, false otherwise.
   //Note that every early "return false" below leaves the function before
   //the final message_queue::remove() call is reached.

   //Typedef data to create a Interprocess map
   typedef std::pair<const std::size_t, std::size_t> MyPair;
   typedef std::less<std::size_t>   MyLess;
   typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
      node_allocator_t;
   typedef map<std::size_t,
               std::size_t,
               std::less<std::size_t>,
               node_allocator_t>
               MyMap;

   //Some constants
   //BufferSize: bytes in both the origin segment and the destiny buffer
   //MaxMsgSize: upper bound, in bytes, of each fragment sent through the queue
   const std::size_t BufferSize  = 65536;
   const std::size_t MaxMsgSize  = 100;

   //Allocate a memory buffer to hold the destiny database using vector<char>
   std::vector<char> buffer_destiny(BufferSize, 0);

   message_queue::remove(test::get_process_id_name());
   {
      //Create the message-queues
      //The queue is created with arguments (1, MaxMsgSize); with room for a
      //single message every send below must be followed by a receive
      //before the next send. NOTE(review): argument meaning taken from the
      //Boost.Interprocess message_queue constructor docs - confirm.
      message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);

      //Open previously created message-queue simulating other process
      message_queue mq2(open_only, test::get_process_id_name());

      //A managed heap memory to create the origin database
      //It is requested with the same size as buffer_destiny (BufferSize)
      managed_heap_memory db_origin(buffer_destiny.size());

      //Construct the map in the first buffer
      MyMap *map1 = db_origin.construct<MyMap>("MyMap")
                                       (MyLess(),
                                       db_origin.get_segment_manager());
      if(!map1)
         return false;

      //Fill map1 until is full
      //The loop has no exit condition of its own: it ends when an insertion
      //throws bad_alloc, which is swallowed on purpose
      try{
         std::size_t i = 0;
         while(1){
            (*map1)[i] = i;
            ++i;
         }
      }
      catch(boost::interprocess::bad_alloc &){}

      //Data control data sending through the message queue
      //sent:        bytes of the origin segment already pushed to mq1
      //recvd:       size of the last message read from mq2
      //total_recvd: bytes written so far into buffer_destiny
      std::size_t sent = 0;
      message_queue::size_type recvd = 0;
      message_queue::size_type total_recvd = 0;
      unsigned int priority;

      //Send whole first buffer through the mq1, read it
      //through mq2 to the second buffer
      while(1){
         //Send a fragment of buffer1 through mq1
         //The fragment is the smaller of MaxMsgSize and the bytes still unsent
       std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
                                       MaxMsgSize : (db_origin.get_size() - sent);
         mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
               , bytes_to_send
               , 0);
         sent += bytes_to_send;
         //Receive the fragment through mq2 to buffer_destiny
         //NOTE(review): the capacity passed is "BufferSize - recvd" (size of
         //the previous message), not "BufferSize - total_recvd", so it does
         //not describe the space really left after &buffer_destiny[total_recvd].
         //Presumably harmless because each message carries at most
         //MaxMsgSize bytes and exactly the unsent remainder at the end.
         //Switching to the true remainder may make the last receive() fail
         //if the library requires the capacity to be >= the queue's maximum
         //message size - confirm against the message_queue docs before changing.
       mq2.receive( &buffer_destiny[total_recvd]
                , BufferSize - recvd
                  , recvd
                  , priority);
         total_recvd += recvd;

         //Check if we have received all the buffer
         //NOTE(review): this is the only exit of the loop; it assumes
         //db_origin.get_size() is exactly BufferSize - confirm.
         if(total_recvd == BufferSize){
            break;
         }
      }

      //The buffer will contain a copy of the original database
      //so let's interpret the buffer with managed_external_buffer
      managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);

      //Let's find the map
      //find() yields a (pointer, count) pair; both members are checked below
      std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
      MyMap *map2 = ret.first;

      //Check if we have found it
      if(!map2){
         return false;
      }

      //Check if it is a single variable (not an array)
      if(ret.second != 1){
         return false;
      }

      //Now let's compare size
      if(map1->size() != map2->size()){
         return false;
      }

      //Now let's compare all db values
      //Keys were inserted as 0, 1, 2, ... with value == key, so the keys to
      //compare are 0..size()-1
     MyMap::size_type num_elements = map1->size();
     for(std::size_t i = 0; i < num_elements; ++i){
         if((*map1)[i] != (*map2)[i]){
            return false;
         }
      }

      //Destroy maps from db-s
      db_origin.destroy_ptr(map1);
      db_destiny.destroy_ptr(map2);
   }
   message_queue::remove(test::get_process_id_name());
   return true;
}
//]

//Size, in bytes, of every message exchanged by the buffer overflow test
static const int MsgSize = 10;
//Number of messages the buffer overflow test sends and receives
static const int NumMsg  = 1000;
//Send and receive buffers. They are sized from MsgSize, the length that is
//passed to send()/receive(), so that buffer size and message length can't
//silently drift apart. Being static, both start zero-filled.
static char msgsend [MsgSize];
static char msgrecv [MsgSize];

//Queue shared by the buffer overflow test: test_buffer_overflow() assigns it
//before launching the receiver() thread and sends through it, while
//receiver() reads from it.
static boost::interprocess::message_queue *pmessage_queue;

void receiver()
{
   boost::interprocess::message_queue::size_type recvd_size;
   unsigned int priority;
   int nummsg = NumMsg;

   while(nummsg--){
      pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
   }
}

bool test_buffer_overflow()
{
   boost::interprocess::message_queue::remove(test::get_process_id_name());
   {
      std::auto_ptr<boost::interprocess::message_queue>
         ptr(new boost::interprocess::message_queue
               (create_only, test::get_process_id_name(), 10, 10));
      pmessage_queue = ptr.get();

      //Launch the receiver thread
      boost::interprocess::ipcdetail::OS_thread_t thread;
      boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
      boost::interprocess::ipcdetail::thread_yield();

      int nummsg = NumMsg;

      while(nummsg--){
         pmessage_queue->send(msgsend, MsgSize, 0);
      }

      boost::interprocess::ipcdetail::thread_join(thread);
   }
   boost::interprocess::message_queue::remove(test::get_process_id_name());
   return true;
}


//////////////////////////////////////////////////////////////////////////////
//
// test_multi_sender_receiver is based on Alexander (aalutov's)
// testcase for ticket #9221. Many thanks.
//
//////////////////////////////////////////////////////////////////////////////

//Queue used by the multisend()/multireceive() threads. It is null until
//test_multi_sender_receiver() points it to the queue it creates.
static boost::interprocess::message_queue *global_queue = 0;
//Number of one-byte messages every sender thread pushes
static const int MULTI_NUM_MSG_PER_SENDER = 10000;
//Message capacity of the queue. This is the rounded-up integer division of
//MULTI_NUM_MSG_PER_SENDER by itself, which evaluates to 1 for the value above.
static const int MULTI_QUEUE_SIZE = 1 + (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER;
//Number of sender threads; the same number of receiver threads is launched
static const int MULTI_THREAD_COUNT = 10;

//Sender thread: pushes MULTI_NUM_MSG_PER_SENDER one-byte messages into the
//shared queue followed by one zero-length message. multireceive() stops
//when it reads a zero-length message.
static void multisend()
{
   //The payload value is irrelevant for the test, but it is initialized so
   //that no indeterminate byte is ever copied into the queue
   char buff = 0;
   for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; ++i) {
      global_queue->send(&buff, 1, 0);
   }
   //Zero-length message: end-of-stream mark
   global_queue->send(&buff, 0, 0);
   //std::cout<<"writer thread complete"<<std::endl;
}

//Receiver thread: reads one-byte messages from the shared queue until a
//zero-length message arrives, then returns. The message count is only kept
//for the (disabled) debug output below.
static void multireceive()
{
   char payload;
   std::size_t msg_size;
   unsigned int msg_priority;
   int payload_msgs = 0;

   for (;;) {
      global_queue->receive(&payload, 1, msg_size, msg_priority);
      if (msg_size == 0) {
         //Zero-length message: nothing else to read, it is not counted
         break;
      }
      ++payload_msgs;
   }
   //std::cout << "reader thread complete, read msgs: " << payload_msgs << std::endl;
}


//Runs MULTI_THREAD_COUNT sender threads (multisend) and MULTI_THREAD_COUNT
//receiver threads (multireceive) concurrently over one queue with capacity
//MULTI_QUEUE_SIZE and one-byte messages, then joins all of them.
//Returns false if a std::exception is caught, true otherwise.
bool test_multi_sender_receiver()
{
   bool ret = true;
   //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
   try {
      boost::interprocess::message_queue::remove(test::get_process_id_name());
      boost::interprocess::message_queue mq
         (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
      global_queue = &mq;

      //First half of the vector holds the senders, second half the receivers
      std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);

      //Launch the sender threads
      for (int i = 0; i < MULTI_THREAD_COUNT; ++i) {
         boost::interprocess::ipcdetail::thread_launch
            (threads[i], &multisend);
      }

      //Launch the receiver threads
      for (int i = 0; i < MULTI_THREAD_COUNT; ++i) {
         boost::interprocess::ipcdetail::thread_launch
            (threads[MULTI_THREAD_COUNT+i], &multireceive);
      }

      //Wait for every sender and receiver
      for (int i = 0; i < MULTI_THREAD_COUNT*2; ++i) {
         boost::interprocess::ipcdetail::thread_join(threads[i]);
         //std::cout << "Joined thread " << i << std::endl;
      }
   }
   catch (const std::exception &e) {
      std::cout << "error " << e.what() << std::endl;
      ret = false;
   }
   //The queue object was local to the try block and is gone by now: don't
   //leave the global pointing to it
   global_queue = 0;
   boost::interprocess::message_queue::remove(test::get_process_id_name());
   return ret;
}


//Runs the four tests in order and stops at the first one that fails.
//Exit code is 0 when all of them pass and 1 otherwise.
int main ()
{
   //&& short-circuits, so a test is only run if all the previous ones passed
   const bool all_passed = test_priority_order()
                        && test_serialize_db()
                        && test_buffer_overflow()
                        && test_multi_sender_receiver();

   return all_passed ? 0 : 1;
}

#include <boost/interprocess/detail/config_end.hpp>