summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
blob: 25f04944d387a2a6c8bb07adde39718f1b3c5c5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/* -*- C++ -*- */
/**
 *  @file   EC_Gateway.h
 *
 *  $Id$
 *
 *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
 *
 * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and
 * other members of the DOC group. More details can be found in:
 *
 * http://doc.ece.uci.edu/~coryan/EC/index.html
 */

#ifndef TAO_EC_GATEWAY_H
#define TAO_EC_GATEWAY_H
#include "ace/pre.h"

#include "orbsvcs/Event/event_export.h"
#include "orbsvcs/RtecEventChannelAdminS.h"
#include "orbsvcs/RtecEventCommS.h"
#include "orbsvcs/Channel_Clients.h"
#include "ace/Map_Manager.h"

/**
 * @class TAO_EC_Gateway
 *
 * @brief Event Channel Gateway
 *
 * There are several ways to connect several EC together, for
 * instance:
 * + A single class can use normal IIOP and connect to one EC as
 * a supplier and to another EC as a consumer.
 * + A class connects as a consumer and transmits the events using
 * multicast; another class receives the multicast messages and
 * transforms them back into a push() call.
 * This is an abstract class to represent all the different
 * strategies for EC distribution.
 *
 */
class TAO_RTEvent_Export TAO_EC_Gateway : public POA_RtecEventChannelAdmin::Observer
{
public:
  /// Default constructor.
  TAO_EC_Gateway (void);

  /// Destructor. Declared virtual so that concrete gateways can be
  /// destroyed through a pointer to this base class.
  virtual ~TAO_EC_Gateway (void);

  /// The gateway must disconnect from all the relevant event channels,
  /// or any other communication media (such as multicast groups).
  /// Pure virtual: each concrete gateway strategy provides its own
  /// implementation.
  /// @param env CORBA environment argument; defaults to
  ///            TAO_default_environment ().
  virtual void close (CORBA::Environment &env = TAO_default_environment ()) = 0;

  /// Obtain and modify the observer handle. The one-argument overload
  /// is the modifier, the const overload without arguments is the
  /// accessor.
  void observer_handle (RtecEventChannelAdmin::Observer_Handle h);
  RtecEventChannelAdmin::Observer_Handle observer_handle (void) const;

private:
  /// The observer handle. NOTE(review): presumably the value read and
  /// written by the observer_handle() overloads above; their
  /// definitions are not visible in this header -- confirm.
  RtecEventChannelAdmin::Observer_Handle handle_;
};

// ****************************************************************
/**
 * @class TAO_EC_Gateway_IIOP
 *
 * @brief Event Channel Gateway using IIOP.
 *
 * This class mediates between two event channels: it connects as a
 * consumer of events with a remote event channel, and as a supplier
 * of events with the local EC.
 * As a consumer it gives a QoS designed to only accept the events
 * in which *local* consumers are interested.
 * Eventually the local EC should create this object and compute its
 * QoS in an automated manner; but this requires some way to filter
 * out the peers registered as consumers, otherwise we will get
 * loops in the QoS graph.
 * It uses exactly the same set of events in the publications list
 * when connected as a supplier.
 *
 * @note
 * An alternative implementation would be to register with the
 * remote EC as a supplier, and then filter on the remote EC, but
 * one of the objectives is to minimize network traffic.
 * On the other hand the events will be pushed to remote consumers,
 * even though they will be dropped upon receipt (due to the TTL
 * field); IMHO this is another suggestion that the EC needs to know
 * (somehow) which consumers are truly its peers in disguise.
 *
 * @todo: The class makes an extra copy of the events, we need to
 * investigate if closer collaboration with its collocated EC could
 * be used to remove that copy.
 */
class TAO_RTEvent_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway
{
public:
  /// Default constructor.
  TAO_EC_Gateway_IIOP (void);

  /// Destructor. Implicitly virtual, because the destructor of the
  /// base class TAO_EC_Gateway is declared virtual.
  ~TAO_EC_Gateway_IIOP (void);

  /// To do its job this class requires to know the local and remote
  /// ECs it will connect to,
  /// @param rmt_ec      The remote event channel.
  /// @param lcl_ec      The local event channel.
  /// @param ACE_TRY_ENV CORBA environment argument (no default here,
  ///                    unlike the other public operations).
  void init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
             RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
             CORBA::Environment &ACE_TRY_ENV);

  /// The channel is disconnecting (callback for the supplier
  /// personality of the gateway).
  void disconnect_push_supplier (CORBA::Environment & = TAO_default_environment ());

  /// The channel is disconnecting (callback for the consumer
  /// personality of the gateway).
  void disconnect_push_consumer (CORBA::Environment & = TAO_default_environment ());

  /// This is the Consumer side behavior, it pushes the events to the
  /// local event channel.
  /// @param events The set of events received by the gateway.
  void push (const RtecEventComm::EventSet &events,
             CORBA::Environment & = TAO_default_environment ());

  /// Disconnect and shutdown the gateway
  /// NOTE(review): the meaning of the int return value is not
  /// documented in this header -- confirm against the implementation.
  int shutdown (CORBA::Environment & = TAO_default_environment ());

  // The following methods are documented in the base class.
  // close() overrides the pure virtual declared in TAO_EC_Gateway;
  // update_consumer() and update_supplier() are not declared in
  // TAO_EC_Gateway itself, so presumably they come from
  // POA_RtecEventChannelAdmin::Observer -- confirm.
  virtual void close (CORBA::Environment &env = TAO_default_environment ());
  virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
                                CORBA::Environment &env = TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));
  virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
                                CORBA::Environment &env = TAO_default_environment ())
      ACE_THROW_SPEC ((CORBA::SystemException));

private:
  /// Internal helper. NOTE(review): presumably does the real work of
  /// close(), following the init()/init_i() naming pattern -- confirm.
  void close_i (CORBA::Environment &);

  /// Internal helper. NOTE(review): presumably does the real work of
  /// update_consumer() -- confirm against the implementation.
  void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub,
                          CORBA::Environment& env);

protected:
  /// Do the real work in init()
  void init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
               RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
               CORBA::Environment &ACE_TRY_ENV);

protected:
  /// Lock to synchronize internal changes
  TAO_SYNCH_MUTEX lock_;

  /// How many threads are running push() we cannot make changes until
  /// that reaches 0
  CORBA::ULong busy_count_;

  /**
   * An update_consumer() message arrived *while* we were doing a
   * push(); the modification is stored in <c_qos_>. If multiple
   * update_consumer messages arrive only the last one is executed.
   *
   * NOTE(review): this comment used to name <pub_>, which is not a
   * member of this class; <c_qos_> (declared right below) is assumed
   * to be the intended member -- confirm.
   */
  int update_posted_;
  RtecEventChannelAdmin::ConsumerQOS c_qos_;

  /// The remote and the local EC, so we can reconnect when the list changes.
  RtecEventChannelAdmin::EventChannel_var rmt_ec_;
  RtecEventChannelAdmin::EventChannel_var lcl_ec_;

  /// Our local and remote RT_Infos.
  RtecBase::handle_t rmt_info_;
  RtecBase::handle_t lcl_info_;

  /// Our consumer personality....
  /// If consumer_is_active_ is not 0 then we must deactivate the
  /// consumer.
  /// NOTE(review): this comment used to say "deactivate the supplier",
  /// apparently copied from the supplier_ comment below -- confirm.
  ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_;
  int consumer_is_active_;

  /// Our supplier personality....
  /// If supplier_is_active_ is not 0 then we must deactivate the
  /// supplier.
  ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_;
  int supplier_is_active_;

  // We use a different Consumer_Proxy
  // The map is keyed by event source ID and holds the matching
  // ProxyPushConsumer reference; it is instantiated with
  // ACE_Null_Mutex as its lock type.
  typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map;
  typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator;

  /// We talk to the EC (as a supplier) using either a per-supplier
  /// proxy or a generic proxy for the type only subscriptions.
  Consumer_Map consumer_proxy_map_;
  RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_;

  /// We talk to the EC (as a consumer) using this proxy.
  RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
};

#include "ace/post.h"
#endif /* TAO_EC_GATEWAY_H */