summaryrefslogtreecommitdiff
path: root/examples/ASX/Event_Server/Event_Server/event_server.cpp
blob: 1e111a69fb999d431b72e4047d2896cca57bd28c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// $Id$

// Test the event server.


#include "ace/Stream.h"
#include "ace/Service_Config.h"
#include "Options.h"
#include "Consumer_Router.h"
#include "Event_Analyzer.h"
#include "Supplier_Router.h"

#if defined (ACE_HAS_THREADS)

typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;

// Handle SIGINT and terminate the entire application.

class Quit_Handler : public ACE_Sig_Adapter
{
public:
  Quit_Handler (void);
  virtual int handle_input (ACE_HANDLE fd);
};

Quit_Handler::Quit_Handler (void)
  : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Service_Config::end_reactor_event_loop))
{  
  // Register to trap input from the user.
  if (ACE::register_stdin_handler (this,
				   ACE_Service_Config::reactor (),
				   ACE_Service_Config::thr_mgr ()) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
  // Register to trap the SIGINT signal.
  else if (ACE_Service_Config::reactor ()->register_handler 
	   (SIGINT, this) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
}

int
Quit_Handler::handle_input (ACE_HANDLE)
{
  options.stop_timer ();
  ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
  options.print_results ();

  ACE_Service_Config::end_reactor_event_loop ();
  return 0;
}

int
main (int argc, char *argv[])
{
  ACE_Service_Config daemon;
  
  options.parse_args (argc, argv);

  {
    // Primary ACE_Stream for EVENT_SERVER application.
    MT_Stream event_server; 

    // Enable graceful shutdowns...
    Quit_Handler quit_handler;

    // Create the Supplier Router module.

    MT_Module *sr = new MT_Module ("Supplier_Router", 
				   new Supplier_Router (ACE_Service_Config::thr_mgr ()));

    // Create the Event Analyzer module.

    MT_Module *ea = new MT_Module ("Event_Analyzer", 
				   new Event_Analyzer, 
				   new Event_Analyzer);

    // Create the Consumer Router module.

    MT_Module *cr = new MT_Module ("Consumer_Router", 
				   0, // 0 triggers the creation of a ACE_Thru_Task...
				   new Consumer_Router (ACE_Service_Config::thr_mgr ()));

    // Push the Modules onto the event_server stream.

    if (event_server.push (sr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
					
    if (event_server.push (ea) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);

    if (event_server.push (cr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);

    // Set the high and low water marks appropriately.

    int wm = options.low_water_mark ();

    if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);

    wm = options.high_water_mark ();
    if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);

    options.start_timer ();

    // Perform the main event loop waiting for the user to type ^C or to
    // enter a line on the ACE_STDIN.

    daemon.run_reactor_event_loop ();
    // The destructor of event_server will close down the stream and
    // call the close() hooks on all the ACE_Tasks.
  }

  // Wait for the threads to exit.
  ACE_Service_Config::thr_mgr ()->wait ();
  ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
  return 0;
}
#else
int 
main (void)
{
  ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1);
}
#endif /* ACE_HAS_THREADS */