summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h
blob: f62de40073a854b773f600196dcc784b61e1c398 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
/* -*- C++ -*- */
/**
 * @file ECG_Mcast_EH.h
 *
 * $Id$
 *
 * @author Carlos O'Ryan <coryan@uci.edu>
 * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu>
 * @author Marina Spivak <marina@atdesk.com>
 * @author Don Hinton <dhinton@ieee.org>
 *
 * http://doc.ece.uci.edu/~coryan/EC/index.html
 *
 */
#ifndef TAO_ECG_MCAST_EH_H
#define TAO_ECG_MCAST_EH_H

#include /**/ "ace/pre.h"
#include "ace/Event_Handler.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Unbounded_Set.h"
#include "ace/Array_Base.h"
#include "ace/SOCK_Dgram_Mcast.h"

#include "orbsvcs/orbsvcs/RtecEventChannelAdminS.h"

#include /**/ "event_export.h"
#include "ECG_Adapters.h"
#include "EC_Lifetime_Utils.h"
#include "EC_Lifetime_Utils_T.h"

/**
 * @class TAO_ECG_Mcast_EH
 *
 * @brief Event Handler for Mcast messages.
 *        NOT THREAD-SAFE.
 *
 * This object acts as an Observer to the Event Channel.  It subscribes
 * to multicast groups that carry events matching the EC's subscriptions.
 * This object then receives callbacks from the Reactor when data is
 * available on the mcast sockets and alerts TAO_ECG_Dgram_Handler,
 * which reads the data, transforms it into an event and pushes it to
 * the Event Channel.
 */
class TAO_RTEvent_Export TAO_ECG_Mcast_EH :
  public ACE_Event_Handler,
  public TAO_ECG_Handler_Shutdown
{
public:

  /// Initialization and termination methods.
  //@{
  /**
   * Constructor.  Messages received by this EH will be forwarded to
   * the \a recv.  \a net_if can be used to specify the NIC where
   * multicast messages are expected.  \a buf_sz would be used to alter
   * the default buffer size.
   *
   * See comments for receiver_ data member on why raw pointer is
   * used for the \a recv argument.
   */
  TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
                    const ACE_TCHAR *net_if = 0,
		    CORBA::ULong buf_sz = 0);

  /// Destructor.
  virtual ~TAO_ECG_Mcast_EH (void);

  /**
   * Register for changes in the EC subscription list.
   * When the subscription list becomes non-empty we join the proper
   * multicast groups (using Dgram_Handler to translate between event
   * types and mcast groups) and the class registers itself with the
   * reactor.
   *
   * To ensure proper resource clean up, if open () is successful,
   * the user MUST call shutdown () when handler is no longer needed
   * (and its reactor still exists).
   *
   * @param ec The Event Channel whose subscription changes are observed.
   */
  void open (RtecEventChannelAdmin::EventChannel_ptr ec
             ACE_ENV_ARG_DECL_WITH_DEFAULTS);

  /// TAO_ECG_Handler_Shutdown method.
  /**
   * Remove ourselves from the event channel, unsubscribe from the
   * multicast groups, close the sockets and deregister from the
   * reactor.
   */
  virtual int shutdown (void);
  //@}

  /// Reactor callback.  Notify receiver_ that a dgram corresponding
  /// to \a fd is ready for reading.
  virtual int handle_input (ACE_HANDLE fd);

private:

  /**
   * @class Observer
   *
   * @brief Observes changes in the EC consumer subscriptions and notifies
   *        TAO_ECG_Mcast_EH when there are changes.
   */
  class Observer :
    public virtual POA_RtecEventChannelAdmin::Observer,
    public virtual PortableServer::RefCountServantBase,
    public TAO_EC_Deactivated_Object
  {
  public:
    /// Constructor.  Changes in the EC subscriptions will be reported
    /// to the \a eh.
    Observer (TAO_ECG_Mcast_EH* eh);

    /// Shut down the observer: disconnect from EC and deactivate from
    /// POA.
    void shutdown (void);

    /// Event Channel Observer methods
    //@{
    virtual void update_consumer (
        const RtecEventChannelAdmin::ConsumerQOS& sub
        ACE_ENV_ARG_DECL_WITH_DEFAULTS)
      ACE_THROW_SPEC ((CORBA::SystemException));
    virtual void update_supplier (
        const RtecEventChannelAdmin::SupplierQOS& pub
        ACE_ENV_ARG_DECL_WITH_DEFAULTS)
      ACE_THROW_SPEC ((CORBA::SystemException));
    //@}

  private:
    /// Handler we notify of subscriptions changes.
    /*
     * Observer can keep a raw pointer to mcast handler, because the handler
     * guarantees to notify the observer (by calling shutdown ())
     * before going away.
     */
    TAO_ECG_Mcast_EH *eh_;
  };

  /// Make update_consumer () accessible to Observer.
  friend class Observer;

  /// The Observer method.  Subscribe/unsubscribe to multicast groups
  /// according to changes in consumer subscriptions.
  void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
                        ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException));


  /// Set of multicast addresses, as exchanged between the helpers below.
  typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set;

  /// Helpers for updating multicast subscriptions based on changes in
  /// consumer subscriptions.
  //@{
  /// Compute the list of multicast addresses that we need to be
  /// subscribed to, in order to receive the events described in the
  /// ConsumerQOS parameter.
  /**
   * @param sub The list of event types that our event channel
   *        consumers are interested in.
   * @param multicast_addresses This method populates this list with
   *        multicast addresses that we need to be subscribed to in
   *        order to receive event types specified in \a sub.
   *
   * @throw CORBA::SystemException This method needs to perform
   *        several CORBA invocations, and it propagates any exceptions
   *        back to the caller.
   */
  void compute_required_subscriptions (
        const RtecEventChannelAdmin::ConsumerQOS& sub,
              Address_Set& multicast_addresses
              ACE_ENV_ARG_DECL)
              ACE_THROW_SPEC ((CORBA::SystemException));

  /// Unsubscribe from any multicast addresses we are currently
  /// subscribed to that are not in the \a multicast_addresses list.
  /// Also remove from \a multicast_addresses any addresses to which we are
  /// already subscribed.
  /**
   * @param multicast_addresses List of multicast
   *        addresses we need to be subscribed to in order to receive all
   *        event types in the current consumer subscriptions.
   *
   * @return NOTE(review): the meaning of the int result is not
   *         documented in this header - confirm against the
   *         implementation.
   */
  int delete_unwanted_subscriptions (
              Address_Set& multicast_addresses);

  /// Subscribe to all multicast addresses in \a multicast_addresses -
  /// we are not subscribed to them yet, but need to be.
  /**
   * @param multicast_addresses List of multicast addresses to which
   *        we need to subscribe in order to be receiving all event
   *        types in the current consumer subscriptions.
   */
  void add_new_subscriptions (
              Address_Set& multicast_addresses);
  //@}

  /**
   * @class Observer_Disconnect_Command
   *
   * @brief Disconnects Observer from the Event Channel
   *
   * Utility class for use as a template argument to TAO_EC_Auto_Command.
   * TAO_EC_Auto_Command<Observer_Disconnect_Command> manages
   * observer connection to the Event Channel, automatically
   * disconnecting from ec in its destructor, if necessary.
   */
  class TAO_RTEvent_Export Observer_Disconnect_Command
  {
  public:
    /// Default constructor.
    Observer_Disconnect_Command (void);

    /// Construct a command for the observer registered under \a handle
    /// with the Event Channel \a ec.
    Observer_Disconnect_Command (RtecEventChannelAdmin::Observer_Handle handle,
                                 RtecEventChannelAdmin::EventChannel_ptr ec);

    /// Copy construction and assignment.
    Observer_Disconnect_Command (const Observer_Disconnect_Command &rhs);
    Observer_Disconnect_Command & operator= (const Observer_Disconnect_Command & rhs);

    /// Carry out the command (see the class description).
    /// NOTE(review): presumably invoked by TAO_EC_Auto_Command - confirm.
    void execute (ACE_ENV_SINGLE_ARG_DECL);

  private:

    /// Observer handle passed to the constructor.
    RtecEventChannelAdmin::Observer_Handle handle_;

    /// Event Channel the observer is to be disconnected from.
    RtecEventChannelAdmin::EventChannel_var ec_;
  };

private:
  /// The NIC used to subscribe for multicast traffic.
  ACE_TCHAR *net_if_;

  /// One multicast subscription: the multicast address and the dgram
  /// (socket) used for it.
  typedef struct {
    ACE_INET_Addr mcast_addr;
    ACE_SOCK_Dgram_Mcast* dgram;
  } Subscription;
  typedef ACE_Array_Base<Subscription> Subscriptions;

  /// List of multicast addresses we subscribe to and dgrams we use.
  /*
   * We use a dedicated socket for each multicast subscription.  The
   * reason: we assume the underlying software, i.e., ACE, binds each
   * socket used to receive multicast to the multicast group (mcast addr
   * + port) to avoid receiving promiscuous traffic, in which case it is
   * not possible to subscribe to more than one mcast address on the same
   * socket.
   *
   * Performance.  We use array to store subscriptions (mcast addr / dgram
   * pairs).  If performance is not adequate, we should look into
   * using a hash map, keyed on file descriptors, instead.  When there
   * are many subscriptions, handle_input() is likely to be more
   * efficient with a hash lookup than an array iteration for locating a
   * target dgram.  Difference in costs of subscription changes between
   * hash map and array would need to be looked at as well, although
   * it is probably highly dependent on the pattern of changes.
   */
  Subscriptions subscriptions_;

  /// We callback to this object when a message arrives.
  /*
   * We can keep a raw pointer to the receiver (even though it may
   * be a refcounted object) because receiver guarantees
   * to notify us (by calling shutdown ()) before going away.
   *
   * We have to use raw pointer instead of a refcounting mechanism
   * here to avoid a circular refcounting dependency between
   * receiver and handler.
   */
  TAO_ECG_Dgram_Handler * receiver_;

  /// Socket receive-buffer size.
  CORBA::ULong recvbuf_size_;

  /// Event Channel Observer.  Detects changes in EC consumer subscriptions.
  /// ORDER DEPENDENCY: this member should be declared before
  /// <auto_observer_disconnect_>.
  TAO_EC_Servant_Var<Observer> observer_;

  /// Manages connection of our observer to the Event Channel.
  /// ORDER DEPENDENCY: this member should be declared AFTER <observer_>.
  TAO_EC_Auto_Command<Observer_Disconnect_Command> auto_observer_disconnect_;
};

#if defined(__ACE_INLINE__)
#include "ECG_Mcast_EH.i"
#endif /* __ACE_INLINE__ */

#include /**/ "ace/post.h"

#endif /* TAO_ECG_MCAST_EH_H */