diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h new file mode 100644 index 00000000000..500f4b2eb4c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h @@ -0,0 +1,318 @@ +// -*- C++ -*- +/** + * @file ECG_CDR_Message_Receiver.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * @author Marina Spivak (marina@atdesk.com) + */ + +#ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H +#define TAO_ECG_CDR_MESSAGE_RECEIVER_H + +#include /**/ "ace/pre.h" + +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CDR.h" +#include "tao/Environment.h" + +#include "ace/Hash_Map_Manager.h" +#include "ace/INET_Addr.h" +#include "ace/Null_Mutex.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class TAO_ECG_CDR_Processor + * + * @brief Interface for callback objects used by + * TAO_ECG_CDR_Message_Receiver to propagate received data to + * its callers. + */ +class TAO_ECG_CDR_Processor +{ +public: + virtual ~TAO_ECG_CDR_Processor (void); + + /// Extracts data from <cdr>. Returns 0 on success, -1 on error. + virtual int decode (TAO_InputCDR &cdr) = 0; +}; + +// **************************************************************** +/** + * @class TAO_ECG_UDP_Request_Entry + * + * @brief Keeps information about an incomplete request. + * + * When a request arrives in fragments this object is used to + * keep track of the incoming data. + */ +class TAO_ECG_UDP_Request_Entry +{ +public: + enum { + ECG_DEFAULT_FRAGMENT_BUFSIZ = 8 + }; + + /// Initialize the fragment, allocating memory, etc. + TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order, + CORBA::ULong request_id, + CORBA::ULong request_size, + CORBA::ULong fragment_count); + + ~TAO_ECG_UDP_Request_Entry (void); + + /// Validate a fragment, it should be rejected if it is invalid.. + int validate_fragment (CORBA::Boolean byte_order, + CORBA::ULong request_size, + CORBA::ULong fragment_size, + CORBA::ULong fragment_offset, + CORBA::ULong fragment_id, + CORBA::ULong fragment_count) const; + + /// Has @a fragment_id been received? + int test_received (CORBA::ULong fragment_id) const; + + /// Mark @a fragment_id as received, reset timeout counter... + void mark_received (CORBA::ULong fragment_id); + + /// Is the message complete? + int complete (void) const; + + /// Return a buffer for the fragment at offset @a fragment_offset + char* fragment_buffer (CORBA::ULong fragment_offset); + +private: + + TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs); + TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs); + +private: + /// This attributes should remain constant in all the fragments, used + /// for validation.... + CORBA::Boolean byte_order_; + CORBA::ULong request_id_; + CORBA::ULong request_size_; + CORBA::ULong fragment_count_; + + ACE_Message_Block payload_; + + /// This is a bit vector, used to keep track of the received buffers. + CORBA::ULong* received_fragments_; + int own_received_fragments_; + CORBA::ULong received_fragments_size_; + CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ]; +}; + +// **************************************************************** + +/** + * @class TAO_ECG_CDR_Message_Receiver + * + * @brief Receives UDP and Multicast messages. + * + * @todo Update class documentation below. + * + * 5) Make status array size and purge_count configurable. + * + * This class receives UDP and Multicast message fragments, assembles + * them (described in detail below), and passes complete messages + * in the form of cdr streams to the calling classes. + * + * This class is used by various Gateway classes (Senders/Receivers) + * responsible for federating Event Channels with UDP/Mcast. + * + * = REASSEMBLY + * Fragmentation is described in ECG_CDR_Message_Sender.h + * Whenever an incomplete fragment is received (one with + * fragment_count > 1) we allocate an entry for the message in an + * map indexed by (host,port,request_id). The entry contains the + * buffer, a bit vector to keep track of the fragments received + * so far, and a timeout counter. This timeout counter is set to + * 0 on each (new) fragment arrival, and incremented on a regular + * basis. If the counter reaches a maximum value the message is + * dropped. + * Once all the fragments have been received the message is sent + * up to the calling classes, and the memory reclaimed. + */ +class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Receiver +{ +public: + /// Initialization and termination methods. + //@{ + TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc); + ~TAO_ECG_CDR_Message_Receiver (void); + + /** + * @param ignore_from Endpoint used to remove events generated by + * the same process. + */ + void init (TAO_ECG_Refcounted_Endpoint ignore_from + /* , ACE_Lock *lock = 0 */); + + // Shutdown the component: close down the request map, etc. + void shutdown (void); + //@} + + /// Main method: read the data from @a dgram and either pass ready data + /// to @a cdr_processor or update the <request_map_> if the request + /// is not yet complete. + /** + * Returns 1 if data was read successfully and accepted by + * <cdr_processor> without errors. + * Returns 0 if there were no errors, but no data has been passed to + * <cdr_processor>, either due to request being incomplete (not all + * fragments received), or it being a duplicate. + * Returns -1 if there were errors. + */ + int handle_input (ACE_SOCK_Dgram& dgram, + TAO_ECG_CDR_Processor *cdr_processor); + + /// Represents any request that has been fully received and + /// serviced, to simplify the internal logic. + static TAO_ECG_UDP_Request_Entry Request_Completed_; + +private: + + enum { + ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024, + ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32 + }; + + struct Mcast_Header; + class Requests; + + typedef ACE_Hash_Map_Manager<ACE_INET_Addr, + Requests*, + ACE_Null_Mutex> Request_Map; + +private: + + /// Returns 1 on success, 0 if <request_id> has already been + /// received or is below current request range, and -1 on error. + int mark_received (const ACE_INET_Addr &from, + CORBA::ULong request_id); + + /// Returns 1 if complete request is received and <event> is + /// populated, 0 if request has only partially been received or is a + /// duplicate, and -1 on error. + int process_fragment (const ACE_INET_Addr &from, + const Mcast_Header &header, + char * data_buf, + TAO_ECG_CDR_Processor *cdr_processor); + + + Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from); + +private: + + /// Ignore any events coming from this IP address. + TAO_ECG_Refcounted_Endpoint ignore_from_; + + /// The map containing all the incoming requests which have been + /// partially received. + Request_Map request_map_; + + /// Serializes use of <request_map_>. + // ACE_Lock* lock_; + + /// Size of a fragmented requests array, i.e., max number of + /// partially received requests kept at any given time per source. + size_t max_requests_; + + /// Minimum number of requests purged from a fragmented requests + /// array when the range of requests represented there needs to be + /// shifted. + size_t min_purge_count_; + + /// Flag to indicate whether CRC should be computed and checked. + CORBA::Boolean check_crc_; +}; + +// **************************************************************** + +/// Helper for decoding, validating and storing mcast header. +struct TAO_ECG_CDR_Message_Receiver::Mcast_Header +{ + int byte_order; + CORBA::ULong request_id; + CORBA::ULong request_size; + CORBA::ULong fragment_size; + CORBA::ULong fragment_offset; + CORBA::ULong fragment_id; + CORBA::ULong fragment_count; + CORBA::ULong crc; + int read (char * header, + size_t bytes_received, + CORBA::Boolean checkcrc = 0); +}; + +// **************************************************************** + +/// Once init() has been called: +/// Invariant: id_range_high_- id_range_low_ == size_ - 1 +class TAO_ECG_CDR_Message_Receiver::Requests +{ +public: + + Requests (void); + ~Requests (void); + + /// Allocates and initializes <fragmented_requests_>. + int init (size_t size, size_t min_purge_count); + + /// Returns pointer to a <fragmented_requests_> element + /// representing <request_id>. + /** + * If <request_id> < <id_range_low> return 0. + * If <request_id> > <id_range_high>, shift the range so it + * includes <request_id>, purging incomplete requests as needed. + */ + TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id); + +private: + + /// Delete any outstanding requests with ids in the range + /// [<purge_first>, <purge_last>] from <fragmented_requests> and + /// and reset their slots. + void purge_requests (CORBA::ULong purge_first, + CORBA::ULong purge_last); + + Requests & operator= (const Requests &rhs); + Requests (const Requests &rhs); + +private: + /// Array, used in a circular fashion, that stores partially received + /// requests (and info on which requests have been fully received + /// and processed) for a range of request ids. + TAO_ECG_UDP_Request_Entry** fragmented_requests_; + + /// Size of <fragmented_requests_> array. + size_t size_; + + /// The range of request ids, currently represented in + /// <fragmented_requests>. + //@{ + CORBA::ULong id_range_low_; + CORBA::ULong id_range_high_; + //@} + + /// Minimum range shifting amount. + size_t min_purge_count_; +}; + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined(__ACE_INLINE__) +#include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */ |