diff options
Diffstat (limited to 'ACE/apps/Gateway/Peer/Peer.h')
-rw-r--r-- | ACE/apps/Gateway/Peer/Peer.h | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Peer/Peer.h b/ACE/apps/Gateway/Peer/Peer.h new file mode 100644 index 00000000000..0723882a243 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Peer.h @@ -0,0 +1,257 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Peer.h +// +// = DESCRIPTION +// These classes process Supplier/Consumer events sent from the +// gateway (gatewayd) to its various peers (peerd). The general +// collaboration works as follows: +// +// 1. <Peer_Acceptor> creates a listener endpoint and waits +// passively for gatewayd to connect with it. +// +// 2. When a gatewayd connects, <Peer_Acceptor> creates an +// <Peer_Handler> object that sends/receives events from +// gatewayd on that connection. +// +// 3. The <Peer_Handler> waits for gatewayd to inform it of its +// connection ID, which is prepended to all subsequent outgoing +// events sent from peerd. +// +// 4. Once the connection ID is set, peerd periodically sends events +// to gatewayd. Peerd also receives and "processes" events +// forwarded to it from gatewayd. In this program, peerd +// "processes" the events sent to it by writing them to stdout. +// +// Note that in the current peerd implementation, one Peer process +// cannot serve as both a Consumer and Supplier of Events. This is +// because the gatewayd establishes a separate connection for +// Suppliers and Consumers and the peerd only maintains a single +// <Peer_Handler> object to handle this one connection. Enhancing +// this implementation to be both a Consumer and Supplier +// simultaneously is straightforward, however. In addition, +// multiple peerd processes can already work together to play these +// different roles. +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#ifndef PEER_H +#define PEER_H + +#include "ace/Service_Config.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Acceptor.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "ace/Connector.h" +#include "ace/Null_Condition.h" +#include "ace/Null_Mutex.h" +#include "Options.h" + +ACE_SVC_FACTORY_DECLARE (Peer_Factory) + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ + // = TITLE + // Handle Peer events arriving from a Gateway. +public: + // = Initialization and termination methods. + Peer_Handler (void); + // Initialize the peer. + + ~Peer_Handler (void); + // Shutdown the Peer. + + virtual int open (void * = 0); + // Initialize the handler when called by + // <ACE_Acceptor::handle_input>. + + virtual int handle_input (ACE_HANDLE); + // Receive and process peer events. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send a event to a gateway (may be queued if necessary due to flow + // control). + + virtual int handle_output (ACE_HANDLE); + // Finish sending a event when flow control conditions abate. + + virtual int handle_timeout (const ACE_Time_Value &, + const void *arg); + // Periodically send events via <ACE_Reactor> timer mechanism. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // Perform object termination. + +protected: + typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited; + + int transmit (ACE_Message_Block *mb, + size_t n, + int event_type); + // Transmit <mb> to the gatewayd. + + virtual int recv (ACE_Message_Block *&mb); + // Receive an Peer event from gatewayd. + + virtual int send (ACE_Message_Block *mb); + // Send an Peer event to gatewayd, using <nonblk_put>. + + virtual int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking <put>, which tries to send an event to the + // gatewayd, but only if it isn't flow controlled. + + int subscribe (void); + // Register Consumer subscriptions with the gateway. + + // = Event/state/action handlers. + int transmit_stdin (void); + // Receive a event from stdin and send it to the gateway. + + int await_connection_id (void); + // Action that receives the route id. + + int await_events (void); + // Action that receives events. + + int (Peer_Handler::*do_action_)(void); + // Pointer-to-member-function for the current action to run in this + // state. This points to one of the preceding 3 methods. + + CONNECTION_ID connection_id_; + // Connection ID of the peer, which is obtained from the gatewayd. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragments that arrive in non-blocking recv's + // from the gatewayd. + + size_t total_bytes_; + // The total number of bytes sent/received to the gatewayd thus far. + + int first_time_; + // Used to call register_stdin_handle only once. Otherwise, thread + // leak will occur on Win32. +}; + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> +{ + // = TITLE + // Passively accept connections from gatewayd and dynamically + // create a new <Peer_Handler> object to communicate with the + // gatewayd. +public: + // = Initialization and termination methods. + Peer_Acceptor (void); + // Default initialization. + + int start (u_short); + // the <Peer_Acceptor>. + + int close (void); + // Terminate the <Peer_Acceptor>. + + virtual int make_svc_handler (Peer_Handler *&); + // Factory method that creates a <Peer_Handler> just once. + +private: + int open_acceptor (u_short port); + // Factor out common code for initializing the <Peer_Acceptor>. + + Peer_Handler *peer_handler_; + // Pointer to <Peer_Handler> allocated just once. + + ACE_INET_Addr addr_; + // Our acceptor addr. + + typedef ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> inherited; +}; + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Connector : public ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR> +{ + // = TITLE + // Actively establish connections with gatewayd and dynamically + // create a new <Peer_Handler> object to communicate with the + // gatewayd. +public: + // = Initialization method. + int open (ACE_Reactor * = 0, int = 0); + // Initialize the <Peer_Connector>. NOTE: the arguments are + // ignored. They are only provided to avoid a compiler warning + // about hiding the virtual function ACE_Connector<Peer_Handler, + // ACE_SOCK_CONNECTOR>::open(ACE_Reactor*, int). + +private: + int open_connector (Peer_Handler *&ph, u_short port); + // Factor out common code for initializing the <Peer_Connector>. + + Peer_Handler *consumer_peer_handler_; + // Consumer <Peer_Handler> that is connected to a gatewayd. + + Peer_Handler *supplier_peer_handler_; + // Supplier <Peer_Handler> that is connected to a gatewayd. +}; + +class ACE_Svc_Export Peer_Factory : public ACE_Service_Object +{ + // = TITLE + // A factory class that actively and/or passively establishes + // connections with the gatewayd. +public: + // = Dynamic initialization and termination hooks from <ACE_Service_Object>. + + virtual int init (int argc, ACE_TCHAR *argv[]); + // Initialize the acceptor and connector. + + virtual int fini (void); + // Perform termination activities. + + virtual int info (ACE_TCHAR **, size_t) const; + // Return info about this service. + + virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); + // Handle various signals (e.g., SIGPIPE, SIGINT, and SIGQUIT). + +private: + Peer_Acceptor consumer_acceptor_; + // Pointer to an instance of our <Peer_Acceptor> that's used to + // accept connections and create Consumers. + + Peer_Acceptor supplier_acceptor_; + // Pointer to an instance of our <Peer_Acceptor> that's used to + // accept connections and create Suppliers. + + Peer_Connector connector_; + // An instance of our <Peer_Connector>. Note that one + // <Peer_Connector> is used to establish <Peer_Handler>s for both + // Consumers and Suppliers. +}; + +#endif /* PEER_H */ |