summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
blob: e5bfe24a4e82f5a435ff419bb348284b9ac90565 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/* -*- C++ -*- */
// $Id$
//
// ============================================================================
//
// = LIBRARY
//   TAO services
//
// = FILENAME
//   EC_Gateway_UDP
//
// = AUTHOR
//   Carlos O'Ryan
//
// = DESCRIPTION
//   Define helper classes to propagate events between ECs using
//   either UDP or multicast.
//   The architecture is a bit complicated and deserves some
//   explanation: sending the events over UDP (or mcast) is easy, a
//   Consumer (TAO_ECG_UDP_Sender) subscribes for a certain set of
//   events, its push() method marshalls the event set into a CDR
//   stream that is sent using an ACE_SOCK_CODgram. The subscription
//   set and IP address can be configured.
//   Another helper class (TAO_ECG_UDP_Receiver) acts as a supplier of
//   events; it receives a callback when an event is available on an
//   ACE_SOCK_Dgram, it demarshalls the event and pushes it to the
//   EC.  Two ACE_Event_Handler classes are provided that can forward
//   the events to this Supplier: TAO_ECG_Mcast_EH can receive events
//   from a multicast group; TAO_ECG_UDP_EH can receive events from a
//   regular UDP socket.
//
//   Matching of the events types carried by a multicast group (or IP
//   address) is up to the application. Gateway classes can be
//   implemented to automate this: the EC informs its gateways about
//   local changes in the subscriptions (for example), the Gateway
//   could then consult an administrative server that will inform it
//   which multicast groups carry those event types, it can then
//   create the proper event handlers and TAO_ECG_Receivers. An
//   analogous class can be implemented for the Supplier side.
//
//   An alternative approach would be to look the current set of
//   multicast groups and the events carried on each, using that
//   information a suitable TAO_ECG_UDP_Receiver can be configured
//   (and of course the Senders on the client side).
//
// = TODO
//   The class makes an extra copy of the events, we need to
//   investigate if closer collaboration with its collocated EC could
//   be used to remove that copy.
//
// ============================================================================

#ifndef TAO_EC_GATEWAY_UDP_H
#define TAO_EC_GATEWAY_UDP_H

#include "ace/SOCK_CODgram.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "orbsvcs/RtecEventChannelAdminS.h"
#include "orbsvcs/RtecUDPAdminS.h"
#include "orbsvcs/orbsvcs_export.h"

class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer
{
  //
  // = TITLE
  //   Send events received from a "local" EC using UDP.
  //
  // = DESCRIPTION
  //   This class connect as a consumer to an EventChannel
  //   and it sends the events using UDP, the UDP address can be a
  //   normal IP address or it can be a multicast group.
  //   The UDP address is obtained from a RtecUDPAdmin::AddrServer
  //   class.
  //   It marshalls the events using TAO CDR classes.
  //   No provisions are taken for message fragmentation.
  //
public:
  TAO_ECG_UDP_Sender (void);

  int get_local_addr (ACE_INET_Addr& addr);
  // Get the local endpoint used to send the events.

  void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
             RtecScheduler::Scheduler_ptr lcl_sched,
             const char* lcl_name,
	     RtecUDPAdmin::AddrServer_ptr addr_server,
	     ACE_SOCK_Dgram* dgram,
             CORBA::Environment &_env);
  // To do its job this class requires to know the local EC it will
  // connect to; it also requires to build an RT_Info for the local
  // scheduler.
  // It only keeps a copy of its SupplierProxy, used for later
  // connection and disconnections.
  // @@ TODO part of the RT_Info is hardcoded, we need to make it
  // parametric.

  void shutdown (CORBA::Environment&);
  // Disconnect and shutdown the sender, no further connections will
  // work unless init() is called again.

  void open (RtecEventChannelAdmin::ConsumerQOS& sub,
             CORBA::Environment& env);
  // Connect (or reconnect) to the EC with the given subscriptions.

  void close (CORBA::Environment& _env);
  // Disconnect from the EC, but reconnection is still possible.

  virtual void disconnect_push_consumer (CORBA::Environment &);
  virtual void push (const RtecEventComm::EventSet &events,
                     CORBA::Environment &);
  // The PushConsumer methods.

private:
  RtecEventChannelAdmin::EventChannel_var lcl_ec_;
  // The remote and the local EC, so we can reconnect when the
  // subscription list changes.

  RtecScheduler::handle_t lcl_info_;
  // Our local and remote RT_Infos.

  RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
  // We talk to the EC (as a consumer) using this proxy.

  RtecUDPAdmin::AddrServer_var addr_server_;
  // We query this object to determine where are the events sent.

  ACE_SOCK_Dgram *dgram_;
  // The datagram used to sendto(), this object is *not* owned by the
  // UDP_Sender.
};

class TAO_ORBSVCS_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier
{
  //
  // = TITLE
  //   Decodes events from an ACE_SOCK_Dgram and pushes them to the
  //   Event_Channel.
  //
  // = DESCRIPTION
  //   This supplier receives events from an ACE_SOCK_Dgram, either
  //   from a UDP socket or a Mcast group, decodes them and push them
  //   to the EC.
  //   No provisions are taken for message reassembly.
  //
public:
  TAO_ECG_UDP_Receiver (void);

  void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
             RtecScheduler::Scheduler_ptr lcl_sched,
             const char* lcl_name,
             const ACE_INET_Addr& ignore_from,
	     RtecUDPAdmin::AddrServer_ptr addr_server,
             CORBA::Environment &_env);
  // To do its job this class requires to know the local EC it will
  // connect to; it also requires to build an RT_Info for the local
  // scheduler.
  // @@ TODO part of the RT_Info is hardcoded, we need to make it
  // parametric.

  void shutdown (CORBA::Environment&);
  // Disconnect and shutdown the gateway, no further connectsions

  void open (RtecEventChannelAdmin::SupplierQOS& pub,
             CORBA::Environment &env);
  // Connect to the EC using the given publications lists.

  virtual void close (CORBA::Environment& env);
  // Disconnect to the EC.

  int handle_input (ACE_SOCK_Dgram& dgram);
  // The Event_Handlers call this method when data is available at the
  // socket, the <dgram> must be ready for reading and contain a full
  // event.

  // The PushSupplier method.
  virtual void disconnect_push_supplier (CORBA::Environment &);

  
  void get_addr (const RtecEventComm::EventHeader& header,
		 RtecUDPAdmin::UDP_Addr_out addr,
		 CORBA::Environment& env);
  // Call the RtecUDPAdmin::AddrServer

private:
  RtecEventChannelAdmin::EventChannel_var lcl_ec_;
  // The remote and the local EC, so we can reconnect when the list changes.

  RtecScheduler::handle_t lcl_info_;
  // Our RT_Info.

  RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_;
  // We talk to the EC (as a consumer) using this proxy.

  ACE_INET_Addr ignore_from_;
  // Ignore any events coming from this IP addres.

  RtecUDPAdmin::AddrServer_var addr_server_;
};

class TAO_ORBSVCS_Export TAO_ECG_UDP_EH : public ACE_Event_Handler
{
  //
  // = TITLE
  //   Event Handler for UDP messages.
  //
  // = DESCRIPTION
  //   This object receives callbacks from the Reactor when data is
  //   available on a UDP socket, it forwards to the ECG_UDP_Receiver
  //   which reads the events and transform it into an event.
public:
  TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv);

  int open (const ACE_INET_Addr& ipaddr);
  // Open the datagram and register with this->reactor()

  int close (void);
  // Close the datagram and unregister with the reactor.

  // Reactor callbacks
  virtual int handle_input (ACE_HANDLE fd);
  virtual ACE_HANDLE get_handle (void) const;

private:
  ACE_SOCK_Dgram dgram_;
  // The datagram used to receive the data.

  TAO_ECG_UDP_Receiver* receiver_;
  // We callback to this object when a message arrives.
};

class TAO_ORBSVCS_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler
{
  //
  // = TITLE
  //   Event Handler for UDP messages.
  //
  // = DESCRIPTION
  //   This object receives callbacks from the Reactor when data is
  //   available on the mcast socket, it forwards to the UDP_Receive
  //   gateway which reads the events and transform it into an event.
  //
public:
  TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv);

  int open (RtecEventChannelAdmin::EventChannel_ptr ec,
	    CORBA::Environment& _env);
  // Register for changes in the EC subscription list.
  // When the subscription list becomes non-empty we join the proper
  // multicast groups (using the receiver to translate between event
  // types and mcast groups) and the class registers itself with the
  // reactor.

  int close (CORBA::Environment& _env);
  // Remove ourselves from the event channel, unsubscribe from the
  // multicast groups, close the sockets and unsubscribe from the
  // reactor.

  virtual int handle_input (ACE_HANDLE fd);
  virtual ACE_HANDLE get_handle (void) const;
  // Reactor callbacks

  void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
			CORBA::Environment& _env);
  void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
			CORBA::Environment& _env);
  // The Observer methods

  class Observer : public POA_RtecEventChannelAdmin::Observer
  {
    // = TITLE
    //   Observe changes in the EC subscriptions.
    //
    // = DESCRIPTION
    //   As the subscriptions on the EC change we also change the
    //   mcast groups that we join.
    //   We could use the TIE classes, but they don't work in all
    //   compilers.
  public:
    Observer (TAO_ECG_Mcast_EH* eh);
    // We report changes in the EC subscriptions to the event
    // handler.

    // The Observer methods
    virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
				  CORBA::Environment& _env);
    virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
				  CORBA::Environment& _env);

  private:
    TAO_ECG_Mcast_EH* eh_;
    // Our callback object.
  };

private:
  int subscribe (const ACE_INET_Addr &mcast_addr);
  int unsubscribe (const ACE_INET_Addr &mcast_addr);
  // Control the multicast group subscriptions

private:
  ACE_SOCK_Dgram_Mcast dgram_;
  // The datagram used to receive the data.

  TAO_ECG_UDP_Receiver* receiver_;
  // We callback to this object when a message arrives.

  Observer observer_;
  // This object will call us back when the subscription list
  // changes.

  RtecEventChannelAdmin::Observer_Handle handle_;
  // Keep the handle of the observer so we can unregister later.

  RtecEventChannelAdmin::EventChannel_var ec_;
  // The Event Channel.
};


#endif /* ACE_EVENT_CHANNEL_UDP_H */