summaryrefslogtreecommitdiff
path: root/docs/tutorials/018/Test_T.cpp
blob: dfb01939d1995bc51e41656cf776664a9b49f84b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191

// $Id$

/* This is something new... Since we're included by the header, we
   have to provide a sentry to protect against recursive inclusion.
 */
#ifndef TEST_T_C
#define TEST_T_C

// Get our definition
#include "Test_T.h"

// Number of threads in the pool.  We'll hard-code it: mucking around
// with that isn't really the point of the exercise today.  The same
// value is handed to the barrier in the constructor, passed to
// activate() in open(), doubled to get the number of messages that
// run() queues, and used in svc() as the per-thread message limit.
#define TEST_THREAD_COUNT 5

/* Constructor.
   Initializes the ACE_Task baseclass, stores the test's name and
   initializes the barrier with TEST_THREAD_COUNT.  Since the client
   will probably invoke run() next, we go ahead and announce our
   creation right here to make the output more readable.

   _name - label for this test; kept in name_ and echoed in the
           creation message.
 */
template <class MUTEX>
Test_T<MUTEX>::Test_T (const char * _name)
    : ACE_Task<ACE_MT_SYNCH> (),
      name_ (_name),
      barrier_ (TEST_THREAD_COUNT)
{
    ACE_DEBUG ((LM_INFO,
                "(%P|%t|%T)\tTest_T (%s) created\n",
                _name));
}

/* Drive one complete pass of the test.
   Starts the thread pool through open(), queues TEST_THREAD_COUNT*2
   text messages followed by a hangup message, then blocks in wait().

   Returns -1 if open() or the hangup send() fails; otherwise the
   result of wait().  A failed send() of a text message only stops
   further text messages from being queued.
 */
template <class MUTEX>
int Test_T<MUTEX>::run(void)
{
        // Without the threads there is nothing to test.
    if( this->open() == -1 )
        return -1;

        // Twice as many messages as threads, so that we can watch
        // how they get shared out among the pool.
    const int message_total = TEST_THREAD_COUNT*2;

    for( int number = 0 ; number < message_total ; ++number )
    {
            // 64 bytes is plenty for the short text written below.
        ACE_Message_Block * block = new ACE_Message_Block(64);

            // Label the block so that svc() has something to show.
        sprintf(block->wr_ptr(),"Message Number %d",number);

            // Move the write pointer past the text and its
            // terminating NUL.
        const size_t text_length = strlen(block->rd_ptr());
        block->wr_ptr( text_length+1 );

            // Hand the block to the pool; give up on the remaining
            // messages if that fails.
        if( this->send(block) == -1 )
            break;
    }

        // Queue a hangup so that the pool can exit, then wait for all
        // of the threads to finish before returning to the client.
    return ( this->send() == -1 ) ? -1 : this->wait();
}

/* Send a message to the thread pool.

   _message - the block to queue.  When it is null a zero-length
              MB_HANGUP block is created and queued instead.  In
              either case this method release()s the block it was
              given (or created) before returning.

   Returns 0 when a duplicate of the block was queued, -1 when the
   duplicate could not be created or putq() failed.
 */
template <class MUTEX>
int Test_T<MUTEX>::send( ACE_Message_Block * _message )
{
        // If no message was provided, create a hangup message.
    if( ! _message )
    {
        _message = new
            ACE_Message_Block(0,ACE_Message_Block::MB_HANGUP);
    }

        // Use the duplicate() method when sending the message.  For
        // this simple application, that may be overkill but it's a
        // good habit.  duplicate() will increment the reference count
        // so that each user of the message can release() it when
        // done.  The last user to call release() will cause the data
        // to be deleted.
        //
        // Keep hold of the duplicate: if it never reaches the queue
        // nobody else will release() it, so we must do it ourselves.
    ACE_Message_Block * queued = _message->duplicate();

    if( ! queued )
    {
            // No duplicate was produced.  Drop our own reference and
            // report the failure.
        _message->release();
        return -1;
    }

    if( this->putq(queued) == -1 )
    {
            // Error?  release() both the duplicate and the original
            // message block and return failure.
        queued->release();
        _message->release();
        return -1;
    }

        // release() the data to prevent memory leaks.  The queued
        // duplicate still holds its own reference.
    _message->release();

    return 0;
}

/* A fairly typical open(): ask activate() for TEST_THREAD_COUNT
   threads, using the THR_NEW_LWP flag, and hand its result back.

   _arg - not used.

   Returns whatever activate() returns; run() treats -1 as failure.
 */
template <class MUTEX>
int Test_T<MUTEX>::open( void * _arg )
{
        // The argument is part of the signature only.
    ACE_UNUSED_ARG(_arg);

    const int activation_status =
        this->activate(THR_NEW_LWP, TEST_THREAD_COUNT);

    return activation_status;
}

/* svc() is also fairly typical.  The new part is the use of the guard
   to simulate protection of shared resources.

   Each call takes the next value of thread_num_ as its identifier,
   waits on the barrier, then pulls messages from the queue until it
   has taken TEST_THREAD_COUNT of them, receives a hangup, or getq()
   fails.  Before returning it sends a hangup of its own.

   Always returns 0.
 */
template <class MUTEX>
int Test_T<MUTEX>::svc(void)
{
        // Keep a simple thread identifier.  We could always use the
        // thread id but this is a nice, simple number.
    int my_number = ++thread_num_;

    ACE_DEBUG ((LM_INFO, "%d (%P|%t|%T)\tTest_T::svc() Entry\n",
                my_number));

        // Wait for all of the threads to get started so that they all
        // have a fair shot at the message queue.  Comment this out
        // and see how the behaviour changes.  Does it surprise you?
    barrier_.wait();

        // getq() fills this in; start it at zero rather than leaving
        // it indeterminate.
    ACE_Message_Block * message = 0;
    int mcount = 0;

        // This would usually be an almost-infinite loop.  Instead,
        // I've governed it so that no single thread can get more than
        // "thread count" number of messages.  You'll see that with
        // ACE_Mutex, this is just about the only way to keep the
        // first thread from getting all the action.  This is obviously
        // just for the sake of the test since you don't want your
        // real-world app to exit after a fixed number of messages!
    while( mcount < TEST_THREAD_COUNT )
    {
            // Get a message.  Since the message queue is already
            // thread-safe we don't have to guard it.  In fact, moving
            // the guard up above getq() will decrease your
            // parallelization.
        if( getq(message) == -1 )
        {
            break;
        }

            // Now we pretend that there are shared resources required
            // to process the data.  We grab the mutex through the
            // guard and "do work".  In a real application, you'll
            // want to keep these critical sections as small as
            // possible since they will reduce the usefulness of
            // multi-threading.
        guard_t guard(mutex_);

            // Increase our message count for the debug output and the
            // governor.  A hangup is counted as well.
        ++mcount;

            // Check for a hangup request...
            // Notice the use of release() again to prevent leaks
        if( message->msg_type() == ACE_Message_Block::MB_HANGUP )
        {
            message->release();
            break;
        }

            // Display the message so that we can see if things are
            // working the way we want.
        ACE_DEBUG ((LM_INFO, "%d (%P|%t|%T)\tTest_T::svc() received message #%d (%s)\n",
                    my_number,mcount,message->rd_ptr()));

            // Pretend that the work takes some time to complete.
            // Remember, we're holding that lock during this time!
        ACE_OS::sleep(1);

            // No leaks...
        message->release();
    }

        // Send a hangup to the other threads in the pool.  If we don't
        // do this then wait() will never exit since all of the other
        // threads are still blocked on getq().  The result is
        // deliberately ignored: there is nothing more this thread
        // can do if it fails.
    this->send();

    return(0);
}

#endif // TEST_T_C