// Source path: root/examples/Reactor/Misc/test_demuxing.cpp
// Blob: f8f7992ce37e7fa8656500450a12ef90cf08a885
// $Id$

// Perform an extensive test of all the ACE_Reactor's event handler
// dispatching mechanisms.  These mechanisms illustrate how I/O,
// timeout, and signal events, as well as ACE_Message_Queues, can all
// be handled within the same demultiplexing and dispatching
// framework.  In addition, this example illustrates how to use the
// ACE_Reactor for devices that perform I/O via signals (such as SVR4
// message queues).

#include "ace/Service_Config.h"
#include "ace/Task.h"

// Used to shut down the event loop.
// NOTE(review): nothing visible in this file reads or writes <done>;
// the handlers below stop the loop by calling
// ACE_Service_Config::end_reactor_event_loop () instead.  Confirm
// whether this flag is still needed.
static sig_atomic_t done = 0;

// Sig_Handler demonstrates signal-driven I/O with the ACE_Reactor
// framework.  Signals are delivered to an ordinary object via its
// handle_signal() hook, so no global signal handler functions or
// global signal handler data are needed.

class Sig_Handler : public ACE_Event_Handler
{
public:
  Sig_Handler (void);
  // Opens the dummy descriptor and registers this object with the
  // ACE_Reactor singleton, both under NULL_MASK and as the handler
  // for SIGINT, SIGQUIT, and SIGALRM.

  virtual ACE_HANDLE get_handle (void) const;
  // Returns the dummy descriptor stored in <handle_>.

  virtual int handle_input (ACE_HANDLE);
  // Logs a greeting; always returns 0.

  virtual int shutdown (ACE_HANDLE, ACE_Reactor_Mask);
  // Logs a shutdown notice; always returns 0.

  virtual int handle_signal (int signum, siginfo_t * = 0,
			     ucontext_t * = 0);
  // Dispatches on <signum>.  The parameter is a signal number rather
  // than an I/O handle, so it is declared as <int>, which is also
  // what the out-of-class definition uses.

private:
  ACE_HANDLE handle_;
  // Dummy descriptor that reserves this handler's slot in the
  // ACE_Reactor's descriptor table.
};

// A dummy handle is required to reserve a slot in the ACE_Reactor's
// descriptor table.  The constructor opens that handle and performs
// both registrations (I/O slot and signal set) with the ACE_Reactor
// singleton.

Sig_Handler::Sig_Handler (void)
{
  // Assign the Sig_Handler a dummy I/O descriptor.  Note that even
  // though we open this file "Write Only" we still need to use the
  // ACE_Event_Handler::NULL_MASK when registering this with the
  // ACE_Reactor (see below).
  this->handle_ = ACE_OS::open (ACE_DEV_NULL, O_WRONLY);

  // Compare against ACE_INVALID_HANDLE rather than a literal -1 so
  // the check stays correct where ACE_HANDLE is not an <int>.
  ACE_ASSERT (this->handle_ != ACE_INVALID_HANDLE);

  // Register signal handler object.  Note that NULL_MASK is used to
  // keep the ACE_Reactor from calling us back on the "/dev/null"
  // descriptor.
  if (ACE_Service_Config::reactor ()->register_handler
      (this, ACE_Event_Handler::NULL_MASK) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n%a", "register_handler", 1));

  // Create a sigset_t corresponding to the signals we want to catch.
  ACE_Sig_Set sig_set;

  sig_set.sig_add (SIGINT);
  sig_set.sig_add (SIGQUIT);
  sig_set.sig_add (SIGALRM);

  // Register the signal handler object to catch the signals.
  if (ACE_Service_Config::reactor ()->register_handler (sig_set, this) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n%a", "register_handler", 1));
}

// Accessor the ACE_Reactor uses to obtain this handler's descriptor;
// it hands back the dummy handle stored in <handle_>.

ACE_HANDLE
Sig_Handler::get_handle (void) const
{
  const ACE_HANDLE dummy_handle = this->handle_;
  return dummy_handle;
}

// In a real application, this method would be where the read on the
// signal-driven I/O device would occur asynchronously.  For now we'll
// just print a greeting to let you know that everything is working
// properly!
//
// The handle argument is ignored.  Always returns 0.

int
Sig_Handler::handle_input (ACE_HANDLE)
{
  // Spelling of "asynchronous" corrected in the logged message.
  ACE_DEBUG ((LM_DEBUG, "(%t) handling asynchronous input...\n"));
  return 0;
}

// Placeholder for the cleanup a real application would perform when
// the I/O device is shut down.  Here it only logs a notice.  Both
// arguments are ignored and the result is always 0.
//
// NOTE(review): nothing visible in this file calls shutdown() --
// confirm which code path is expected to invoke it.

int
Sig_Handler::shutdown (ACE_HANDLE /* handle */,
                       ACE_Reactor_Mask /* close_mask */)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) closing down Sig_Handler...\n"));

  return 0;
}

// Central dispatch point for every signal this object catches.  The
// example registers for SIGALRM, SIGINT, and SIGQUIT; any other
// signal number is logged and otherwise ignored.
//
//   SIGALRM - re-arms a 4 second alarm.
//   SIGINT  - asks the ACE_Reactor to mark <handle_> as ready for
//             reading and returns whatever ready_ops() returns.
//   SIGQUIT - ends the reactor event loop.
//
// Every path other than SIGINT returns 0.
//
// Handling signals this way means the work triggered by a signal can
// be carried out from the main event loop, and no global signal
// handler functions or data are required.

int
Sig_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) received signal %S\n", signum));

  if (signum == SIGINT)
    {
      // Tell the ACE_Reactor to enable the ready bit for
      // this->handle_.  The ACE_Reactor will subsequently call the
      // Sig_Handler::handle_input method from within its event loop.
      return ACE_Service_Config::reactor ()->ready_ops
        (this->handle_, ACE_Event_Handler::READ_MASK, ACE_Reactor::ADD_MASK);
    }

  if (signum == SIGALRM)
    {
      // Rearm the alarm.
      ACE_OS::alarm (4);
    }
  else if (signum == SIGQUIT)
    {
      ACE_DEBUG ((LM_DEBUG, "(%t) %S: shutting down signal tester\n", signum));
      ACE_Service_Config::end_reactor_event_loop ();
    }
  else
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) %S: not handled, returning to program\n", signum));
    }

  return 0;
}

// STDIN_Handler shows that standard input and periodic timeouts can
// be serviced by the ACE_Reactor through the same event handler
// mechanism that is used for signals.

class STDIN_Handler : public ACE_Event_Handler
{
public:
  // Registers this object for standard input and, if that succeeds,
  // schedules a recurring two second timer.
  STDIN_Handler ();

  // Copies whatever is read from the given handle to standard output.
  virtual int handle_input (ACE_HANDLE);

  // Logs the time value passed in by the caller.
  virtual int handle_timeout (const ACE_Time_Value &,
                              const void *act);
};

// Hooks this handler up to standard input via
// ACE::register_stdin_handler().  Only when that succeeds is the
// periodic timer scheduled; a failure of either step is logged.

STDIN_Handler::STDIN_Handler (void)
{
  const int registered =
    ACE::register_stdin_handler (this,
                                 ACE_Service_Config::reactor (),
                                 ACE_Service_Config::thr_mgr ());

  if (registered == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
      return;
    }

  // Register the STDIN_Handler to be dispatched once every two seconds.
  const int timer_id =
    ACE_Service_Config::reactor ()->schedule_timer
      (this, 0, ACE_Time_Value (2), ACE_Time_Value (2));

  if (timer_id == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p\n%a", "schedule_timer", 1));
    }
}

// Timer callback: logs the seconds and microseconds of the time value
// <tv> supplied by the caller.  The second argument is ignored.
// Always returns 0.

int
STDIN_Handler::handle_timeout (const ACE_Time_Value &tv,
			       const void *)
{
  // "%d" consumes an <int> from the variable argument list, but
  // sec () and usec () are not declared to return <int>.  Convert
  // explicitly so the argument types match the format directives.
  ACE_DEBUG ((LM_DEBUG, "(%t) timeout occurred at %d sec, %d usec\n",
	      static_cast<int> (tv.sec ()),
	      static_cast<int> (tv.usec ())));
  return 0;
}

// Reads one buffer's worth of data from <handle> and echoes it to
// standard output.
//
//   * read() == -1 with errno == EINTR: nothing to do, return 0.
//   * read() == -1 otherwise: log the error, then end the reactor
//     event loop and return 0.
//   * read() == 0: end the reactor event loop and return 0.
//   * anything else: write the data out; a short or failed write is
//     logged and yields 0 if it failed with EINTR, -1 otherwise.

int
STDIN_Handler::handle_input (ACE_HANDLE handle)
{
  char buf[BUFSIZ];
  ssize_t n = ACE_OS::read (handle, buf, sizeof buf);

  if (n != -1 && n != 0)
    {
      // Data arrived: copy it through to stdout.
      ssize_t result = ACE::write_n (ACE_STDOUT, buf, n);

      if (result != n)
        {
          ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "write"),
                            result == -1 && errno == EINTR ? 0 : -1);
        }

      return 0;
    }

  if (n == -1)
    {
      // An interrupted read is not an error; just try again later.
      if (errno == EINTR)
        return 0;

      ACE_ERROR ((LM_ERROR, "%p\n", "read"));
    }

  // Either the read failed or it returned 0: stop the event loop.
  ACE_Service_Config::end_reactor_event_loop ();
  return 0;
}

// Message_Handler is an ACE_Task whose service thread puts messages
// on the task's own message queue, and whose handle_input() hook
// takes them off again.

class Message_Handler : public ACE_Task <ACE_MT_SYNCH>
{
public:
  // Attaches the notification strategy to the message queue and
  // activates the task.
  Message_Handler ();

  // Called back within the context of the <ACE_Reactor> Singleton to
  // dequeue and process the message on the <ACE_Message_Queue>.
  virtual int handle_input (ACE_HANDLE);

  // Run the "event-loop" periodically putting messages to our
  // internal <Message_Queue> that we inherit from <ACE_Task>.
  virtual int svc ();

private:
  // This strategy will notify the <ACE_Reactor> Singleton when a new
  // message is enqueued.
  ACE_Reactor_Notification_Strategy notification_strategy_;
};

// Builds the notification strategy around the ACE_Reactor singleton,
// this handler, and READ_MASK; installs it on the task's message
// queue; and then activates the task.  A non-zero result from
// activate() is logged.

Message_Handler::Message_Handler (void)
  : notification_strategy_ (ACE_Service_Config::reactor (),
                            this,
                            ACE_Event_Handler::READ_MASK)
{
  // Set this to the Reactor notification strategy.
  this->msg_queue ()->notification_strategy (&this->notification_strategy_);

  const int activate_result = this->activate ();

  if (activate_result != 0)
    {
      ACE_ERROR ((LM_ERROR, "%p\n", "activate"));
    }
}

int
Message_Handler::svc (void)
{
  for (int i = 0;; i++)
    {
      ACE_Message_Block *mb;

      ACE_NEW_RETURN (mb, ACE_Message_Block (1), 0);

      mb->msg_priority (i);
      ACE_OS::sleep (1);

      // Note that this putq() call with automagically invoke the
      // notify() hook of our ACE_Reactor_Notification_Strategy,
      // thereby informing the <ACE_Reactor> Singleton to call our
      // <handle_input> method.
      if (this->putq (mb) == -1)
	{
	  if (errno == ESHUTDOWN)
	    ACE_ERROR_RETURN ((LM_ERROR, "(%t) queue is deactivated"), 0);
	  else
	    ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "putq"), -1);
	}
    }

  return 0;
}

// Takes one message block off this task's queue, logs its priority,
// and deletes it.  A failed getq() is logged.  The handle argument is
// ignored and the result is always 0.

int
Message_Handler::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) Message_Handler::handle_input\n"));

  // getq() wants a non-const pointer, hence the cast.
  // NOTE(review): passing ACE_Time_Value::zero presumably keeps this
  // call from blocking -- confirm against the getq() documentation.
  ACE_Time_Value *timeout =
    const_cast<ACE_Time_Value *> (&ACE_Time_Value::zero);

  ACE_Message_Block *mb;

  if (this->getq (mb, timeout) == -1)
    {
      ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
      return 0;
    }

  ACE_DEBUG ((LM_DEBUG, "(%t) priority = %d\n", mb->msg_priority ()));
  delete mb;

  return 0;
}

int
main (int argc, char *argv[])
{
  ACE_Service_Config daemon (argv [0]);

  // Signal handler.
  Sig_Handler sh;

  // Define an I/O handler object.
  STDIN_Handler ioh;

  // Define a message handler.
  Message_Handler mh;

  // Optionally start the alarm.
  if (argc > 1)
    ACE_OS::alarm (4);

  // Loop handling signals and I/O events until SIGQUIT occurs.

  while (daemon.reactor_event_loop_done () == 0)
    daemon.run_reactor_event_loop ();

  // Deactivate the message queue.
  mh.msg_queue ()->deactivate ();

  // Wait for the thread to exit.
  ACE_Service_Config::thr_mgr ()->wait ();
  ACE_DEBUG ((LM_DEBUG, "(%t) leaving main\n"));
  return 0;
}