diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.h')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.h | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.h b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.h new file mode 100644 index 00000000000..3c22ae7efb4 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.h @@ -0,0 +1,190 @@ +// -*- C++ -*- + +/** + * @file ECG_CDR_Message_Sender.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * @author Marina Spivak (marina@atdesk.com) + */ + +#ifndef TAO_ECG_CDR_MESSAGE_SENDER_H +#define TAO_ECG_CDR_MESSAGE_SENDER_H + +#include /**/ "ace/pre.h" + +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include /**/ "orbsvcs/Event/event_serv_export.h" + +#include "tao/SystemException.h" +#include "tao/Environment.h" + +#include "ace/INET_Addr.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class TAO_ECG_CDR_Message_Sender + * + * @brief Sends CDR messages using UDP. + * NOT THREAD-SAFE. + * + * This class breaks up a CDR message into fragments and sends each + * fragment with a header (described below) using UDP. + * The UDP address can be a normal IP address or it can be a multicast + * group. The UDP address is obtained from a RtecUDPAdmin::AddrServer + * class. + * + * This class is used by various Gateway (Senders/Receivers) classes + * responsible for federating Event Channels with UDP/Mcast. + * + * <H2>MESSAGE FORMAT</H2> + * Message header are encapsulated using CDR, with the + * following format: + * struct Header { + * octet byte_order_flags; + * // bit 0 represents the byte order as in GIOP 1.1 + * // bit 1 is set if this is the last fragment + * unsigned long request_id; + * // The request ID, senders must not send two requests with + * // the same ID, senders can be distinguished using recvfrom.. + * unsigned long request_size; + * // The size of this request, this can be used to pre-allocate + * // the request buffer. + * unsgined long fragment_size; + * // The size of this fragment, excluding the header... + * unsigned long fragment_offset; + * // Where does this fragment fit in the complete message... + * unsigned long fragment_id; + * // The ID of this fragment... + * unsigned long fragment_count; + * // The total number of fragments to expect in this request + * + * // @todo This could be eliminated if efficient reassembly + * // could be implemented without it. + * octet padding[4]; + * + * // Ensures the header ends at an 8-byte boundary. + * }; // size (in CDR stream) = 32 + */ +class TAO_RTEvent_Serv_Export TAO_ECG_CDR_Message_Sender +{ +public: + + enum { + ECG_HEADER_SIZE = 32, + ECG_MIN_MTU = 32 + 8, + ECG_MAX_MTU = 65536, // Really optimistic... + ECG_DEFAULT_MTU = 1024 + }; + + /// Initialization and termination methods. + //@{ + TAO_ECG_CDR_Message_Sender (CORBA::Boolean crc = 0); + + /// Set the endpoint for sending messages. + /** + * If init () is successful, shutdown () must be called when the + * sender is no longer needed. If shutdown () is not called by the + * user, cleanup activities will be performed by the destructor. + */ + void init (TAO_ECG_Refcounted_Endpoint endpoint_rptr + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + // Shutdown this component. Frees up the endpoint. + void shutdown (ACE_ENV_SINGLE_ARG_DECL); + //@} + + /// Setters/getters. + //@{ + /// Get the local endpoint used to send the events. + int get_local_addr (ACE_INET_Addr& addr); + + /** + * The sender may need to fragment the message, otherwise the + * network may drop the packets. + * Setting the MTU can fail if the value is too small (at least the + * header + 8 bytes must fit). + */ + int mtu (CORBA::ULong mtu); + CORBA::ULong mtu (void) const; + //@} + + /// The main method - send a CDR message. + /** + * @todo Under some platforms, notably Linux, the fragmentation code + * in this method is woefully naive. The fragments are sent it a + * big burst, unfortunately, that can fill up the local kernel + * buffer before all the data is sent. In those circumstances some + * of the fragments are silently (gulp!) dropped by the kernel, + * check the documentation for sendto(2) specially the ENOBUFS + * error condition. + * There is no easy solution that I know off, except "pacing" the + * fragments, i.e. never sending more than a prescribed number of + * bytes per-second, sleeping before sending more or queueing them + * to send later via the reactor. + */ + void send_message (const TAO_OutputCDR &cdr, + const ACE_INET_Addr &addr + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + /// Return the datagram... + ACE_SOCK_Dgram& dgram (void); + + /** + * Send one fragment, the first entry in the iovec is used to send + * the header, the rest of the iovec array should contain pointers + * to the actual data. + */ + void send_fragment (const ACE_INET_Addr &addr, + CORBA::ULong request_id, + CORBA::ULong request_size, + CORBA::ULong fragment_size, + CORBA::ULong fragment_offset, + CORBA::ULong fragment_id, + CORBA::ULong fragment_count, + iovec iov[], + int iovcnt + ACE_ENV_ARG_DECL); + + /** + * Count the number of fragments that will be required to send the + * message blocks in the range [begin,end) + * The maximum fragment payload (i.e. the size without the header is + * also required); <total_length> returns the total message size. + */ + CORBA::ULong compute_fragment_count (const ACE_Message_Block* begin, + const ACE_Message_Block* end, + int iov_size, + CORBA::ULong max_fragment_payload, + CORBA::ULong& total_length); + +private: + /// The datagram used for sendto (). + TAO_ECG_Refcounted_Endpoint endpoint_rptr_; + + /// The MTU for this sender... + CORBA::ULong mtu_; + + /// Should crc checksum be caluclated and sent? + CORBA::Boolean checksum_; +}; + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined(__ACE_INLINE__) +#include "orbsvcs/Event/ECG_CDR_Message_Sender.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_ECG_CDR_MESSAGE_SENDER_H */ |