summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/IO_Handler.h
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/IO_Handler.h')
-rw-r--r--apps/Gateway/Gateway/IO_Handler.h224
1 files changed, 224 insertions, 0 deletions
diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h
new file mode 100644
index 00000000000..c22f1a3df26
--- /dev/null
+++ b/apps/Gateway/Gateway/IO_Handler.h
@@ -0,0 +1,224 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// IO_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_IO_HANDLER)
+#define _IO_HANDLER
+
+#include "ace/Service_Config.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Svc_Handler.h"
+#include "Consumer_Map.h"
+#include "Consumer_Entry.h"
+#include "Event.h"
+
+// Forward declaration.
+class IO_Handler_Connector;
+
+class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
+ // = TITLE
+ // IO_Handler contains info about connection state and addressing.
+ //
+ // = DESCRIPTION
+ // The IO_Handler classes process events sent from the peers to the
+ // gateway. These classes works as follows:
+ //
+ // 1. IO_Handler_Connector creates a number of connections with the set of
+ // peers specified in a configuration file.
+ //
+ // 2. For each peer that connects successfully, IO_Handler_Connector
+ // creates an IO_Handler object. Each object assigns a unique routing
+ // id to its associated peer. The Handlers are used by gatewayd
+ // that to receive, route, and forward events from source peer(s)
+ // to destination peer(s).
+{
+public:
+ IO_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int open (void * = 0);
+ // Initialize and activate a single-threaded IO_Handler (called by
+ // ACE_Connector::handle_output()).
+
+ int bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ CONN_ID);
+ // Set the peer's addressing and routing information.
+
+ ACE_INET_Addr &remote_addr (void);
+ // Returns the peer's routing address.
+
+ ACE_INET_Addr &local_addr (void);
+ // Returns our local address.
+
+ // = Set/get routing id.
+ CONN_ID id (void);
+ void id (CONN_ID);
+
+ // = Set/get the current state of the IO_Handler.
+ enum State
+ {
+ IDLE = 1, // Prior to initialization.
+ CONNECTING, // During connection establishment.
+ ESTABLISHED, // IO_Handler is established and active.
+ DISCONNECTING, // IO_Handler is in the process of connecting.
+ FAILED // IO_Handler has failed.
+ };
+
+ // = Set/get the current state.
+ State state (void);
+ void state (State);
+
+ // = Set/get the current retry timeout delay.
+ int timeout (void);
+ void timeout (int);
+
+ // = Set/get the maximum retry timeout delay.
+ int max_timeout (void);
+ void max_timeout (int);
+
+ // = Set/get IO_Handler activity status.
+ int active (void);
+ void active (int);
+
+ // = Set/get direction (necessary for error checking).
+ char direction (void);
+ void direction (char);
+
+ // = The total number of bytes sent/received on this channel.
+ size_t total_bytes (void);
+ void total_bytes (size_t bytes);
+ // Increment count by <bytes>.
+
+ virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
+ // Perform timer-based IO_Handler reconnection.
+
+protected:
+ enum
+ {
+ MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
+ };
+
+ int initialize_connection (void);
+ // Perform the first-time initiation of a connection to the peer.
+
+ int reinitiate_connection (void);
+ // Reinitiate a connection asynchronously when peers fail.
+
+ void socket_queue_size (void);
+ // Set the socket queue size.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ // Perform IO_Handler termination.
+
+ Consumer_Map *consumer_map_;
+ // Pointer to table that maps an event
+ // to a Set of IO_Handler *'s for output.
+
+ ACE_INET_Addr remote_addr_;
+ // Address of peer.
+
+ ACE_INET_Addr local_addr_;
+ // Address of us.
+
+ CONN_ID id_;
+ // The assigned routing ID of this entry.
+
+ size_t total_bytes_;
+ // The total number of bytes sent/received on this channel.
+
+ State state_;
+ // The current state of the channel.
+
+ IO_Handler_Connector *connector_;
+ // Back pointer to IO_Handler_Connector to reestablish broken
+ // connections.
+
+ int timeout_;
+ // Amount of time to wait between reconnection attempts.
+
+ int max_timeout_;
+ // Maximum amount of time to wait between reconnection attempts.
+
+ char direction_;
+ // Indicates which direction data flows through the channel ('O' ==
+ // output and 'I' == input).
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+};
+
+class Supplier_Handler : public IO_Handler
+ // = TITLE
+ // Handle reception of Peer events arriving as events.
+{
+public:
+ Supplier_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+ // Constructor sets the consumer map pointer.
+
+ virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
+ // Receive and process peer events.
+
+protected:
+ virtual int recv (ACE_Message_Block *&);
+ // Receive an event from a Supplier.
+
+ int forward (ACE_Message_Block *event);
+ // Forward the Event to a Consumer.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragment to handle non-blocking recv's from
+ // Suppliers.
+};
+
+class Consumer_Handler : public IO_Handler
+ // = TITLE
+ // Handle transmission of events to other Peers using a
+ // single-threaded approach.
+{
+public:
+ Consumer_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send an event to a Consumer (may be queued if necessary).
+
+protected:
+ // = We'll allow up to 16 megabytes to be queued per-output
+ // channel.
+ enum {QUEUE_SIZE = 1024 * 1024 * 16};
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process shutdowns from a Consumer.
+
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending event when flow control conditions abate.
+
+ int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking put().
+
+ virtual int send (ACE_Message_Block *);
+ // Send an event to a Consumer.
+};
+
+#endif /* _IO_HANDLER */