summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h
blob: e63c40c571fc9cfd38f269a93d7c411fac58a3b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/* -*- C++ -*- */
/**
 *  @file ECG_CDR_Message_Receiver.h
 *
 *  $Id$
 *
 *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
 *  @author Marina Spivak (marina@atdesk.com)
 */

#ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
#define TAO_ECG_CDR_MESSAGE_RECEIVER_H

#include /**/ "ace/pre.h"

#include "ECG_UDP_Out_Endpoint.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "tao/CDR.h"
#include "tao/Environment.h"

#include "ace/Hash_Map_Manager.h"
#include "ace/INET_Addr.h"
#include "ace/Null_Mutex.h"

/**
 * @class TAO_ECG_CDR_Processor
 *
 * @brief Interface for callback objects used by
 *        TAO_ECG_CDR_Message_Receiver to propagate received data to
 *        its callers.
 */
class TAO_ECG_CDR_Processor
{
public:
  virtual ~TAO_ECG_CDR_Processor (void);

  /// Extracts data from <cdr>.  Returns 0 on success, -1 on error.
  virtual int decode (TAO_InputCDR &cdr) = 0;
};

// ****************************************************************
/**
 * @class TAO_ECG_UDP_Request_Entry
 *
 * @brief Keeps information about an incomplete request.
 *
 * When a request arrives in fragments this object is used to
 * keep track of the incoming data.
 */
class TAO_ECG_UDP_Request_Entry
{
public:
  enum {
    ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
  };

  /// Initialize the fragment, allocating memory, etc.
  TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
                             CORBA::ULong request_id,
                             CORBA::ULong request_size,
                             CORBA::ULong fragment_count);

  ~TAO_ECG_UDP_Request_Entry (void);

  /// Validate a fragment, it should be rejected if it is invalid..
  int validate_fragment (CORBA::Boolean byte_order,
                         CORBA::ULong request_size,
                         CORBA::ULong fragment_size,
                         CORBA::ULong fragment_offset,
                         CORBA::ULong fragment_id,
                         CORBA::ULong fragment_count) const;

  /// Has @a fragment_id been received?
  int test_received (CORBA::ULong fragment_id) const;

  /// Mark @a fragment_id as received, reset timeout counter...
  void mark_received (CORBA::ULong fragment_id);

  /// Is the message complete?
  int complete (void) const;

  /// Return a buffer for the fragment at offset @a fragment_offset
  char* fragment_buffer (CORBA::ULong fragment_offset);

private:

  TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
  TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);

private:
  /// This attributes should remain constant in all the fragments, used
  /// for validation....
  CORBA::Boolean byte_order_;
  CORBA::ULong request_id_;
  CORBA::ULong request_size_;
  CORBA::ULong fragment_count_;

  ACE_Message_Block payload_;

  /// This is a bit vector, used to keep track of the received buffers.
  CORBA::ULong* received_fragments_;
  int own_received_fragments_;
  CORBA::ULong received_fragments_size_;
  CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
};

// ****************************************************************

/**
 * @class TAO_ECG_CDR_Message_Receiver
 *
 * @brief Receives UDP and Multicast messages.
 *
 * @todo Update class documentation below.
 *
 *       5)  Make status array size and purge_count configurable.
 *
 * This class receives UDP and Multicast message fragments, assembles
 * them (described in detail below), and passes complete messages
 * in the form of cdr streams to the calling classes.
 *
 * This class is used by various Gateway classes (Senders/Receivers)
 * responsible for federating Event Channels with UDP/Mcast.
 *
 * = REASSEMBLY
 * Fragmentation is described in ECG_CDR_Message_Sender.h
 * Whenever an incomplete fragment is received (one with
 * fragment_count > 1) we allocate an entry for the message in an
 * map indexed by (host,port,request_id).  The entry contains the
 * buffer, a bit vector to keep track of the fragments received
 * so far, and a timeout counter.  This timeout counter is set to
 * 0 on each (new) fragment arrival, and incremented on a regular
 * basis.  If the counter reaches a maximum value the message is
 * dropped.
 * Once all the fragments have been received the message is sent
 * up to the calling classes, and the memory reclaimed.
 */
class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
{
public:
  /// Initialization and termination methods.
  //@{
  TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
  ~TAO_ECG_CDR_Message_Receiver (void);

  /**
   * @param ignore_from Endpoint used to remove events generated by
   *        the same process.
   */
  void init (TAO_ECG_Refcounted_Endpoint ignore_from
             /* , ACE_Lock *lock = 0 */);

  // Shutdown the component: close down the request map, etc.
  void shutdown (void);
  //@}

  /// Main method: read the data from @a dgram and either pass ready data
  /// to @a cdr_processor or update the <request_map_> if the request
  /// is not yet complete.
  /**
   * Returns 1 if data was read successfully and accepted by
   * <cdr_processor> without errors.
   * Returns 0 if there were no errors, but no data has been passed to
   * <cdr_processor>, either due to request being incomplete (not all
   * fragments received), or it being a duplicate.
   * Returns -1 if there were errors.
   */
  int handle_input (ACE_SOCK_Dgram& dgram,
                    TAO_ECG_CDR_Processor *cdr_processor);

  /// Represents any request that has been fully received and
  /// serviced, to simplify the internal logic.
  static TAO_ECG_UDP_Request_Entry Request_Completed_;

private:

  enum {
    ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
    ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
  };

  struct Mcast_Header;
  class Requests;

  typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
                               Requests*,
                               ACE_Null_Mutex> Request_Map;

private:

  /// Returns 1 on success, 0 if <request_id> has already been
  /// received or is below current request range, and -1 on error.
  int mark_received (const ACE_INET_Addr &from,
                     CORBA::ULong request_id);

  /// Returns 1 if complete request is received and <event> is
  /// populated, 0 if request has only partially been received or is a
  /// duplicate, and -1 on error.
  int process_fragment (const ACE_INET_Addr &from,
                        const Mcast_Header &header,
                        char * data_buf,
                        TAO_ECG_CDR_Processor *cdr_processor);


  Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);

private:

  /// Ignore any events coming from this IP address.
  TAO_ECG_Refcounted_Endpoint ignore_from_;

  /// The map containing all the incoming requests which have been
  /// partially received.
  Request_Map request_map_;

  /// Serializes use of <request_map_>.
  //  ACE_Lock* lock_;

  /// Size of a fragmented requests array, i.e., max number of
  /// partially received requests kept at any given time per source.
  size_t max_requests_;

  /// Minimum number of requests purged from a fragmented requests
  /// array when the range of requests represented there needs to be
  /// shifted.
  size_t min_purge_count_;

  /// Flag to indicate whether CRC should be computed and checked.
  CORBA::Boolean check_crc_;
};

// ****************************************************************

/// Helper for decoding, validating and storing mcast header.
struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
{
  int byte_order;
  CORBA::ULong request_id;
  CORBA::ULong request_size;
  CORBA::ULong fragment_size;
  CORBA::ULong fragment_offset;
  CORBA::ULong fragment_id;
  CORBA::ULong fragment_count;
  CORBA::ULong crc;
  int read (char * header,
            size_t bytes_received,
            CORBA::Boolean checkcrc = 0);
};

// ****************************************************************

/// Once init() has been called:
/// Invariant: id_range_high_- id_range_low_ == size_ - 1
class TAO_ECG_CDR_Message_Receiver::Requests
{
public:

  Requests (void);
  ~Requests (void);

  /// Allocates and initializes <fragmented_requests_>.
  int init (size_t size, size_t min_purge_count);

  /// Returns pointer to a <fragmented_requests_> element
  /// representing <request_id>.
  /**
   * If <request_id> < <id_range_low> return 0.
   * If <request_id> > <id_range_high>, shift the range so it
   * includes <request_id>, purging incomplete requests as needed.
   */
  TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);

private:

  /// Delete any outstanding requests with ids in the range
  /// [<purge_first>, <purge_last>] from <fragmented_requests> and
  /// and reset their slots.
  void purge_requests (CORBA::ULong purge_first,
                       CORBA::ULong purge_last);

  Requests & operator= (const Requests &rhs);
  Requests (const Requests &rhs);

private:
  /// Array, used in a circular fashion, that stores partially received
  /// requests (and info on which requests have been fully received
  /// and processed) for a range of request ids.
  TAO_ECG_UDP_Request_Entry** fragmented_requests_;

  /// Size of <fragmented_requests_> array.
  size_t size_;

  /// The range of request ids, currently represented in
  /// <fragmented_requests>.
  //@{
  CORBA::ULong id_range_low_;
  CORBA::ULong id_range_high_;
  //@}

  /// Minimum range shifting amount.
  size_t min_purge_count_;
};

#if defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Receiver.i"
#endif /* __ACE_INLINE__ */

#include /**/ "ace/post.h"

#endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */