summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_EH.h
blob: 3bb1a70847617d85bee82476d4e86157e740da12 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/* -*- C++ -*- */
/**
 * @file ECG_Mcast_EH.h
 *
 * $Id$
 *
 * @author Carlos O'Ryan <coryan@uci.edu>
 * @author Jaiganesh Balasubramanian <jai@doc.ece.uci.edu>
 *
 * http://doc.ece.uci.edu/~coryan/EC/index.html
 *
 */
#ifndef TAO_ECG_MCAST_EH_H
#define TAO_ECG_MCAST_EH_H
#include "ace/pre.h"

#include "orbsvcs/Event/event_export.h"
#include "orbsvcs/RtecEventChannelAdminS.h"
#include "ace/Event_Handler.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/Array_Base.h"
#include "ace/SOCK_Dgram_Mcast.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

// Forward declaration: this header only refers to the receiver through
// pointers, so the full class definition is not needed here.
class TAO_ECG_UDP_Receiver;

/**
 * @class TAO_ECG_Mcast_EH
 *
 * @brief Event Handler for UDP multicast messages.
 *
 * This object receives callbacks from the Reactor when data is
 * available on one of the mcast sockets.  It forwards the
 * notification to the TAO_ECG_UDP_Receiver gateway, which reads the
 * data and transforms it into an event.
 */
class TAO_RTEvent_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler
{
public:
  /**
   * Constructor.
   *
   * @param recv The receiver to which the messages received by this
   *        event handler are forwarded.
   * @param net_if Optional name of the NIC where the multicast
   *        messages are expected.  NOTE(review): a null value
   *        presumably leaves the interface selection to the system
   *        default -- confirm in the implementation.
   */
  TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv,
                    const ACE_TCHAR *net_if = 0);

  /// Destructor
  virtual ~TAO_ECG_Mcast_EH (void);

  /**
   * Register for changes in the EC subscription list.
   * When the subscription list becomes non-empty we join the proper
   * multicast groups (using the receiver to translate between event
   * types and mcast groups) and the class registers itself with the
   * reactor.
   *
   * @param ec The event channel whose subscription list is observed.
   * @param env Used to raise CORBA exceptions when there is no
   *        support for native C++ exceptions.
   *
   * @return Status code.  NOTE(review): presumably 0 on success and
   *         -1 on failure, following the usual ACE convention --
   *         confirm in the implementation.
   */
  int open (RtecEventChannelAdmin::EventChannel_ptr ec,
            CORBA::Environment &env = TAO_default_environment ());

  /**
   * Remove ourselves from the event channel, unsubscribe from the
   * multicast groups, close the sockets and unsubscribe from the
   * reactor.
   *
   * @param env Used to raise CORBA exceptions when there is no
   *        support for native C++ exceptions.
   *
   * @return Status code.  NOTE(review): presumably 0 on success and
   *         -1 on failure -- confirm in the implementation.
   */
  int close (CORBA::Environment &env = TAO_default_environment ());

  /// Reactor callback, invoked when data is available on a multicast
  /// socket.
  /**
   * @param fd The handle the Reactor is reporting input on.
   *
   * @return Status code handed back to the Reactor; the exact values
   *         are defined by the implementation, not by this header.
   */
  virtual int handle_input (ACE_HANDLE fd);

  /// The Observer methods.
  /**
   * These mirror the operations of the nested Observer servant
   * declared below.  NOTE(review): presumably the Observer simply
   * delegates to these two methods -- confirm in the implementation.
   *
   * @param sub The new consumer subscriptions of the event channel.
   * @param env Used to raise CORBA exceptions when there is no
   *        support for native C++ exceptions.
   */
  void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
                        CORBA::Environment &env = TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));
  /**
   * @param pub The new supplier publications of the event channel.
   * @param env Used to raise CORBA exceptions when there is no
   *        support for native C++ exceptions.
   */
  void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
                        CORBA::Environment &env = TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));

  /**
   * @class Observer
   *
   * @brief Observe changes in the EC subscriptions.
   *
   * As the subscriptions on the EC change we also change the
   * mcast groups that we join.
   * We could use the TIE classes, but they don't work in all
   * compilers.
   */

  class Observer : public POA_RtecEventChannelAdmin::Observer
  {
  public:
    /// We report changes in the EC subscriptions to the event
    /// handler.
    /**
     * @param eh The event handler that is notified of the changes.
     *        The pointer is stored as-is in eh_.
     */
    Observer (TAO_ECG_Mcast_EH* eh);

    // The Observer methods, i.e. the servant-side operations of
    // RtecEventChannelAdmin::Observer.
    virtual void update_consumer (
        const RtecEventChannelAdmin::ConsumerQOS& sub,
        CORBA::Environment &env =
            TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));
    virtual void update_supplier (
        const RtecEventChannelAdmin::SupplierQOS& pub,
        CORBA::Environment &env =
            TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));

  private:
    /// Our callback object.
    TAO_ECG_Mcast_EH *eh_;
  };

private:
  /// A set of multicast group addresses.
  typedef ACE_Unbounded_Set<ACE_INET_Addr> Address_Set;

  /// Compute the list of multicast addresses that we need to be
  /// subscribed to, in order to receive the events described in the
  /// ConsumerQOS parameter.
  /**
   * @param sub The list of event types that our event channel
   *        consumers are interested in.
   * @param multicast_addresses Returns the list of multicast
   *        addresses that we need to subscribe to.
   * @param ACE_TRY_ENV Used to raise CORBA exceptions when there is
   *        no support for native C++ exceptions.
   *
   * @throw CORBA::SystemException This method needs to perform
   *        several CORBA invocations and propagates any exceptions
   *        back to the caller.
   */

  void compute_required_subscriptions (
        const RtecEventChannelAdmin::ConsumerQOS& sub,
              Address_Set& multicast_addresses,
              CORBA::Environment &ACE_TRY_ENV)
              ACE_THROW_SPEC ((CORBA::SystemException));

  /// Drop the multicast subscriptions that are no longer needed in
  /// order to receive the events described in the ConsumerQOS
  /// parameter.
  /**
   * @param multicast_addresses The set of multicast addresses that
   *        are currently required.  NOTE(review): passed by non-const
   *        reference -- confirm whether the implementation modifies
   *        the set.
   *
   * @return NOTE(review): the meaning of the returned int is not
   *         visible from this header -- confirm in the
   *         implementation.
   */

  int delete_unwanted_subscriptions (
              Address_Set& multicast_addresses);

  /// Add the new multicast subscriptions that are needed in order to
  /// receive the events described in the ConsumerQOS parameter.
  /**
   * @param multicast_addresses The set of multicast addresses that we
   *        need to subscribe to.  NOTE(review): passed by non-const
   *        reference -- confirm whether the implementation modifies
   *        the set.
   */

  void add_new_subscriptions (
              Address_Set& multicast_addresses);

  /// Subscribe an existing socket to a multicast group.
  /**
   * @param multicast_group The multicast address that we need to
   *        subscribe to.
   *
   * @return NOTE(review): presumably signals whether an existing
   *         socket could take the subscription -- confirm the exact
   *         values in the implementation.
   */
  int subscribe_to_existing_socket (ACE_INET_Addr& multicast_group);

  /// Subscribe a new socket to a multicast group.
  /**
   * @param multicast_group The multicast address that we need to
   *        subscribe to.
   */
  void subscribe_to_new_socket (ACE_INET_Addr& multicast_group);

private:
  /// The NIC name used to subscribe for multicast traffic.
  /// NOTE(review): ownership of this string is not visible from the
  /// header -- confirm whether the class copies and frees it.
  ACE_TCHAR *net_if_;

  /// Hash map type keyed by multicast address, whose values are
  /// pointers to multicast datagram sockets, and its iterator type.
  typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Map;
  typedef ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex> Subscriptions_Iterator;

  /// Map from multicast address to a multicast socket pointer.
  /// NOTE(review): presumably each entry records which socket in
  /// sockets_ is subscribed to that multicast group -- confirm
  /// against the implementation.
  Subscriptions_Map subscriptions_;

  /// The datagram sockets used to receive the data.
  typedef ACE_Array_Base<ACE_SOCK_Dgram_Mcast*> Socket_List;
  Socket_List sockets_;

  /// We callback to this object when a message arrives.
  TAO_ECG_UDP_Receiver* receiver_;

  /// This object will call us back when the subscription list
  /// changes.
  Observer observer_;

  /// Keep the handle of the observer so we can unregister later.
  RtecEventChannelAdmin::Observer_Handle handle_;

  /// The Event Channel.
  RtecEventChannelAdmin::EventChannel_var ec_;
};

#if defined(__ACE_INLINE__)
#include "ECG_Mcast_EH.i"
#endif /* __ACE_INLINE__ */

#include "ace/post.h"
#endif /* TAO_ECG_MCAST_EH_H */