summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.h
blob: d377fb9487173171371143adf613e5f0dd0990a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/* -*- C++ -*- */
// $Id$

// ============================================================================
//
// = LIBRARY
//    apps
// 
// = FILENAME
//    Event_Channel.h
//
// = AUTHOR
//    Doug Schmidt 
// 
// ============================================================================

#if !defined (ACE_EVENT_CHANNEL)
#define ACE_EVENT_CHANNEL

#include "Proxy_Handler_Connector.h"
#include "Proxy_Handler_Acceptor.h"
#include "Consumer_Dispatch_Set.h"
#include "Event_Forwarding_Discriminator.h"

typedef ACE_Null_Mutex MAP_MUTEX;

class ACE_Svc_Export ACE_Event_Channel_Options
  // = TITLE
  //    Maintains the options for an <ACE_Event_Channel>.
{
public:
  ACE_Event_Channel_Options (void);
  // Initialization.

  ~ACE_Event_Channel_Options (void);
  // Termination.

  ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *locking_strategy_;
  // Points to the locking strategy used for serializing access to the
  // reference count in <ACE_Message_Block>.  If it's 0, then there's
  // no locking strategy and we're using a REACTIVE concurrency
  // strategy.

  int performance_window_;
  // Number of seconds after connection establishment to report
  // throughput.
  
  int blocking_semantics_;
  // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.

  int socket_queue_size_;
  // Size of the socket queue (0 means "use default").

  enum
  {
    REACTIVE = 0,
    OUTPUT_MT = 1,
    INPUT_MT = 2
  };

  u_long threading_strategy_;
  // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT.

  u_short acceptor_port_;
  // Port used to accept connections from Peers.

  int connector_role_;
  // Enabled if we are playing the role of the Connector.

  int acceptor_role_;
  // Enabled if we are playing the role of the Connector.

  int verbose_;
  // Enabled if we want verbose diagnostic output.
};

class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<ACE_SYNCH>
  // = TITLE
  //    Define a generic Event_Channel.
  //
  // = DESCRIPTION
{
public:
  // = Initialization and termination methods.
  ACE_Event_Channel (void);
  ~ACE_Event_Channel (void);

  virtual int open (void * = 0);
  // Open the channel.

  virtual int close (u_long = 0);
  // Close down the Channel.

  // = Proxy management methods.
  int initiate_proxy_connection (Proxy_Handler *);
  // Initiate the connection of the <Proxy_Handler> to its peer.

  int complete_proxy_connection (Proxy_Handler *);
  // Complete the initialization of the <Proxy_Handler> once it's
  // connected to its Peer.

  int reinitiate_proxy_connection (Proxy_Handler *);
  // Reinitiate a connection asynchronously when the Peer fails.

  int bind_proxy (Proxy_Handler *);
  // Bind the <Proxy_Handler> to the <proxy_map_>.

  int find_proxy (ACE_INT32 proxy_id, Proxy_Handler *&);
  // Locate the <Proxy_Handler> with <proxy_id>.

  int subscribe (const Event_Key &event_addr, 
		 Consumer_Dispatch_Set *cds);
  // Subscribe the <Consumer_Dispatch_Set> to receive events that
  // match <Event_Key>.

  // = Event forwarding method.
  virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0);
  // Pass <mb> to the Event Channel so it can forward it to Consumers.

  ACE_Event_Channel_Options &options (void);
  // Points to the Event_Channel options.

  void initiate_connector (void);
  // Actively initiate connections to the Peers.

  void initiate_acceptor (void);
  // Passively initiate Peer acceptor.

private:
  virtual int svc (void);
  // Run as an active object.

  int parse_args (int argc, char *argv[]);
  // Parse the command-line arguments.

  int compute_performance_statistics (void);
  // Perform timer-based performance profiling.

  virtual int handle_timeout (const ACE_Time_Value &, 
			      const void *arg);
  // Periodically callback to perform timer-based performance
  // profiling.

  Proxy_Handler_Connector connector_;
  // Used to establish the connections actively.

  Proxy_Handler_Acceptor acceptor_;
  // Used to establish the connections passively.

  // = Make life easier by defining typedefs.
  typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> PROXY_MAP;
  typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> PROXY_MAP_ITERATOR;
  typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> PROXY_MAP_ENTRY;

  PROXY_MAP proxy_map_;
  // Table that maps Connection IDs to Proxy_Handler *'s.

  Event_Forwarding_Discriminator efd_;
  // Map that associates an event to a set of Consumer_Proxy *'s.

  ACE_Event_Channel_Options options_;
  // The options for the channel.
};

#endif /* ACE_EVENT_CHANNEL */