summaryrefslogtreecommitdiff
path: root/docs/tutorials/017/barrier2.cpp
blob: 2ed76decc342a151094cf6b313910022614b3c07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178

// $Id$

#include "Barrier_i.h"
#include "ace/Task.h"

/* We'll use a simple Task<> derivative to test our new Barrier
   object.
*/
class Test : public ACE_Task<ACE_NULL_SYNCH>
{
public:

        // Construct the object with a desired thread count
    Test(int _threads);

        // Open/begin the test.  As usual, we have to match the
        // ACE_Task signature.
    int open(void * _unused = 0);

        // Change the threads_ value for the next invocation of open()
    void threads(int _threads);

        // Get the current threads_ value.
    int threads(void);

        // Perform the test
    int svc(void);

protected:
        // How many threads the barrier will test.
    u_int threads_;

        // The Barrier object we'll use in our tests below
    Barrier barrier_;

        // This lets us pick one (eg -- the first) thread as the
        // "controller" for our little test...
    ACE_Atomic_Op<ACE_Mutex,u_int> tcount_;
};

/* Construct the object & initialize the threads value for open() to
   use.
*/
Test::Test(int _threads)
        : threads_(_threads), tcount_(0)
{
}

/* As usual, our open() will create one or more threads where we'll do
   the interesting work.
*/
int Test::open(void * _unused)
{
    ACE_UNUSED_ARG(_unused);

        // One thing about the barrier:  You have to tell it how many
        // threads it will be synching.  The threads() mutator on my
        // Barrier class lets you do that and hides the implementation
        // details at the same time.
    barrier_.threads(threads_);

        // Activate the tasks as usual...
    return this->activate(THR_NEW_LWP, threads_, 1);
}

void Test::threads(int _threads)
{
    threads_ = _threads;
}

int Test::threads(void)
{
    return threads_;
}

/* svc() will execute in each thread & do a few things with the
   Barrier we have.
 */
int Test::svc(void)
{
        // Say hello to everyone first.
    ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Created\n" ));

        // Increment and save the "tcount" value.  We'll use it in
        // just a moment...
    int me = ++tcount_;

        // Wait for all initial threads to get to this point before we
        // go any further.  This is standard barrier usage...
    barrier_.wait();

        // Setup our random number generator.
    ACE_Time_Value now(ACE_OS::gettimeofday());
    ACE_RANDR_TYPE seed = now.usec();
    ACE_OS::srand(seed);
    int delay;

        // We'll arbitrarily choose the first activated thread to be
        // the controller.  After it sleeps a few seconds, it will add
        // five threads.
    if( me == 1 )
    {
            // Sleep from 1 to 10 seconds so that some of the other
            // threads will be into their for() loop.
        delay = ACE_OS::rand_r(seed)%10;
        ACE_OS::sleep(abs(delay)+1);

            // Make ourselves the barrier owner so that we can change
            // the number of threads.  This should be done with care...
        barrier_.owner( ACE_OS::thr_self() );

            // Add 5 threads to the barrier and then activate() to
            // make them real.  Notice the third parameter to
            // activate().  Without this parameter, the threads won't
            // be created.
        if( barrier_.threads(threads_+5) == 0 )
        {
            this->activate(THR_NEW_LWP,5,1);
        }
    }

        // This for() loop represents an "infinite" work loop in an
        // application. The theory is that the threads are dividing up
        // some work but need to "recalibrate" if more threads are
        // added.  I'll just do five iterations so that the test
        // doesn't run forever.
    int i;
    for( i = 0 ; i < 5 ; ++i )
    {
            // The sleep() represents time doing work.
        delay = ACE_OS::rand_r(seed)%7;
        ACE_OS::sleep(abs(delay)+1);

        ACE_DEBUG(( LM_INFO, "(%P|%t|%T)\tThread %.2d of %.2d iteration %.2d\n", me, threads_, i ));

            // If the local threads_ variable doesn't match the number
            // in the barrier, then the controller must have changed
            // the thread count.  We'll wait() for everyone and then
            // recalibrate ourselves before continuing.
        if( this->threads_ != barrier_.threads() )
        {
            ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Waiting for thread count to increase to %d from %d\n",
                        barrier_.threads(), this->threads_ ));

                // Wait for all our sibling threads...
            barrier_.wait();

                // Set our local variable so that we don't come here again.
            this->threads_ = barrier_.threads();

                // Recalibration can be anything you want.  At this
                // point, we know that all of the threads are synch'd
                // and ready to go.
        }
    }

        // Re-synch all of the threads before they exit.  This isn't
        // really necessary but I like to do it.
    barrier_.done();

    return(0);
}

/* Our test application...
 */
int main(int, char**)
{
        // Create the test object with 5 threads
    Test test(5);

        // and open it to test the barrier.
    test.open();
        // Now wait for them all to exit.
    test.wait();

    return(0);
}