summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h638
1 files changed, 0 insertions, 638 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
deleted file mode 100644
index 70c3df83ba7..00000000000
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
+++ /dev/null
@@ -1,638 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-//
-// ============================================================================
-//
-// = LIBRARY
-// TAO services
-//
-// = FILENAME
-// EC_Gateway_UDP
-//
-// = AUTHOR
-// Carlos O'Ryan
-//
-// = DESCRIPTION
-// Define helper classes to propagate events between ECs using
-// either UDP or multicast.
-// The architecture is a bit complicated and deserves some
-// explanation: sending the events over UDP (or mcast) is easy, a
-// Consumer (TAO_ECG_UDP_Sender) subscribes for a certain set of
-// events, its push() method marshalls the event set into a CDR
-// stream that is sent using an ACE_SOCK_CODgram. The subscription
-// set and IP address can be configured.
-// Another helper class (TAO_ECG_UDP_Receiver) acts as a supplier of
-// events; it receives a callback when an event is available on an
-// ACE_SOCK_Dgram, it demarshalls the event and pushes it to the
-// EC. Two ACE_Event_Handler classes are provided that can forward
-// the events to this Supplier: TAO_ECG_Mcast_EH can receive events
-// from a multicast group; TAO_ECG_UDP_EH can receive events from a
-// regular UDP socket.
-//
-// Matching of the events types carried by a multicast group (or IP
-// address) is up to the application. Gateway classes can be
-// implemented to automate this: the EC informs its gateways about
-// local changes in the subscriptions (for example), the Gateway
-// could then consult an administrative server that will inform it
-// which multicast groups carry those event types, it can then
-// create the proper event handlers and TAO_ECG_Receivers. An
-// analogous class can be implemented for the Supplier side.
-//
-// An alternative approach would be to look the current set of
-// multicast groups and the events carried on each, using that
-// information a suitable TAO_ECG_UDP_Receiver can be configured
-// (and of course the Senders on the client side).
-//
-// = TODO
-// The class makes an extra copy of the events, we need to
-// investigate if closer collaboration with its collocated EC could
-// be used to remove that copy.
-//
-// ============================================================================
-
-#ifndef TAO_EC_GATEWAY_UDP_H
-#define TAO_EC_GATEWAY_UDP_H
-
-#include "ace/SOCK_CODgram.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "ace/SOCK_Dgram_Mcast.h"
-#include "orbsvcs/RtecEventChannelAdminS.h"
-#include "orbsvcs/RtecUDPAdminS.h"
-#include "orbsvcs/orbsvcs_export.h"
-
-class TAO_ECG_UDP_Out_Endpoint;
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer
-{
- //
- // = TITLE
- // Send events received from a "local" EC using UDP.
- //
- // = DESCRIPTION
- // This class connect as a consumer to an EventChannel
- // and it sends the events using UDP, the UDP address can be a
- // normal IP address or it can be a multicast group.
- // The UDP address is obtained from a RtecUDPAdmin::AddrServer
- // class.
- // It marshalls the events using TAO CDR classes.
- //
- // = MESSAGE FORMAT
- //
- // The messages header are encapsulated using CDR, with the
- // following format:
- //
- // struct Header {
- // octet byte_order_flags;
- // // bit 0 represents the byte order as in GIOP 1.1
- // // bit 1 is set if this is the last fragment
- //
- // unsigned long request_id;
- // // The request ID, senders must not send two requests with
- // // the same ID, senders can be distinguished using recvfrom..
- //
- // unsigned long request_size;
- // // The size of this request, this can be used to pre-allocate
- // // the request buffer.
- //
- // unsgined long fragment_size;
- // // The size of this fragment, excluding the header...
- //
- // unsigned long fragment_offset;
- // // Where does this fragment fit in the complete message...
- //
- // unsigned long fragment_id;
- // // The ID of this fragment...
- //
- // unsigned long fragment_count;
- // // The total number of fragments to expect in this request
- // // @@ TODO This could be eliminated if efficient reassembly
- // // could be implemented without it.
- //
- // octet padding[4];
- // // Ensures the header ends at an 8-byte boundary.
- // }; // size (in CDR stream) = 32
- //
- //
-public:
- TAO_ECG_UDP_Sender (void);
-
- enum {
- ECG_HEADER_SIZE = 32,
- ECG_MIN_MTU = 32 + 8,
- ECG_MAX_MTU = 65536, // Really optimistic...
- ECG_DEFAULT_MTU = 1024
- };
-
- int get_local_addr (ACE_INET_Addr& addr);
- // Get the local endpoint used to send the events.
-
- void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
- RtecScheduler::Scheduler_ptr lcl_sched,
- const char* lcl_name,
- RtecUDPAdmin::AddrServer_ptr addr_server,
- TAO_ECG_UDP_Out_Endpoint* endpoint,
- CORBA::Environment &_env);
- // To do its job this class requires to know the local EC it will
- // connect to; it also requires to build an RT_Info for the local
- // scheduler.
- // It only keeps a copy of its SupplierProxy, used for later
- // connection and disconnections.
- // @@ TODO part of the RT_Info is hardcoded, we need to make it
- // parametric.
-
- int mtu (CORBA::ULong mtu);
- CORBA::ULong mtu (void) const;
- // The sender may need to fragment the message, otherwise the
- // network may drop the packets.
- // Setting the MTU can fail if the value is too small (at least the
- // header + 8 bytes must fit).
-
- void shutdown (CORBA::Environment&);
- // Disconnect and shutdown the sender, no further connections will
- // work unless init() is called again.
-
- void open (RtecEventChannelAdmin::ConsumerQOS& sub,
- CORBA::Environment& env);
- // Connect (or reconnect) to the EC with the given subscriptions.
-
- void close (CORBA::Environment& _env);
- // Disconnect from the EC, but reconnection is still possible.
-
- virtual void disconnect_push_consumer (CORBA::Environment &);
- virtual void push (const RtecEventComm::EventSet &events,
- CORBA::Environment &);
- // The PushConsumer methods.
-
-private:
- ACE_SOCK_Dgram& dgram (void);
- // Return the datagram...
-
- void send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr,
- CORBA::ULong request_id,
- CORBA::ULong request_size,
- CORBA::ULong fragment_size,
- CORBA::ULong fragment_offset,
- CORBA::ULong fragment_id,
- CORBA::ULong fragment_count,
- iovec iov[],
- int iovcnt,
- CORBA::Environment& _env);
- // Send one fragment, the first entry in the iovec is used to send
- // the header, the rest of the iovec array should contain pointers
- // to the actual data.
-
- CORBA::ULong compute_fragment_count (const ACE_Message_Block* begin,
- const ACE_Message_Block* end,
- int iov_size,
- CORBA::ULong max_fragment_payload,
- CORBA::ULong& total_length);
- // Count the number of fragments that will be required to send the
- // message blocks in the range [begin,end)
- // The maximum fragment payload (i.e. the size without the header is
- // also required); <total_length> returns the total message size.
-
-private:
- RtecEventChannelAdmin::EventChannel_var lcl_ec_;
- // The remote and the local EC, so we can reconnect when the
- // subscription list changes.
-
- RtecScheduler::handle_t lcl_info_;
- // Our local and remote RT_Infos.
-
- RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
- // We talk to the EC (as a consumer) using this proxy.
-
- RtecUDPAdmin::AddrServer_var addr_server_;
- // We query this object to determine where are the events sent.
-
- TAO_ECG_UDP_Out_Endpoint *endpoint_;
- // The datagram used to sendto(), this object is *not* owned by the
- // UDP_Sender.
-
- CORBA::ULong mtu_;
- // The MTU for this sender...
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_Out_Endpoint
-{
- //
- // = TITLE
- // Maintains information about an outgoing endpoint.
- //
- // = DESCRIPTION
- // UDP senders can share a single endpoint to send UDP packets,
- // but there is more state associated with this endpoint than its
- // datagram SAP; for instance we need to keep track of the request
- // id.
-public:
- TAO_ECG_UDP_Out_Endpoint (void);
- // Constructor
-
- ~TAO_ECG_UDP_Out_Endpoint (void);
- // Constructor
-
- ACE_SOCK_Dgram& dgram (void);
- // Obtain the datagram associated with this endpoint. Clients of
- // this class must open, and register (if necessary) this datagram.
-
- CORBA::ULong next_request_id (void);
- // Obtain the next request id.
-
- CORBA::Boolean is_loopback (const ACE_INET_Addr& from);
- // The endpoint can detect if a data-gram was sent by itself, this
- // is useful to ignore or remove messages sent by the same process.
-
-private:
- ACE_Atomic_Op<ACE_SYNCH_MUTEX,CORBA::ULong> request_id_generator_;
- // The request id....
-
- ACE_SOCK_Dgram dgram_;
- // The datagram....
-
- u_short port_number_;
- // cache the port-number so we can quickly determine if an event is
- // coming from another endpoint.
-
- size_t if_count_;
- ACE_INET_Addr* ifs_;
- // Keep the list of local interfaces, needed for the is_loopback
- // method.
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_Request_Index
-{
- // = TITLE
- // Index to the request map.
- //
- // = DESCRIPTION
- // This object is used to index the map of incomplete (due to
- // fragmentation) requests.
- //
-public:
- TAO_ECG_UDP_Request_Index (void);
- TAO_ECG_UDP_Request_Index (const ACE_INET_Addr& from,
- CORBA::ULong request_id);
- // default copy ctor, dtor and operator=
-
- ACE_INLINE u_long hash (void) const;
- // Return a hash value...
-
- ACE_INLINE int operator== (const TAO_ECG_UDP_Request_Index& rhs) const;
- ACE_INLINE int operator!= (const TAO_ECG_UDP_Request_Index& rhs) const;
- // Compare
-
- ACE_INET_Addr from;
- CORBA::ULong request_id;
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_Request_Entry
-{
- // = TITLE
- // Keeps information about an incomplete request.
- //
- // = DESCRIPTION
- // When a request arrives in fragments this object is used to
- // keep track of the incoming data.
- //
-public:
- enum {
- ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
- };
-
- // TAO_ECG_UDP_Request_Entry (void);
- // TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry& rhs);
- // TAO_ECG_UDP_Request_Entry& operator=(const TAO_ECG_UDP_Request_Entry& rhs);
- ~TAO_ECG_UDP_Request_Entry (void);
-
- TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
- CORBA::ULong request_id,
- CORBA::ULong request_size,
- CORBA::ULong fragment_count);
- // Initialize the fragment, allocating memory, etc.
-
- int validate_fragment (CORBA::Boolean byte_order,
- CORBA::ULong request_size,
- CORBA::ULong fragment_size,
- CORBA::ULong fragment_offset,
- CORBA::ULong fragment_id,
- CORBA::ULong fragment_count) const;
- // Validate a fragment, it should be rejected if it is invalid..
-
- int test_received (CORBA::ULong fragment_id) const;
- // Has <fragment_id> been received?
-
- void mark_received (CORBA::ULong fragment_id);
- // Mark <fragment_id> as received, reset timeout counter...
-
- int complete (void) const;
- // Is the message complete?
-
- char* fragment_buffer (CORBA::ULong fragment_offset);
- // Return a buffer for the fragment at offset <fragment_offset>
-
- void decode (RtecEventComm::EventSet& event,
- CORBA::Environment& _env);
- // Decode the events, the message must be complete.
-
- void inc_timeout (void);
- // Increment the timeout counter...
-
- int get_timeout (void) const;
- // Get the timeout counter....
-
-private:
- CORBA::Boolean byte_order_;
- CORBA::ULong request_id_;
- CORBA::ULong request_size_;
- CORBA::ULong fragment_count_;
- // This attributes should remain constant in all the fragments, used
- // for validation....
-
- CORBA::ULong timeout_counter_;
- ACE_Message_Block payload_;
-
- CORBA::ULong* received_fragments_;
- int own_received_fragments_;
- CORBA::ULong received_fragments_size_;
- CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
- // This is a bit vector, used to keep track of the received buffers.
-};
-
-// ****************************************************************
-
-class TAO_ECG_UDP_Receiver;
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_TH : public ACE_Event_Handler
-{
- //
- // = TITLE
- // Timer Handler for the UDP Receivers.
- //
- // = DESCRIPTION
- // This object receives timer events from the reactor and forwards
- // them to the UDP_Receiver; which uses those events to expire old
- // messages that did not receive all their fragments.
- //
-public:
- TAO_ECG_UDP_TH (TAO_ECG_UDP_Receiver *recv);
-
- // Reactor callbacks
- virtual int handle_timeout (const ACE_Time_Value& tv,
- const void* act);
-
-private:
- TAO_ECG_UDP_Receiver* receiver_;
- // We callback to this object when a message arrives.
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier
-{
- //
- // = TITLE
- // Decodes events from an ACE_SOCK_Dgram and pushes them to the
- // Event_Channel.
- //
- // = DESCRIPTION
- // This supplier receives events from an ACE_SOCK_Dgram, either
- // from a UDP socket or a Mcast group, decodes them and push them
- // to the EC.
- //
- // = REASSEMBLY
- //
- // Whenever an incomplete fragment is received (one with
- // fragment_count > 1) we allocate an entry for the message in an
- // map indexed by (host,port,request_id). The entry contains the
- // buffer, a bit vector to keep track of the fragments received
- // so far, and a timeout counter. This timeout counter is set to
- // 0 on each (new) fragment arrival, and incremented on a regular
- // basis. If the counter reaches a maximum value the message is
- // dropped.
- // Once all the fragments have been received the message is sent
- // up, and the memory reclaimed. The entry is *not* removed until
- // the timer expires (to handle re-transmitions).
- //
-public:
- TAO_ECG_UDP_Receiver (void);
-
- void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
- RtecScheduler::Scheduler_ptr lcl_sched,
- const char* lcl_name,
- TAO_ECG_UDP_Out_Endpoint* ignore_from,
- RtecUDPAdmin::AddrServer_ptr addr_server,
- ACE_Reactor *reactor,
- const ACE_Time_Value &expire_interval,
- int max_timeout,
- CORBA::Environment &_env);
- // To do its job this class requires to know the local EC it will
- // connect to; it also requires to build an RT_Info for the local
- // scheduler.
- // The <reactor> is used to receive timeout events..
- // The <ignore_from> endpoint is used to remove events generated by
- // the same process.
- // @@ TODO part of the RT_Info is hardcoded, we need to make it
- // parametric.
-
- void shutdown (CORBA::Environment&);
- // Disconnect and shutdown the gateway, no further connectsions
-
- void open (RtecEventChannelAdmin::SupplierQOS& pub,
- CORBA::Environment &env);
- // Connect to the EC using the given publications lists.
-
- virtual void close (CORBA::Environment& env);
- // Disconnect to the EC.
-
- int handle_input (ACE_SOCK_Dgram& dgram);
- // The Event_Handlers call this method when data is available at the
- // socket, the <dgram> must be ready for reading and contain a full
- // event.
-
- int handle_timeout (const ACE_Time_Value& tv,
- const void* act);
- // The timer has expired, must update all the unreceived
- // messages...
-
- // The PushSupplier method.
- virtual void disconnect_push_supplier (CORBA::Environment &);
-
-
- void get_addr (const RtecEventComm::EventHeader& header,
- RtecUDPAdmin::UDP_Addr_out addr,
- CORBA::Environment& env);
- // Call the RtecUDPAdmin::AddrServer
-
-private:
- RtecEventChannelAdmin::EventChannel_var lcl_ec_;
- // The remote and the local EC, so we can reconnect when the list changes.
-
- RtecScheduler::handle_t lcl_info_;
- // Our RT_Info.
-
- RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_;
- // We talk to the EC (as a consumer) using this proxy.
-
- TAO_ECG_UDP_Out_Endpoint* ignore_from_;
- // Ignore any events coming from this IP addres.
-
- RtecUDPAdmin::AddrServer_var addr_server_;
- // The server used to map event types into multicast groups.
-
- typedef ACE_Hash_Map_Manager<TAO_ECG_UDP_Request_Index,
- TAO_ECG_UDP_Request_Entry*,
- ACE_SYNCH_MUTEX> Request_Map;
- typedef ACE_Hash_Map_Entry<TAO_ECG_UDP_Request_Index,
- TAO_ECG_UDP_Request_Entry*> Request_Map_Entry;
-
- Request_Map request_map_;
- // The map containing all the incoming requests which have been
- // partially received.
-
- TAO_ECG_UDP_TH timeout_handler_;
- // To receive the timeouts..
-
- ACE_Reactor* reactor_;
- // The reactor we are using for the timeout handler...
-
- int max_timeout_;
- // How many timeouts before we expire a message...
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_UDP_EH : public ACE_Event_Handler
-{
- //
- // = TITLE
- // Event Handler for UDP messages.
- //
- // = DESCRIPTION
- // This object receives callbacks from the Reactor when data is
- // available on a UDP socket, it forwards to the ECG_UDP_Receiver
- // which reads the events and transform it into an event.
- //
-public:
- TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv);
-
- int open (const ACE_INET_Addr& ipaddr);
- // Open the datagram and register with this->reactor()
-
- int close (void);
- // Close the datagram and unregister with the reactor.
-
- // Reactor callbacks
- virtual int handle_input (ACE_HANDLE fd);
- virtual ACE_HANDLE get_handle (void) const;
-
-private:
- ACE_SOCK_Dgram dgram_;
- // The datagram used to receive the data.
-
- TAO_ECG_UDP_Receiver* receiver_;
- // We callback to this object when a message arrives.
-};
-
-// ****************************************************************
-
-class TAO_ORBSVCS_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler
-{
- //
- // = TITLE
- // Event Handler for UDP messages.
- //
- // = DESCRIPTION
- // This object receives callbacks from the Reactor when data is
- // available on the mcast socket, it forwards to the UDP_Receive
- // gateway which reads the events and transform it into an event.
- //
-public:
- TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv);
-
- int open (RtecEventChannelAdmin::EventChannel_ptr ec,
- CORBA::Environment& _env);
- // Register for changes in the EC subscription list.
- // When the subscription list becomes non-empty we join the proper
- // multicast groups (using the receiver to translate between event
- // types and mcast groups) and the class registers itself with the
- // reactor.
-
- int close (CORBA::Environment& _env);
- // Remove ourselves from the event channel, unsubscribe from the
- // multicast groups, close the sockets and unsubscribe from the
- // reactor.
-
- virtual int handle_input (ACE_HANDLE fd);
- virtual ACE_HANDLE get_handle (void) const;
- // Reactor callbacks
-
- void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
- CORBA::Environment& _env);
- void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
- CORBA::Environment& _env);
- // The Observer methods
-
- class Observer : public POA_RtecEventChannelAdmin::Observer
- {
- // = TITLE
- // Observe changes in the EC subscriptions.
- //
- // = DESCRIPTION
- // As the subscriptions on the EC change we also change the
- // mcast groups that we join.
- // We could use the TIE classes, but they don't work in all
- // compilers.
- public:
- Observer (TAO_ECG_Mcast_EH* eh);
- // We report changes in the EC subscriptions to the event
- // handler.
-
- // The Observer methods
- virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub,
- CORBA::Environment& _env);
- virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub,
- CORBA::Environment& _env);
-
- private:
- TAO_ECG_Mcast_EH* eh_;
- // Our callback object.
- };
-
-private:
- int subscribe (const ACE_INET_Addr &mcast_addr);
- int unsubscribe (const ACE_INET_Addr &mcast_addr);
- // Control the multicast group subscriptions
-
-private:
- ACE_SOCK_Dgram_Mcast dgram_;
- // The datagram used to receive the data.
-
- TAO_ECG_UDP_Receiver* receiver_;
- // We callback to this object when a message arrives.
-
- Observer observer_;
- // This object will call us back when the subscription list
- // changes.
-
- RtecEventChannelAdmin::Observer_Handle handle_;
- // Keep the handle of the observer so we can unregister later.
-
- RtecEventChannelAdmin::EventChannel_var ec_;
- // The Event Channel.
-};
-
-#if defined(__ACE_INLINE__)
-#include "EC_Gateway_UDP.i"
-#endif /* __ACE_INLINE__ */
-
-#endif /* ACE_EVENT_CHANNEL_UDP_H */