// path: root/examples/ASX/Event_Server/Event_Server/event_server.cpp
// blob: 6dba2c3828303f43e3698ff92de39a5c1a3ee9c0 (plain)
// $Id$

// Test the event server.

#include "ace/Stream.h"
#include "ace/Service_Config.h"
#include "Options.h"
#include "Consumer_Router.h"
#include "Event_Analyzer.h"
#include "Supplier_Router.h"

// Stream and Module types instantiated with the ACE_SYNCH traits;
// main() below assembles the event server from these.  (The MT_
// prefix presumably means "multi-threaded" -- confirm against the
// definition of ACE_SYNCH.)
typedef ACE_Stream<ACE_SYNCH> MT_Stream;
typedef ACE_Module<ACE_SYNCH> MT_Module;

// = TITLE
//     Handle SIGINT and terminate the entire application.
//
// = DESCRIPTION
//     The constructor (defined later in this file) registers the
//     object for standard input and for SIGINT.  The handle_input()
//     hook stops the Options timer, prints the results and ends the
//     reactor's event loop.
class Quit_Handler : public ACE_Sig_Adapter
{
public:
  // Default constructor; performs the registrations described above.
  Quit_Handler ();

  // Input hook; <fd> is the handle being reported.
  virtual int handle_input (ACE_HANDLE fd);
};

// Build the handler.  The ACE_Sig_Adapter base is initialised with
// ACE_Reactor::end_event_loop wrapped in an ACE_Sig_Handler_Ex; the
// object then registers itself for standard input and, only if that
// succeeded, for SIGINT.  Either failure is logged via ACE_ERROR and
// construction still completes.
Quit_Handler::Quit_Handler (void)
  : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
{
  // Step 1: trap input typed by the user.
  const int stdin_status =
    ACE::register_stdin_handler (this,
                                 ACE_Reactor::instance (),
                                 ACE_Thread_Manager::instance ());

  if (stdin_status == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
      // Without the stdin registration the SIGINT registration is
      // not attempted.
      return;
    }

  // Step 2: trap the SIGINT signal.
  if (ACE_Reactor::instance ()->register_handler (SIGINT, this) == -1)
    ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
}

// Input hook: consume whatever is pending on ACE_STDIN, stop the
// Options timer, report the results and end the reactor's event loop.
// Always returns -1 (presumably asking the reactor to drop this
// handler -- confirm against the ACE_Event_Handler documentation).
int
Quit_Handler::handle_input (ACE_HANDLE)
{
  // Read from standard input first so that we really do wait until
  // the user has typed something.  On platforms such as Win32 this
  // hook can be invoked prematurely, i.e. before any data exists.
  // The bytes read (and the result of the read) are not used.
  char scratch[BUFSIZ];
  ACE_OS::read (ACE_STDIN, scratch, sizeof scratch);

  // Stop timing, announce the shutdown, then print what was measured.
  Options::instance ()->stop_timer ();
  ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
  Options::instance ()->print_results ();

  // Make ACE_Reactor::run_event_loop() in main() return.
  ACE_Reactor::end_event_loop ();
  return -1;
}

// Entry point of the event server test.
//
// With ACE_HAS_THREADS defined this parses the command line into the
// Options singleton, builds a stream of three modules
// (Supplier_Router, Event_Analyzer, Consumer_Router), applies the
// configured low/high water marks, and runs the reactor event loop
// until it is ended (see Quit_Handler).  It then waits on the thread
// manager before returning.
//
// Returns 0 on normal completion, and also when threads are not
// supported (after logging an error).  Returns -1 if an allocation,
// a push onto the stream, or a water mark control call fails.
int
main (int argc, char *argv[])
{
#if defined (ACE_HAS_THREADS)
  ACE_Service_Config daemon;

  Options::instance ()->parse_args (argc, argv);
  {
    // Primary ACE_Stream for EVENT_SERVER application.  It lives in
    // its own scope so that it is destroyed before we wait for the
    // threads below.
    MT_Stream event_server;

    // Enable graceful shutdowns...
    Quit_Handler quit_handler;

    // Create the Supplier_Router's routing context, which contains
    // context shared by both the write-side and read-side of the
    // Supplier_Router Module.
    Peer_Router_Context *src = 0;
    ACE_NEW_RETURN (src,
                    Peer_Router_Context (Options::instance ()->supplier_port ()),
                    -1);

    // Create the Supplier Router module.
    MT_Module *srm = 0;
    ACE_NEW_RETURN (srm, MT_Module
                    ("Supplier_Router",
                     new Supplier_Router (src),
                     new Supplier_Router (src)),
                    -1);

    // Create the Event Analyzer module.
    MT_Module *eam = 0;
    ACE_NEW_RETURN (eam, MT_Module
                    ("Event_Analyzer",
                     new Event_Analyzer,
                     new Event_Analyzer),
                    -1);

    // Create the Consumer_Router's routing context, which contains
    // context shared by both the write-side and read-side of the
    // Consumer_Router Module.
    Peer_Router_Context *crc = 0;
    ACE_NEW_RETURN (crc,
                    Peer_Router_Context (Options::instance ()->consumer_port ()),
                    -1);

    // Create the Consumer Router module.
    MT_Module *crm = 0;
    ACE_NEW_RETURN (crm, MT_Module
                    ("Consumer_Router",
                     new Consumer_Router (crc),
                     new Consumer_Router (crc)),
                    -1);

    // Push the Modules onto the event_server stream, in the order
    // Supplier_Router, Event_Analyzer, Consumer_Router.

    if (event_server.push (srm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);

    if (event_server.push (eam) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);

    if (event_server.push (crm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);

    // Set the high and low water marks appropriately.  The failure
    // messages use "%p\n" like every other error report in this
    // file, and name the call that actually failed (control, not
    // push).

    int wm = Options::instance ()->low_water_mark ();

    if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "control (setting low watermark)"), -1);

    wm = Options::instance ()->high_water_mark ();
    if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "control (setting high watermark)"), -1);

    Options::instance ()->start_timer ();

    // Perform the main event loop waiting for the user to type ^C or
    // to enter a line on the ACE_STDIN.

    ACE_Reactor::run_event_loop ();
    // The destructor of event_server will close down the stream and
    // call the close() hooks on all the ACE_Tasks.
  }

  // Wait for the threads to exit.
  ACE_Thread_Manager::instance ()->wait ();
  ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
#else
  ACE_UNUSED_ARG (argc);
  ACE_UNUSED_ARG (argv);
  ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
#endif /* ACE_HAS_THREADS */
  return 0;
}