/* -*- C++ -*- */ /** * @file EC_Gateway_UDP.h * * $Id$ * * @author Carlos O'Ryan (coryan@cs.wustl.edu) * * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and * other members of the DOC group. More details can be found in: * * http://doc.ece.uci.edu/~coryan/EC/index.html * * Define helper classes to propagate events between ECs using * either UDP or multicast. * The architecture is a bit complicated and deserves some * explanation: sending the events over UDP (or mcast) is easy, a * Consumer (TAO_ECG_UDP_Sender) subscribes for a certain set of * events, its push() method marshalls the event set into a CDR * stream that is sent using an ACE_SOCK_CODgram. The subscription * set and IP address can be configured. * Another helper class (TAO_ECG_UDP_Receiver) acts as a supplier of * events; it receives a callback when an event is available on an * ACE_SOCK_Dgram, it demarshalls the event and pushes it to the * EC. Two ACE_Event_Handler classes are provided that can forward * the events to this Supplier: TAO_ECG_Mcast_EH can receive events * from a multicast group; TAO_ECG_UDP_EH can receive events from a * regular UDP socket. * * Matching of the events types carried by a multicast group (or IP * address) is up to the application. Gateway classes can be * implemented to automate this: the EC informs its gateways about * local changes in the subscriptions (for example), the Gateway * could then consult an administrative server that will inform it * which multicast groups carry those event types, it can then * create the proper event handlers and TAO_ECG_Receivers. An * analogous class can be implemented for the Supplier side. * * An alternative approach would be to look the current set of * multicast groups and the events carried on each, using that * information a suitable TAO_ECG_UDP_Receiver can be configured * (and of course the Senders on the client side). * * @todo The class makes an extra copy of the events, we need to * investigate if closer collaboration with its collocated EC could * be used to remove that copy. * * */ #ifndef TAO_EC_GATEWAY_UDP_H #define TAO_EC_GATEWAY_UDP_H #include "ace/pre.h" #include "orbsvcs/RtecUDPAdminS.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "orbsvcs/RtecEventChannelAdminS.h" #include "orbsvcs/Event/event_export.h" #include "ace/SOCK_Dgram_Mcast.h" #include "ace/SOCK_CODgram.h" #include "ace/Hash_Map_Manager.h" class TAO_ECG_UDP_Out_Endpoint; /** * @class TAO_ECG_UDP_Sender * * @brief Send events received from a "local" EC using UDP. * * This class connect as a consumer to an EventChannel * and it sends the events using UDP, the UDP address can be a * normal IP address or it can be a multicast group. * The UDP address is obtained from a RtecUDPAdmin::AddrServer * class. * It marshalls the events using TAO CDR classes. * *

MESSAGE FORMAT

* The messages header are encapsulated using CDR, with the * following format: * struct Header { * octet byte_order_flags; * // bit 0 represents the byte order as in GIOP 1.1 * // bit 1 is set if this is the last fragment * unsigned long request_id; * // The request ID, senders must not send two requests with * // the same ID, senders can be distinguished using recvfrom.. * unsigned long request_size; * // The size of this request, this can be used to pre-allocate * // the request buffer. * unsgined long fragment_size; * // The size of this fragment, excluding the header... * unsigned long fragment_offset; * // Where does this fragment fit in the complete message... * unsigned long fragment_id; * // The ID of this fragment... * unsigned long fragment_count; * // The total number of fragments to expect in this request * * // @todo This could be eliminated if efficient reassembly * // could be implemented without it. * octet padding[4]; * * // Ensures the header ends at an 8-byte boundary. * }; // size (in CDR stream) = 32 */ class TAO_RTEvent_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer { public: TAO_ECG_UDP_Sender (void); enum { ECG_HEADER_SIZE = 32, ECG_MIN_MTU = 32 + 8, ECG_MAX_MTU = 65536, // Really optimistic... ECG_DEFAULT_MTU = 1024 }; /// Get the local endpoint used to send the events. int get_local_addr (ACE_INET_Addr& addr); /** * To do its job this class requires to know the local EC it will * connect to; it also requires to build an RT_Info for the local * scheduler. * It only keeps a copy of its SupplierProxy, used for later * connection and disconnections. * @todo part of the RT_Info is hardcoded, we need to make it * parametric. */ void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_UDP_Out_Endpoint *endpoint, CORBA::Environment &env = TAO_default_environment ()); /** * The sender may need to fragment the message, otherwise the * network may drop the packets. * Setting the MTU can fail if the value is too small (at least the * header + 8 bytes must fit). */ int mtu (CORBA::ULong mtu); CORBA::ULong mtu (void) const; /// Disconnect and shutdown the sender, no further connections will /// work unless init() is called again. void shutdown (CORBA::Environment & = TAO_default_environment ()); /// Connect (or reconnect) to the EC with the given subscriptions. void open (RtecEventChannelAdmin::ConsumerQOS &sub, CORBA::Environment &env = TAO_default_environment ()); /// Disconnect from the EC, but reconnection is still possible. void close (CORBA::Environment &env = TAO_default_environment ()); /// The PushConsumer methods. virtual void disconnect_push_consumer (CORBA::Environment & = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); virtual void push (const RtecEventComm::EventSet &events, CORBA::Environment & = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); private: /// Return the datagram... ACE_SOCK_Dgram& dgram (void); /** * Send one fragment, the first entry in the iovec is used to send * the header, the rest of the iovec array should contain pointers * to the actual data. */ void send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count, iovec iov[], int iovcnt, CORBA::Environment &env = TAO_default_environment ()); /** * Count the number of fragments that will be required to send the * message blocks in the range [begin,end) * The maximum fragment payload (i.e. the size without the header is * also required); returns the total message size. */ CORBA::ULong compute_fragment_count (const ACE_Message_Block* begin, const ACE_Message_Block* end, int iov_size, CORBA::ULong max_fragment_payload, CORBA::ULong& total_length); private: /// The remote and the local EC, so we can reconnect when the /// subscription list changes. RtecEventChannelAdmin::EventChannel_var lcl_ec_; /// We talk to the EC (as a consumer) using this proxy. RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; /// We query this object to determine where are the events sent. RtecUDPAdmin::AddrServer_var addr_server_; /// The datagram used to sendto(), this object is *not* owned by the /// UDP_Sender. TAO_ECG_UDP_Out_Endpoint *endpoint_; /// The MTU for this sender... CORBA::ULong mtu_; }; // **************************************************************** /** * @class TAO_ECG_UDP_Out_Endpoint * * @brief Maintains information about an outgoing endpoint. * * UDP senders can share a single endpoint to send UDP packets, * but there is more state associated with this endpoint than its * datagram SAP; for instance we need to keep track of the request * id. */ class TAO_RTEvent_Export TAO_ECG_UDP_Out_Endpoint { public: /// Constructor TAO_ECG_UDP_Out_Endpoint (void); /// Constructor ~TAO_ECG_UDP_Out_Endpoint (void); /// Obtain the datagram associated with this endpoint. Clients of /// this class must open, and register (if necessary) this datagram. ACE_SOCK_Dgram& dgram (void); /// Obtain the next request id. CORBA::ULong next_request_id (void); /// The endpoint can detect if a data-gram was sent by itself, this /// is useful to ignore or remove messages sent by the same process. CORBA::Boolean is_loopback (const ACE_INET_Addr& from); private: /// The request id.... ACE_Atomic_Op request_id_generator_; /// The datagram.... ACE_SOCK_Dgram dgram_; /// cache the port-number so we can quickly determine if an event is /// coming from another endpoint. u_short port_number_; /// Keep the list of local interfaces, needed for the is_loopback /// method. size_t if_count_; ACE_INET_Addr* ifs_; }; // **************************************************************** /** * @class TAO_ECG_UDP_Request_Index * * @brief Index to the request map. * * This object is used to index the map of incomplete (due to * fragmentation) requests. */ class TAO_RTEvent_Export TAO_ECG_UDP_Request_Index { public: /// default copy ctor, dtor and operator= TAO_ECG_UDP_Request_Index (void); TAO_ECG_UDP_Request_Index (const ACE_INET_Addr& from, CORBA::ULong request_id); // The ACE_INLINE macros here are to keep g++ 2.7.X happy, // otherwise it thinks they are used as inline functions before // beign used as such.... Apparently in the template code for the // Hash_Map_Manager. /// Return a hash value... ACE_INLINE u_long hash (void) const; /// Compare ACE_INLINE int operator== (const TAO_ECG_UDP_Request_Index& rhs) const; ACE_INLINE int operator!= (const TAO_ECG_UDP_Request_Index& rhs) const; ACE_INET_Addr from; CORBA::ULong request_id; }; // **************************************************************** /** * @class TAO_ECG_UDP_Request_Entry * * @brief Keeps information about an incomplete request. * * When a request arrives in fragments this object is used to * keep track of the incoming data. */ class TAO_RTEvent_Export TAO_ECG_UDP_Request_Entry { public: enum { ECG_DEFAULT_FRAGMENT_BUFSIZ = 8 }; // TAO_ECG_UDP_Request_Entry (void); // TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry& rhs); // TAO_ECG_UDP_Request_Entry& operator=(const TAO_ECG_UDP_Request_Entry& rhs); ~TAO_ECG_UDP_Request_Entry (void); /// Initialize the fragment, allocating memory, etc. TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order, CORBA::ULong request_id, CORBA::ULong request_size, CORBA::ULong fragment_count); /// Validate a fragment, it should be rejected if it is invalid.. int validate_fragment (CORBA::Boolean byte_order, CORBA::ULong request_size, CORBA::ULong fragment_size, CORBA::ULong fragment_offset, CORBA::ULong fragment_id, CORBA::ULong fragment_count) const; /// Has been received? int test_received (CORBA::ULong fragment_id) const; /// Mark as received, reset timeout counter... void mark_received (CORBA::ULong fragment_id); /// Is the message complete? int complete (void) const; /// Return a buffer for the fragment at offset char* fragment_buffer (CORBA::ULong fragment_offset); /// Decode the events, the message must be complete. void decode (RtecEventComm::EventSet& event, CORBA::Environment &env = TAO_default_environment ()); /// Increment the timeout counter... void inc_timeout (void); /// Get the timeout counter.... int get_timeout (void) const; private: /// This attributes should remain constant in all the fragments, used /// for validation.... CORBA::Boolean byte_order_; CORBA::ULong request_id_; CORBA::ULong request_size_; CORBA::ULong fragment_count_; CORBA::ULong timeout_counter_; ACE_Message_Block payload_; /// This is a bit vector, used to keep track of the received buffers. CORBA::ULong* received_fragments_; int own_received_fragments_; CORBA::ULong received_fragments_size_; CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ]; }; // **************************************************************** class TAO_ECG_UDP_Receiver; /** * @class TAO_ECG_UDP_TH * * @brief Timer Handler for the UDP Receivers. * * This object receives timer events from the reactor and forwards * them to the UDP_Receiver; which uses those events to expire old * messages that did not receive all their fragments. */ class TAO_RTEvent_Export TAO_ECG_UDP_TH : public ACE_Event_Handler { public: TAO_ECG_UDP_TH (TAO_ECG_UDP_Receiver *recv); // Reactor callbacks virtual int handle_timeout (const ACE_Time_Value& tv, const void* act); private: /// We callback to this object when a message arrives. TAO_ECG_UDP_Receiver* receiver_; }; // **************************************************************** /** * @class TAO_ECG_UDP_Receiver * * @brief Decodes events from an ACE_SOCK_Dgram and pushes them to the * Event_Channel. * * This supplier receives events from an ACE_SOCK_Dgram, either * from a UDP socket or a Mcast group, decodes them and push them * to the EC. * = REASSEMBLY * Whenever an incomplete fragment is received (one with * fragment_count > 1) we allocate an entry for the message in an * map indexed by (host,port,request_id). The entry contains the * buffer, a bit vector to keep track of the fragments received * so far, and a timeout counter. This timeout counter is set to * 0 on each (new) fragment arrival, and incremented on a regular * basis. If the counter reaches a maximum value the message is * dropped. * Once all the fragments have been received the message is sent * up, and the memory reclaimed. The entry is *not* removed until * the timer expires (to handle re-transmitions). */ class TAO_RTEvent_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier { public: TAO_ECG_UDP_Receiver (void); /** * To do its job this class requires to know the local EC it will * connect to; it also requires to build an RT_Info for the local * scheduler. * The is used to receive timeout events.. * The endpoint is used to remove events generated by * the same process. * @todo part of the RT_Info is hardcoded, we need to make it * parametric. */ void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, TAO_ECG_UDP_Out_Endpoint* ignore_from, RtecUDPAdmin::AddrServer_ptr addr_server, ACE_Reactor *reactor, const ACE_Time_Value &expire_interval, int max_timeout, CORBA::Environment &env = TAO_default_environment ()); /// Disconnect and shutdown the gateway, no further connectsions void shutdown (CORBA::Environment & = TAO_default_environment ()); /// Connect to the EC using the given publications lists. void open (RtecEventChannelAdmin::SupplierQOS& pub, CORBA::Environment &env = TAO_default_environment ()); /// Disconnect to the EC. virtual void close (CORBA::Environment &env = TAO_default_environment ()); /** * The Event_Handlers call this method when data is available at the * socket, the must be ready for reading and contain a full * event. */ int handle_input (ACE_SOCK_Dgram& dgram); /// The timer has expired, must update all the unreceived /// messages... int handle_timeout (const ACE_Time_Value& tv, const void* act); // The PushSupplier method. virtual void disconnect_push_supplier (CORBA::Environment & = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); /// Call the RtecUDPAdmin::AddrServer void get_addr (const RtecEventComm::EventHeader& header, RtecUDPAdmin::UDP_Addr_out addr, CORBA::Environment &env = TAO_default_environment ()); private: /// The remote and the local EC, so we can reconnect when the list changes. RtecEventChannelAdmin::EventChannel_var lcl_ec_; /// We talk to the EC (as a consumer) using this proxy. RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; /// Ignore any events coming from this IP addres. TAO_ECG_UDP_Out_Endpoint* ignore_from_; /// The server used to map event types into multicast groups. RtecUDPAdmin::AddrServer_var addr_server_; typedef ACE_Hash_Map_Manager Request_Map; typedef ACE_Hash_Map_Entry Request_Map_Entry; /// The map containing all the incoming requests which have been /// partially received. Request_Map request_map_; /// To receive the timeouts.. TAO_ECG_UDP_TH timeout_handler_; /// The reactor we are using for the timeout handler... ACE_Reactor* reactor_; /// How many timeouts before we expire a message... int max_timeout_; }; // **************************************************************** /** * @class TAO_ECG_UDP_EH * * @brief Event Handler for UDP messages. * * This object receives callbacks from the Reactor when data is * available on a UDP socket, it forwards to the ECG_UDP_Receiver * which reads the events and transform it into an event. */ class TAO_RTEvent_Export TAO_ECG_UDP_EH : public ACE_Event_Handler { public: TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv); /// Open the datagram and register with this->reactor() int open (const ACE_INET_Addr& ipaddr, int reuse_addr = 0); /// Close the datagram and unregister with the reactor. int close (void); /** * Obtain the dgram, this is one of those "controlled violations of * type safety", allowing the user to setup options and gain access * to low-level features. */ ACE_SOCK_Dgram &dgram (void); // Reactor callbacks virtual int handle_input (ACE_HANDLE fd); virtual ACE_HANDLE get_handle (void) const; private: /// The datagram used to receive the data. ACE_SOCK_Dgram dgram_; /// We callback to this object when a message arrives. TAO_ECG_UDP_Receiver* receiver_; }; #if defined(__ACE_INLINE__) #include "EC_Gateway_UDP.i" #endif /* __ACE_INLINE__ */ #include "ace/post.h" #endif /* ACE_EVENT_CHANNEL_UDP_H */