// path: root/ACE/examples/Reactor/Misc/test_demuxing.cpp
// blob: 1bac58322765f1a0c5f8c7d9860b9563c2aa072c (plain)
// Perform an extensive test of all the ACE_Reactor's event handler
// dispatching mechanisms.  These mechanisms illustrate how I/O,
// timeout, and signal events, as well as ACE_Message_Queues, can all
// be handled within the same demultiplexing and dispatching
// framework.  In addition, this example illustrates how to use the
// ACE_Reactor for devices that perform I/O via signals (such as SVR4
// message queues).

#include "ace/ACE.h"
#include "ace/Service_Config.h"
#include "ace/Reactor.h"
#include "ace/Task.h"
#include "ace/Reactor_Notification_Strategy.h"
#include "ace/Signal.h"
#include "ace/OS_NS_fcntl.h"
#include "ace/OS_NS_unistd.h"



// Timer period in seconds.  STDIN_Handler passes it to the reactor as
// both the initial delay and the repeat interval of its timer.  The
// default is 2 seconds; main overwrites it with the value parsed from
// argv[1] when a command-line argument is supplied.
static int timeout = 2;

class Sig_Handler : public ACE_Event_Handler
{
  // = TITLE
  //   Event handler that receives signals through the <ACE_Reactor>
  //   framework and turns SIGINT into a deferred <handle_input>
  //   callback.
  //
  // = DESCRIPTION
  //   Because the signals are delivered to this object, neither
  //   global signal handler functions nor global signal handler data
  //   are required.
public:
  Sig_Handler (void);
  // Opens the placeholder descriptor and registers this object with
  // the <ACE_Reactor> singleton, both for that descriptor and for the
  // signals SIGINT, SIGQUIT and SIGALRM.

  virtual ACE_HANDLE get_handle (void) const;
  // Returns the placeholder descriptor stored in <handle_>.

  virtual int handle_input (ACE_HANDLE fd);
  // Logs that asynchronous input is being handled; always returns 0.

  //FUZZ: disable check_for_lack_ACE_OS
  virtual int shutdown (ACE_HANDLE fd, ACE_Reactor_Mask mask);
  //FUZZ: enable check_for_lack_ACE_OS
  // Logs that the handler is closing down; always returns 0.

  virtual int handle_signal (int signum,
                             siginfo_t *info = 0,
                             ucontext_t *context = 0);
  // Reacts to the signal <signum> delivered by the reactor.

private:
  ACE_HANDLE handle_;
  // Placeholder descriptor that reserves this handler's slot in the
  // reactor.
};

// A dummy handle is required to reserve a slot in the ACE_Reactor's
// descriptor table.  The constructor opens that handle, registers
// this object for it with a NULL_MASK, and then registers this object
// as the handler for SIGINT, SIGQUIT and SIGALRM.  Every failure is
// reported with the "%p\n%a" logging pattern.

Sig_Handler::Sig_Handler (void)
{
  // Assign the Sig_Handler a dummy I/O descriptor.  Note that even
  // though we open this file "Write Only" we still need to use the
  // ACE_Event_Handler::NULL_MASK when registering this with the
  // ACE_Reactor (see below).
  this->handle_ = ACE_OS::open (ACE_DEV_NULL, O_WRONLY);

  // Check the result explicitly instead of relying on ACE_ASSERT:
  // assertions can be compiled out, which would let an invalid
  // handle reach register_handler() unnoticed.
  if (this->handle_ == ACE_INVALID_HANDLE)
    ACE_ERROR ((LM_ERROR,
                "%p\n%a",
                "open",
                1));

  // Register signal handler object.  Note that NULL_MASK is used to
  // keep the ACE_Reactor from calling us back on the "/dev/null"
  // descriptor.  NULL_MASK just reserves a "slot" in the Reactor's
  // internal demuxing table, but doesn't cause it to dispatch the
  // event handler directly.  Instead, we use the signal handler to do
  // this.
  ACE_Reactor_Mask mask = ACE_Event_Handler::NULL_MASK;
  if (ACE_Reactor::instance ()->register_handler (this, mask) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n%a",
                "register_handler",
                1));

  // Create a sigset_t corresponding to the signals we want to catch.
  ACE_Sig_Set sig_set;

  sig_set.sig_add (SIGINT);
  sig_set.sig_add (SIGQUIT);
  sig_set.sig_add (SIGALRM);

  // Register the signal handler object to catch the signals.
  if (ACE_Reactor::instance ()->register_handler
      (sig_set, this) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n%a",
                "register_handler",
                1));
}

// Accessor through which the ACE_Reactor obtains the I/O handle that
// belongs to this handler.

ACE_HANDLE
Sig_Handler::get_handle (void) const
{
  return handle_;
}

// Placeholder for the point at which a real application would perform
// the asynchronous read on its signal-driven I/O device.  This
// example only logs a message to show that the dispatch happened, and
// always returns 0.

int
Sig_Handler::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) handling asynchonrous input...\n"));

  return 0;
}

// Placeholder for the cleanup a real application would perform when
// shutting down its I/O device.  This example only logs a message and
// always returns 0.

int
Sig_Handler::shutdown (ACE_HANDLE, ACE_Reactor_Mask)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) closing down Sig_Handler...\n"));

  return 0;
}

// This method handles all the signals that are being caught by this
// object.  In our simple example, we are simply catching SIGALRM,
// SIGINT, and SIGQUIT (SIGTERM is accepted as a shutdown request too
// where it exists).  Anything else is logged and rejected.  Note that
// the ACE_Reactor's signal handling mechanism eliminates the need to
// use global signal handler functions and data.
//
// Returns 0 once SIGALRM or a shutdown signal has been processed, the
// result of ready_ops() for SIGINT, and -1 for any other signal.

int
Sig_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
  switch (signum)
    {
#if !defined (ACE_WIN32)
    case SIGALRM:
      // Rearm the alarm.
      ACE_OS::alarm (4);
      return 0;
#endif /* !ACE_WIN32 */
    case SIGINT:
      // Tell the ACE_Reactor to enable the ready bit for
      // this->handle_.  The ACE_Reactor will subsequently call the
      // <Sig_Handler::handle_input> method from within its event
      // loop, i.e., the behavior triggered by the signal is handled
      // in the main event loop, rather than in the signal handler.
      return ACE_Reactor::instance ()->ready_ops
        (this->handle_,
         ACE_Event_Handler::READ_MASK,
         ACE_Reactor::ADD_MASK);
    default:
      break;
    }

  // Shutdown requests.  These are tested with == instead of case
  // labels so that SIGQUIT and SIGTERM may share a value, or SIGQUIT
  // may be something other than a preprocessor macro, without
  // producing duplicate-label or preprocessor problems.  A value of 0
  // is treated as "signal not available" and never matches.
  //
  // SIGQUIT must be accepted here because the constructor registers
  // this object for it; previously it was rejected as invalid
  // whenever SIGTERM was defined.
  bool shutdown_requested = (SIGQUIT != 0 && signum == SIGQUIT);
#if defined (SIGTERM) && (SIGTERM != 0)
  shutdown_requested = shutdown_requested || signum == SIGTERM;
#endif /* SIGTERM != 0 */

  if (!shutdown_requested)
    ACE_ERROR_RETURN ((LM_ERROR, "invalid signal"), -1);

  ACE_Reactor::end_event_loop ();
  return 0;
}

class STDIN_Handler : public ACE_Event_Handler
{
  // = TITLE
  //   Shows that the ACE_Reactor dispatches standard input and timer
  //   expirations through the same handler mechanism it uses for
  //   signals.
public:
  STDIN_Handler (void);
  // Registers this object as the stdin handler and schedules its
  // periodic timer with the <ACE_Reactor> singleton.

  ~STDIN_Handler (void);
  // Removes the stdin handler and cancels the timer.

  virtual int handle_input (ACE_HANDLE handle);
  // Copies the data available on <handle> to standard output.

  virtual int handle_timeout (const ACE_Time_Value &tv,
                              const void *arg);
  // Logs the time value <tv> passed in by the reactor.
};

// Hooks this object up to standard input and, if that works, to a
// periodic timer.  A failed stdin registration is only logged and the
// timer is then not scheduled.

STDIN_Handler::STDIN_Handler (void)
{
  ACE_Reactor *const reactor = ACE_Reactor::instance ();

  int const registered =
    ACE_Event_Handler::register_stdin_handler (this,
                                               reactor,
                                               ACE_Thread_Manager::instance ());
  if (registered == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
      return;
    }

  // Ask for a dispatch every <timeout> seconds, starting <timeout>
  // seconds from now, via the "interval timer" feature of the
  // <ACE_Reactor>'s timer queue.
  ACE_Time_Value const period (timeout);

  if (reactor->schedule_timer (this, 0, period, period) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n%a", "schedule_timer", 1));
}

// Undoes both registrations made by the constructor.  The two cleanup
// steps are independent of each other: the timer is cancelled even
// when removing the stdin handler fails, so that the reactor is never
// left holding a timer that refers to this destroyed object.

STDIN_Handler::~STDIN_Handler (void)
{
  if (ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (),
                                               ACE_Thread_Manager::instance ()) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n",
                "remove_stdin_handler"));

  // Deliberately not an "else": see the comment above.
  if (ACE_Reactor::instance ()->cancel_timer
      (this) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n%a",
                "cancel_timer",
                1));
}

// Timer callback: logs the time value <tv> supplied by the reactor
// and returns 0.  The second argument (the value given to
// schedule_timer) is not used.

int
STDIN_Handler::handle_timeout (const ACE_Time_Value &tv,
                               const void *)
{
  // "%d" consumes an int from the variable argument list, whereas
  // sec() and usec() yield time_t / suseconds_t values that are
  // commonly wider than int.  Convert explicitly so the argument
  // types match the format; the printed value is the same one the
  // unconverted call produced wherever that call happened to work.
  ACE_DEBUG ((LM_DEBUG,
              "(%t) timeout occurred at %d sec, %d usec\n",
              static_cast<int> (tv.sec ()),
              static_cast<int> (tv.usec ())));
  return 0;
}

// Reads one buffer from <handle> and echoes it to standard output.
//
// An interrupted read (EINTR) is ignored.  Any other read error is
// logged and then treated like end of file: the reactor's event loop
// is ended.  A short or failed write is logged and yields -1, unless
// the write failed with EINTR, in which case 0 is returned.

int
STDIN_Handler::handle_input (ACE_HANDLE handle)
{
  char buf[BUFSIZ];
  ssize_t const n = ACE_OS::read (handle, buf, sizeof buf);

  // Data was read: copy it to stdout.
  if (n != -1 && n != 0)
    {
      ssize_t const written = ACE::write_n (ACE_STDOUT, buf, n);

      if (written != n)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "write"),
                          written == -1 && errno == EINTR ? 0 : -1);
      return 0;
    }

  // Read failure: retry later if merely interrupted, otherwise report
  // it and shut down below.
  if (n == -1)
    {
      if (errno == EINTR)
        return 0;

      ACE_ERROR ((LM_ERROR, "%p\n", "read"));
    }

  // End of input, or an unrecoverable read error.
  ACE_Reactor::end_event_loop ();
  return 0;
}

class Message_Handler : public ACE_Task <ACE_SYNCH>
{
  // = TITLE
  //   Task whose own thread enqueues messages and whose
  //   <handle_input> dequeues them when called back by the
  //   <ACE_Reactor> Singleton.
public:
  Message_Handler (void);
  // Installs the notification strategy on the message queue and
  // activates the task.

  virtual int handle_input (ACE_HANDLE handle);
  // Invoked in the context of the <ACE_Reactor> Singleton; takes one
  // message off the <ACE_Message_Queue> and processes it.

  virtual int svc (void);
  // The task's "event-loop": periodically puts a message on the
  // internal <Message_Queue> inherited from <ACE_Task>.

private:
  ACE_Reactor_Notification_Strategy notification_strategy_;
  // Notifies the <ACE_Reactor> Singleton whenever a new message is
  // enqueued.
};

// Builds a notification strategy that targets this object with a
// READ_MASK on the <ACE_Reactor> Singleton, attaches it to the task's
// message queue, and then activates the task.  A non-zero result from
// activate() is logged.

Message_Handler::Message_Handler (void)
  : notification_strategy_ (ACE_Reactor::instance (),
                            this,
                            ACE_Event_Handler::READ_MASK)
{
  // Make the queue use the Reactor notification strategy.
  ACE_Reactor_Notification_Strategy *const strategy =
    &this->notification_strategy_;
  this->msg_queue ()->notification_strategy (strategy);

  int const status = this->activate ();

  if (status != 0)
    ACE_ERROR ((LM_ERROR, "%p\n", "activate"));
}

// Body of the task's thread.  Once per second it allocates a message
// block, gives it the loop counter as priority, and puts it on the
// task's own message queue.
//
// Returns 0 if a message block cannot be allocated or if putq()
// fails with ESHUTDOWN, and -1 if putq() fails for any other reason.
// Otherwise the loop never terminates.

int
Message_Handler::svc (void)
{
  for (int i = 0;; i++)
    {
      ACE_Message_Block *mb = 0;

      ACE_NEW_RETURN (mb,
                      ACE_Message_Block (1),
                      0);

      mb->msg_priority (i);
      ACE_OS::sleep (1);

      // Note that this putq() call will automagically invoke the
      // notify() hook of our ACE_Reactor_Notification_Strategy,
      // thereby informing the <ACE_Reactor> Singleton to call our
      // <handle_input> method.
      if (this->putq (mb) == -1)
        {
          // The block was not enqueued, so it is still ours and must
          // be released here or it leaks.  Keep errno intact across
          // the release because it selects the branch below and is
          // what "%p" reports.
          int const putq_errno = errno;
          mb->release ();
          errno = putq_errno;

          if (putq_errno == ESHUTDOWN)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "(%t) queue is deactivated"), 0);
          else
            ACE_ERROR_RETURN ((LM_ERROR,
                               "(%t) %p\n",
                               "putq"),
                              -1);
        }
    }

  ACE_NOTREACHED (return 0);
}

// Called back by the reactor after a message has been enqueued.
// Takes one message off the queue, logs its priority and releases it.
// A failed dequeue is only logged.  Always returns 0.

int
Message_Handler::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%t) Message_Handler::handle_input\n"));

  ACE_Message_Block *mb = 0;

  // getq() takes a pointer to a non-const ACE_Time_Value, so the
  // constness of the shared zero value has to be cast away.
  ACE_Time_Value *const no_wait =
    const_cast<ACE_Time_Value *> (&ACE_Time_Value::zero);

  if (this->getq (mb, no_wait) == -1)
    ACE_ERROR ((LM_ERROR,
                "(%t) %p\n",
                "dequeue_head"));
  else
    {
      // "%d" consumes an int, but msg_priority() yields a wider
      // unsigned type; convert explicitly so the argument matches the
      // format.  The priority was set from an int loop counter in
      // svc(), so the conversion restores that value.
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) priority = %d\n",
                  static_cast<int> (mb->msg_priority ())));
      mb->release ();
    }

  return 0;
}

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  ACE_Service_Config daemon (argv [0]);

  // Optionally start the alarm.
  if (argc > 1)
    {
      ACE_OS::alarm (4);
      timeout = ACE_OS::atoi (argv[1]);
    }

  // Signal handler.
  Sig_Handler sh;

  // Define an I/O handler object.
  STDIN_Handler ioh;

  // Define a message handler.
  Message_Handler mh;

  // Loop handling signals and I/O events until SIGQUIT occurs.

  while (ACE_Reactor::instance ()->event_loop_done () == 0)
    ACE_Reactor::instance ()->run_reactor_event_loop ();

  // Deactivate the message queue.
  mh.msg_queue ()->deactivate ();

  // Wait for the thread to exit.
  ACE_Thread_Manager::instance ()->wait ();
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) leaving main\n")));
  return 0;
}