summaryrefslogtreecommitdiff
path: root/docs/tutorials/010/page05.html
blob: dca2a02b74e2c938f98667a61ba8f4f5188880cb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="James CE Johnson">
   <TITLE>ACE Tutorial 010</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 010</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>Passing chunks of data through an ACE_Message_Queue</FONT></B></CENTER>


<P>
<HR WIDTH="100%">

Our <A HREF="task.cpp">Task</A> object definition.
<P>
Something to look at here is the ACE_Barrier usage.  In the
constructor, we tell the barrier how many threads we're using.  Then,
in the svc() method, we use the barrier's wait() method.  You can
think of the barrier as a semaphore initialized to the thread count.
  Each time wait()
is invoked, the semaphore is decremented and the thread is blocked.
  When the count equals zero, all threads are unblocked and allowed to
continue.
<P>
<font size=-1>Note:  This isn't the way ACE_Barrier really works, it's 
just an analogy</font>

<HR WIDTH="100%">
<PRE>
<font color=red>// $Id$</font>

<font color=blue>#include</font> "<font color=green>task.h</font>"
<font color=blue>#include</font> "<font color=green>block.h</font>"

<font color=red>/* Set our housekeeping pointer to NULL and tell the user we exist.  */</font>
<font color=#008888>Task::Task</font> (size_t n_threads)
  : barrier_ (n_threads),
    n_threads_ (n_threads)
{
  ACE_DEBUG ((LM_DEBUG,
              "<font color=green>(%P|%t) Task ctor 0x%x\n</font>",
              (void *) this));
}

<font color=red>/* Take care of cleanup & tell the user we're going away.  */</font>
<font color=#008888>Task::~Task</font> (void)
{
  ACE_DEBUG ((LM_DEBUG,
              "<font color=green>(%P|%t) Task dtor 0x%x\n</font>",
              (void *) this));

  <font color=red>/* Get our shutdown notification out of the queue and release it.  */</font>
  ACE_Message_Block *message;

  <font color=red>/* Like the getq() in svc() below, this will block until a message
    arrives.  By blocking, we know that the destruction will be paused
    until the last thread is done with the message block.  */</font>
  this->getq (message);
  message->release ();
}

<font color=red>/* Open the object to do work.  Next, we activate the Task into the
  number of requested threads.  */</font>
int 
<font color=#008888>Task::open</font> (void *unused)
{
  ACE_UNUSED_ARG (unused);

  return this->activate (THR_NEW_LWP,
                         n_threads_);
}

<font color=red>/* Tell the user we're closing and invoke the baseclass' close() to
  take care of things.  */</font>
int 
<font color=#008888>Task::close</font> (u_long flags)
{
  ACE_DEBUG ((LM_DEBUG,
              "<font color=green>(%P|%t) Task close 0x%x\n</font>",
              (void *) this));
  return <font color=#008888>inherited::close</font> (flags);
}

<font color=red>/* Our svc() method waits for work on the queue and then processes
  that work.  */</font>
int 
<font color=#008888>Task::svc</font> (void)
{
  <font color=red>/* This will cause all of the threads to wait on this line until all
    have invoked this method.  The net result is that no thread in the
    Task will get a shot at the queue until all of the threads are
    active.  There's no real need to do this but it's an easy intro
    into the use of ACE_Barrier.  */</font>
  this->barrier_.wait ();

  ACE_DEBUG ((LM_DEBUG,
              "<font color=green>(%P|%t) Task 0x%x starts in thread %d\n</font>",
              (void *) this,
              <font color=#008888>ACE_Thread::self</font> ()));

  <font color=red>/* Remember that get() needs a reference to a pointer.  To save
    stack thrashing we'll go ahead and create a pointer outside of the
    almost- infinite loop.  */</font>
  ACE_Message_Block *message;

  for (;;)
    {
      <font color=red>/* Get a message from the queue.  Note that getq() will block
        until a message shows up.  That makes us very
        processor-friendly.  */</font>
      if (this->getq (message) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "<font color=green>%p\n</font>",
                           "<font color=green>getq</font>"),
                          -1);
      <font color=red>/* If we got the shutdown request, we need to go away.  */</font>
      if (message->msg_type () == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>)
        {
          <font color=red>/* Forward the request to any peer threads.  */</font>
          this->putq (message);

          <font color=red>/* Leave the infinite loop so that the thread exits.  */</font>
          break;
        }

      <font color=red>/* The message queue stores char* data.  We use rd_ptr() to get
        to the beginning of the data.  */</font>
      const char *cp = message->rd_ptr ();

      <font color=red>/* Move the rd_ptr() past the data we read.  This isn't real
        useful here since we won't be reading any more from the block
        but it's a good habit to get into.  */</font>
      message->rd_ptr (<font color=#008888>ACE_OS::strlen</font> (cp));
    
      <font color=red>/* Display the block's address and data to the user.  */</font>
      ACE_DEBUG ((LM_DEBUG,
                  "<font color=green>(%P|%t) Block 0x%x contains (%s)\n</font>",
                  (void *) message,
                  cp));

      <font color=red>/* Pretend that it takes a while to process the data.  */</font>
      <font color=#008888>ACE_OS::sleep</font> (ACE_Time_Value (0, 5000));

      <font color=red>/* Release the message block.  Notice that we never delete a
        message block.  Blocks are reference counted & the release()
        method will take care of the delete when there are no more
        references to the data.  */</font>
      message->release ();
    }

  return 0;
}
</PRE>
<HR WIDTH="100%">
<P>
This is all pretty straight-forward too.  One gottcha we avoided was a memory leak
due to our shutdown message.  Notice that svc() enqueues that block without bothering
to see if there are any more threads to dequeue it.  Thats why our dtor can call getq()
without worrying about blocking infinitely:  it knows the message block will be there.
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page06.html">Continue This Tutorial</A>]</CENTER>