summaryrefslogtreecommitdiff
path: root/docs/tutorials/010/message_queue.cpp
blob: c52273a134bcdb0478f368705ac3e8b46935abe9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

// $Id$

/*
   To illustrate the ACE_Message_Queue, we use a derivative of ACE_Task<>.  We
   also derive from ACE_Message_Block to show that we don't have memory leaks. 
 */
#include "task.h"
#include "block.h"

int run_test( int iterations, int threads )
{
  /*
     Create and open an instance of our Task object.  I've overridden the
     open() method to make it look more like other ACE objects. 
   */
  Task task;

  if (task.open (threads) == -1)
  {
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
  }

  /*
     Give the threads a moment to open.  This isn't really necessary but if we
     don't we find that all of our blocks are constructed and enqueued before
     any of the threads get created. Basically, the sleep() makes the output
     look more interesting.       
   */
  ACE_OS::sleep (ACE_Time_Value (1));

  int i;
  for (i = 0; i < iterations; ++i)
  {
    /*
       Create a new message block to hold our data.  Here, we ask for a block
       that has 128 bytes of data space. 
     */
    Block *message = new Block (128);

    /*
       Grab the "write pointer".  This is a pointer into the data area where we 
       can write our data.  After writting the data you have to increment the
       wr_ptr() so that subsequent writes won't clobber what you've put there. 
     */
    ACE_OS::sprintf (message->wr_ptr (), "This is message %d.", i);
    message->wr_ptr (strlen (message->rd_ptr ()));

    /*
       Put the message block into the queue.  One of the threads in the Task
       object will pick up the block and "do work" on it. 
     */
    if (task.putq (message) == -1)
    {
      break;
    }
  }

  /*
     Once we're done, we have to signal the Task objects to shut down. There
     are several choices including: - Send a message of zero length - Send a
     message with a special content I don't like these choices because they're
     likely to interfere with application logic.  Instead, I use the message
     type feature to send a message of type "hangup".  The default type is
     MB_DATA, so when the tasks get a MB_HANGUP type, they know to go away. 
   */
  Block *message = new Block ();
  message->msg_type (ACE_Message_Block::MB_HANGUP);
  task.putq (message);

  /*
     Wait for the threads in our task object to go away. 
   */
  task.wait ();
  
  return(0);
}

/*
   Program entry point.

   argv[1]  Optional: number of iterations through the putq() loop
            (default 9).
   argv[2]  Optional: number of threads for the Task<> derivative
            (default 2).

   The process exit status is 0 when run_test() reports success and 1
   otherwise.
 */
int main (int argc, char *argv[])
{
  /*
     Set the number of iterations through our putq() loop and the number of
     threads to use in our Task<> derivative.  Both are parsed with atoi(),
     so a non-numeric argument yields 0.
   */
  int iterations = argc > 1 ? atoi (argv[1]) : 9;
  int threads = argc > 2 ? atoi (argv[2]) : 2;

  /*
     Keep the result instead of discarding it so that a failed run is
     visible to whoever launched us.
   */
  int status = run_test (iterations, threads);

  ACE_DEBUG ((LM_DEBUG, "(%P|%t) Application exiting\n"));

  exit (status == 0 ? 0 : 1);
}