summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h318
1 files changed, 318 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h
new file mode 100644
index 00000000000..500f4b2eb4c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h
@@ -0,0 +1,318 @@
+// -*- C++ -*-
+/**
+ * @file ECG_CDR_Message_Receiver.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan (coryan@cs.wustl.edu)
+ * @author Marina Spivak (marina@atdesk.com)
+ */
+
+#ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
+#define TAO_ECG_CDR_MESSAGE_RECEIVER_H
+
+#include /**/ "ace/pre.h"
+
+#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/CDR.h"
+#include "tao/Environment.h"
+
+#include "ace/Hash_Map_Manager.h"
+#include "ace/INET_Addr.h"
+#include "ace/Null_Mutex.h"
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/**
+ * @class TAO_ECG_CDR_Processor
+ *
+ * @brief Interface for callback objects used by
+ * TAO_ECG_CDR_Message_Receiver to propagate received data to
+ * its callers.
+ */
+class TAO_ECG_CDR_Processor
+{
+public:
+ virtual ~TAO_ECG_CDR_Processor (void);
+
+ /// Extracts data from <cdr>. Returns 0 on success, -1 on error.
+ virtual int decode (TAO_InputCDR &cdr) = 0;
+};
+
+// ****************************************************************
+/**
+ * @class TAO_ECG_UDP_Request_Entry
+ *
+ * @brief Keeps information about an incomplete request.
+ *
+ * When a request arrives in fragments this object is used to
+ * keep track of the incoming data.
+ */
+class TAO_ECG_UDP_Request_Entry
+{
+public:
+ enum {
+ ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
+ };
+
+ /// Initialize the fragment, allocating memory, etc.
+ TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
+ CORBA::ULong request_id,
+ CORBA::ULong request_size,
+ CORBA::ULong fragment_count);
+
+ ~TAO_ECG_UDP_Request_Entry (void);
+
+ /// Validate a fragment, it should be rejected if it is invalid..
+ int validate_fragment (CORBA::Boolean byte_order,
+ CORBA::ULong request_size,
+ CORBA::ULong fragment_size,
+ CORBA::ULong fragment_offset,
+ CORBA::ULong fragment_id,
+ CORBA::ULong fragment_count) const;
+
+ /// Has @a fragment_id been received?
+ int test_received (CORBA::ULong fragment_id) const;
+
+ /// Mark @a fragment_id as received, reset timeout counter...
+ void mark_received (CORBA::ULong fragment_id);
+
+ /// Is the message complete?
+ int complete (void) const;
+
+ /// Return a buffer for the fragment at offset @a fragment_offset
+ char* fragment_buffer (CORBA::ULong fragment_offset);
+
+private:
+
+ TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
+ TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
+
+private:
+ /// This attributes should remain constant in all the fragments, used
+ /// for validation....
+ CORBA::Boolean byte_order_;
+ CORBA::ULong request_id_;
+ CORBA::ULong request_size_;
+ CORBA::ULong fragment_count_;
+
+ ACE_Message_Block payload_;
+
+ /// This is a bit vector, used to keep track of the received buffers.
+ CORBA::ULong* received_fragments_;
+ int own_received_fragments_;
+ CORBA::ULong received_fragments_size_;
+ CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
+};
+
+// ****************************************************************
+
+/**
+ * @class TAO_ECG_CDR_Message_Receiver
+ *
+ * @brief Receives UDP and Multicast messages.
+ *
+ * @todo Update class documentation below.
+ *
+ * 5) Make status array size and purge_count configurable.
+ *
+ * This class receives UDP and Multicast message fragments, assembles
+ * them (described in detail below), and passes complete messages
+ * in the form of cdr streams to the calling classes.
+ *
+ * This class is used by various Gateway classes (Senders/Receivers)
+ * responsible for federating Event Channels with UDP/Mcast.
+ *
+ * = REASSEMBLY
+ * Fragmentation is described in ECG_CDR_Message_Sender.h
+ * Whenever an incomplete fragment is received (one with
+ * fragment_count > 1) we allocate an entry for the message in an
+ * map indexed by (host,port,request_id). The entry contains the
+ * buffer, a bit vector to keep track of the fragments received
+ * so far, and a timeout counter. This timeout counter is set to
+ * 0 on each (new) fragment arrival, and incremented on a regular
+ * basis. If the counter reaches a maximum value the message is
+ * dropped.
+ * Once all the fragments have been received the message is sent
+ * up to the calling classes, and the memory reclaimed.
+ */
+class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver
+{
+public:
+ /// Initialization and termination methods.
+ //@{
+ TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
+ ~TAO_ECG_CDR_Message_Receiver (void);
+
+ /**
+ * @param ignore_from Endpoint used to remove events generated by
+ * the same process.
+ */
+ void init (TAO_ECG_Refcounted_Endpoint ignore_from
+ /* , ACE_Lock *lock = 0 */);
+
+ // Shutdown the component: close down the request map, etc.
+ void shutdown (void);
+ //@}
+
+ /// Main method: read the data from @a dgram and either pass ready data
+ /// to @a cdr_processor or update the <request_map_> if the request
+ /// is not yet complete.
+ /**
+ * Returns 1 if data was read successfully and accepted by
+ * <cdr_processor> without errors.
+ * Returns 0 if there were no errors, but no data has been passed to
+ * <cdr_processor>, either due to request being incomplete (not all
+ * fragments received), or it being a duplicate.
+ * Returns -1 if there were errors.
+ */
+ int handle_input (ACE_SOCK_Dgram& dgram,
+ TAO_ECG_CDR_Processor *cdr_processor);
+
+ /// Represents any request that has been fully received and
+ /// serviced, to simplify the internal logic.
+ static TAO_ECG_UDP_Request_Entry Request_Completed_;
+
+private:
+
+ enum {
+ ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
+ ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
+ };
+
+ struct Mcast_Header;
+ class Requests;
+
+ typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
+ Requests*,
+ ACE_Null_Mutex> Request_Map;
+
+private:
+
+ /// Returns 1 on success, 0 if <request_id> has already been
+ /// received or is below current request range, and -1 on error.
+ int mark_received (const ACE_INET_Addr &from,
+ CORBA::ULong request_id);
+
+ /// Returns 1 if complete request is received and <event> is
+ /// populated, 0 if request has only partially been received or is a
+ /// duplicate, and -1 on error.
+ int process_fragment (const ACE_INET_Addr &from,
+ const Mcast_Header &header,
+ char * data_buf,
+ TAO_ECG_CDR_Processor *cdr_processor);
+
+
+ Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
+
+private:
+
+ /// Ignore any events coming from this IP address.
+ TAO_ECG_Refcounted_Endpoint ignore_from_;
+
+ /// The map containing all the incoming requests which have been
+ /// partially received.
+ Request_Map request_map_;
+
+ /// Serializes use of <request_map_>.
+ // ACE_Lock* lock_;
+
+ /// Size of a fragmented requests array, i.e., max number of
+ /// partially received requests kept at any given time per source.
+ size_t max_requests_;
+
+ /// Minimum number of requests purged from a fragmented requests
+ /// array when the range of requests represented there needs to be
+ /// shifted.
+ size_t min_purge_count_;
+
+ /// Flag to indicate whether CRC should be computed and checked.
+ CORBA::Boolean check_crc_;
+};
+
+// ****************************************************************
+
+/// Helper for decoding, validating and storing mcast header.
+struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
+{
+ int byte_order;
+ CORBA::ULong request_id;
+ CORBA::ULong request_size;
+ CORBA::ULong fragment_size;
+ CORBA::ULong fragment_offset;
+ CORBA::ULong fragment_id;
+ CORBA::ULong fragment_count;
+ CORBA::ULong crc;
+ int read (char * header,
+ size_t bytes_received,
+ CORBA::Boolean checkcrc = 0);
+};
+
+// ****************************************************************
+
+/// Once init() has been called:
+/// Invariant: id_range_high_- id_range_low_ == size_ - 1
+class TAO_ECG_CDR_Message_Receiver::Requests
+{
+public:
+
+ Requests (void);
+ ~Requests (void);
+
+ /// Allocates and initializes <fragmented_requests_>.
+ int init (size_t size, size_t min_purge_count);
+
+ /// Returns pointer to a <fragmented_requests_> element
+ /// representing <request_id>.
+ /**
+ * If <request_id> < <id_range_low> return 0.
+ * If <request_id> > <id_range_high>, shift the range so it
+ * includes <request_id>, purging incomplete requests as needed.
+ */
+ TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
+
+private:
+
+ /// Delete any outstanding requests with ids in the range
+ /// [<purge_first>, <purge_last>] from <fragmented_requests> and
+ /// and reset their slots.
+ void purge_requests (CORBA::ULong purge_first,
+ CORBA::ULong purge_last);
+
+ Requests & operator= (const Requests &rhs);
+ Requests (const Requests &rhs);
+
+private:
+ /// Array, used in a circular fashion, that stores partially received
+ /// requests (and info on which requests have been fully received
+ /// and processed) for a range of request ids.
+ TAO_ECG_UDP_Request_Entry** fragmented_requests_;
+
+ /// Size of <fragmented_requests_> array.
+ size_t size_;
+
+ /// The range of request ids, currently represented in
+ /// <fragmented_requests>.
+ //@{
+ CORBA::ULong id_range_low_;
+ CORBA::ULong id_range_high_;
+ //@}
+
+ /// Minimum range shifting amount.
+ size_t min_purge_count_;
+};
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#if defined(__ACE_INLINE__)
+#include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */