diff options
Diffstat (limited to 'apps')
31 files changed, 0 insertions, 4621 deletions
diff --git a/apps/Gateway/Gateway/Channel.cpp b/apps/Gateway/Gateway/Channel.cpp deleted file mode 100644 index 99699a6ee87..00000000000 --- a/apps/Gateway/Gateway/Channel.cpp +++ /dev/null @@ -1,710 +0,0 @@ - -// $Id$ - -#include "Routing_Entry.h" -#include "Channel_Connector.h" - -// Convenient short-hands. -#define CO CONDITION -#define MU MUTEX - -// = The total number of bytes sent/received on this channel. -size_t -Channel::total_bytes (void) -{ - return this->total_bytes_; -} - -void -Channel::total_bytes (size_t bytes) -{ - this->total_bytes_ += bytes; -} - -Channel::Channel (ROUTING_TABLE *rt, - Channel_Connector *cc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH> (thr_mgr), - routing_table_ (rt), - id_ (-1), - total_bytes_ (0), - state_ (Channel::IDLE), - connector_ (cc), - timeout_ (1), - max_timeout_ (Channel::MAX_RETRY_TIMEOUT), - socket_queue_size_ (socket_queue_size) -{ -} - -// Set the associated channel. - -void -Channel::active (int a) -{ - this->state (a == 0 ? Channel::IDLE : Channel::ESTABLISHED); -} - -// Get the associated channel. - -int -Channel::active (void) -{ - return this->state () == Channel::ESTABLISHED; -} - -// Set the direction. - -void -Channel::direction (char d) -{ - this->direction_ = d; -} - -// Get the direction. - -char -Channel::direction (void) -{ - return this->direction_; -} - -// Sets the timeout delay. - -void -Channel::timeout (int to) -{ - if (to > this->max_timeout_) - to = this->max_timeout_; - - this->timeout_ = to; -} - -// Recalculate the current retry timeout delay using exponential -// backoff. Returns the original timeout (i.e., before the -// recalculation). - -int -Channel::timeout (void) -{ - int old_timeout = this->timeout_; - this->timeout_ *= 2; - - if (this->timeout_ > this->max_timeout_) - this->timeout_ = this->max_timeout_; - - return old_timeout; -} - -// Sets the max timeout delay. - -void -Channel::max_timeout (int mto) -{ - this->max_timeout_ = mto; -} - -// Gets the max timeout delay. - -int -Channel::max_timeout (void) -{ - return this->max_timeout_; -} - -// Restart connection asynchronously when timeout occurs. - -int -Channel::handle_timeout (const ACE_Time_Value &, const void *) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) attempting to reconnect Channel %d with timeout = %d\n", - this->id (), this->timeout_)); - return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); -} - -// Restart connection (blocking_semantics dicates whether we -// restart synchronously or asynchronously). - -int -Channel::reinitiate_connection (void) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != -1) - { - // Make sure to close down peer to reclaim descriptor. - this->peer ().close (); - -#if 0 -// if (this->state () == FAILED) -// { - // Reinitiate timeout to improve reconnection time. -// this->timeout (1); -#endif - - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Channel %d\n", - this->id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, - this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; -} - -// Handle shutdown of the Channel object. - -int -Channel::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down Channel %d on handle %d\n", - this->id (), this->get_handle ())); - - return this->reinitiate_connection (); -} - -// Set the state of the channel. - -void -Channel::state (Channel::State s) -{ - this->state_ = s; -} - -// Perform the first-time initiation of a connection to the peer. - -int -Channel::initialize_connection (void) -{ - this->state_ = Channel::ESTABLISHED; - - // Restart the timeout to 1. - this->timeout (1); - -#if defined (ASSIGN_ROUTING_ID) - // Action that sends the route id to the peerd. - - CONN_ID id = htons (this->id ()); - - ssize_t n = this->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "gatewayd has closed down unexpectedly" : "send"), -1); -#endif /* ASSIGN_ROUTING_ID */ - return 0; -} - -// Set the size of the socket queue. - -void -Channel::socket_queue_size (void) -{ - if (this->socket_queue_size_ > 0) - { - int option = this->direction_ == 'I' ? SO_RCVBUF : SO_SNDBUF; - - if (this->peer ().set_option (SOL_SOCKET, option, - &this->socket_queue_size_, sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - } -} - -// Upcall from the ACE_Acceptor::handle_input() that -// delegates control to our application-specific Channel. - -int -Channel::open (void *a) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) Channel's fd = %d\n", this->peer ().get_handle ())); - - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn on non-blocking I/O. - if (this->peer ().enable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Call down to the base class to activate and register this handler. - if (this->ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH>::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); - - return this->initialize_connection (); -} - -// Return the current state of the channel. - -Channel::State -Channel::state (void) -{ - return this->state_; -} - -void -Channel::id (CONN_ID id) -{ - this->id_ = id; -} - -CONN_ID -Channel::id (void) -{ - return this->id_; -} - -// Set the peer's address information. -int -Channel::bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - CONN_ID id) -{ - this->remote_addr_ = remote_addr; - this->local_addr_ = local_addr; - this->id_ = id; - return 0; -} - -ACE_INET_Addr & -Channel::remote_addr (void) -{ - return this->remote_addr_; -} - -ACE_INET_Addr & -Channel::local_addr (void) -{ - return this->local_addr_; -} - -// Constructor sets the routing table pointer. - -Output_Channel::Output_Channel (ROUTING_TABLE *rt, - Channel_Connector *cc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Channel (rt, cc, thr_mgr, socket_queue_size) -{ - this->direction_ = 'O'; - this->msg_queue ()->high_water_mark (Output_Channel::QUEUE_SIZE); -} - -// This method should be called only when the peer shuts down -// unexpectedly. This method simply marks the Channel as -// having failed so that handle_close () can reconnect. - -int -Output_Channel::handle_input (ACE_HANDLE) -{ - char buf[1]; - - this->state (Channel::FAILED); - - switch (this->peer ().recv (buf, sizeof buf)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has failed unexpectedly for Output Channel %d\n", - this->id ()), -1); - /* NOTREACHED */ - case 0: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has shutdown unexpectedly for Output Channel %d\n", - this->id ()), -1); - /* NOTREACHED */ - default: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer is sending input on Output Channel %d\n", - this->id ()), -1); - /* NOTREACHED */ - } -} - -int -Output_Channel::svc (void) -{ - ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Output_Channel!\n"), -1); -} - -// Perform a non-blocking put() of message MB. If we are unable to -// send the entire message the remainder is re-queued at the *front* of -// the Message_List. - -int -Output_Channel::nonblk_put (ACE_Message_Block *mb) -{ - // Try to send the message. If we don't send it all (e.g., due to - // flow control), then re-queue the remainder at the head of the - // Message_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. - - ssize_t n; - - if ((n = this->send_peer (mb)) == -1) - { - // Things have gone wrong, let's try to close down and set up a new reconnection. - this->state (Channel::FAILED); - this->handle_close (); - return -1; - } - else if (errno == EWOULDBLOCK) // Didn't manage to send everything. - { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", - this->get_handle (), this->id ())); - - // ACE_Queue in *front* of the list to preserve order. - if (this->msg_queue ()->enqueue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); - - // Tell ACE_Reactor to call us back when we can send again. - else if (ACE_Service_Config::reactor ()-> - schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); - return 0; - } - else - return n; -} - -int -Output_Channel::send_peer (ACE_Message_Block *mb) -{ - ssize_t n; - size_t len = mb->length (); - - if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0) - return errno == EWOULDBLOCK ? 0 : n; - else if (n < len) - // Re-adjust pointer to skip over the part we did send. - mb->rd_ptr (n); - else /* if (n == length) */ - { - // The whole message is sent, we can now safely deallocate the buffer. - // Note that this should decrement a reference count... - delete mb; - errno = 0; - } - this->total_bytes (n); - return n; -} - -// Finish sending a message when flow control conditions abate. -// This method is automatically called by the ACE_Reactor. - -int -Output_Channel::handle_output (ACE_HANDLE) -{ - ACE_Message_Block *mb = 0; - int status = 0; - - ACE_DEBUG ((LM_DEBUG, "(%t) in handle_output on handle %d\n", this->get_handle ())); - // The list had better not be empty, otherwise there's a bug! - - if (this->msg_queue ()->dequeue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) - { - switch (this->nonblk_put (mb)) - { - case 0: // Partial send. - ACE_ASSERT (errno == EWOULDBLOCK); - // Didn't write everything this time, come back later... - break; - - case -1: - // Caller is responsible for freeing a ACE_Message_Block if failures occur. - delete mb; - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); - - /* FALLTHROUGH */ - default: // Sent the whole thing. - - // If we succeed in writing the entire message (or we did not fail - // due to EWOULDBLOCK) then check if there are more messages on the Message_List. - // If there aren't, tell the ACE_Reactor not to notify us anymore (at least - // until there are new messages queued up). - - if (this->msg_queue ()->is_empty ()) - { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing deactivated on handle %d to routing id %d\n", - this->get_handle (), this->id ())); - - - if (ACE_Service_Config::reactor ()-> - cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); - } - } - } - else - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); - return 0; -} - -// Send a message to a peer (may queue if necessary). - -int -Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - if (this->msg_queue ()->is_empty ()) - // Try to send the message *without* blocking! - return this->nonblk_put (mb); - else - // If we have queued up messages due to flow control - // then just enqueue and return. - return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Constructor sets the routing table pointer and the connector pointer. - -Input_Channel::Input_Channel (ROUTING_TABLE *rt, - Channel_Connector *cc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : msg_frag_ (0), - Channel (rt, cc, thr_mgr, socket_queue_size) -{ - this->direction_ = 'I'; - this->msg_queue ()->high_water_mark (0); -} - -int -Input_Channel::put (ACE_Message_Block *, ACE_Time_Value *) -{ - ACE_ERROR_RETURN ((LM_ERROR, "(%t) put should not be called on Input_Channel!\n"), -1); -} - -int -Input_Channel::svc (void) -{ - ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Input_Channel!\n"), -1); -} - -// Receive a Peer message from peerd. Handles fragmentation. -// -// The routing message returned from recv_peer consists of two parts: -// 1. The Address part, contains the virtual routing id. -// 2. The Data part, which contains the actual data to be routed. -// -// The reason for having two parts is to shield the higher layers -// of software from knowledge of the message structure. - -int -Input_Channel::recv_peer (ACE_Message_Block *&route_addr) -{ - Peer_Message *peer_msg; - size_t len; - ssize_t n = 0; - ssize_t m = 0; - size_t offset = 0; - - if (this->msg_frag_ == 0) - // No existing fragment... - ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Peer_Message)), - -1); - - peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr (); - - const ssize_t HEADER_SIZE = sizeof (Peer_Header); - ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); - - if (header_bytes_left_to_read > 0) - { - n = this->peer ().recv (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); - - if (n == -1 /* error */ - || n == 0 /* EOF */) - { - ACE_ERROR ((LM_ERROR, "%p\n", "Recv error during header read ")); - ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read)); - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - } - - // Bump the write pointer by the amount read. - this->msg_frag_->wr_ptr (n); - - // At this point we may or may not have the ENTIRE header. - if (this->msg_frag_->length () < HEADER_SIZE) - { - ACE_DEBUG ((LM_DEBUG, "Partial header received: only %d bytes\n", - this->msg_frag_->length ())); - // Notify the caller that we didn't get an entire message. - errno = EWOULDBLOCK; - return -1; - } - } - - // At this point there is a complete, valid header in msg_frag_ - len = sizeof peer_msg->buf_ + HEADER_SIZE - this->msg_frag_->length (); - - // Try to receive the remainder of the message - - switch (m = this->peer ().recv (peer_msg->buf_ + offset, len)) - { - case -1: - if (errno == EWOULDBLOCK) - { - // This shouldn't happen since the ACE_Reactor - // just triggered us to handle pending I/O! - ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n")); - errno = EWOULDBLOCK; - return -1; - } - else - /* FALLTHROUGH */; - - case 0: // Premature EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; - return 0; - - default: - if (m != len) - // Re-adjust pointer to skip over the part we've read. - { - this->msg_frag_->wr_ptr (m); - errno = EWOULDBLOCK; - return -1; // Inform caller that we didn't get the whole message. - } - else - { - // Set the write pointer at 1 past the end of the message. - this->msg_frag_->wr_ptr (m); - - // Set the read pointer to the beginning of the message. - this->msg_frag_->rd_ptr (this->msg_frag_->base ()); - - // Allocate a routing message header and chain the data portion - // onto its continuation field. - ACE_NEW_RETURN (route_addr, - ACE_Message_Block (sizeof (Peer_Addr), - ACE_Message_Block::MB_PROTO, - this->msg_frag_), - -1); - - Peer_Addr peer_addr (this->id (), peer_msg->header_.routing_id_, 0); - // Copy the routing address from the Peer_Message into routing_addr. - route_addr->copy ((char *) &peer_addr, sizeof (Peer_Addr)); - - // Reset the pointer to indicate we've got an entire message. - this->msg_frag_ = 0; - } - this->total_bytes (m + n); -#if defined (VERBOSE) - ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s", - peer_addr.conn_id_, peer_msg->header_.routing_id_, peer_msg->header_.len_, - peer_msg->header_.len_, peer_msg->buf_)); -#else - ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n", - peer_msg->header_.routing_id_, peer_msg->header_.len_, this->total_bytes ())); -#endif - return m + n; - } -} - -// Receive various types of input (e.g., Peer message from the -// gatewayd, as well as stdio). - -int -Input_Channel::handle_input (ACE_HANDLE) -{ - ACE_Message_Block *route_addr = 0; - - switch (this->recv_peer (route_addr)) - { - case 0: - // Note that a peer should never initiate a shutdown. - this->state (Channel::FAILED); - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has closed down unexpectedly for Input Channel %d\n", - this->id ()), -1); - /* NOTREACHED */ - case -1: - if (errno == EWOULDBLOCK) - // A short-read, we'll come back and finish it up later on! - return 0; - else // A weird problem occurred, shut down and start again. - { - this->state (Channel::FAILED); - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Channel %d\n", - "Peer has failed unexpectedly", - this->id ()), -1); - } - /* NOTREACHED */ - default: - return this->route_message (route_addr); - } -} - -// Route a message to its appropriate destination. - -int -Input_Channel::route_message (ACE_Message_Block *route_addr) -{ - // We got a valid message, so determine its virtual routing id, - // which is stored in the first of the two message blocks chained together. - - Peer_Addr *routing_key = (Peer_Addr *) route_addr->rd_ptr (); - - // Skip over the address portion. - const ACE_Message_Block *const data = route_addr->cont (); - - // RE points to the routing entry located for this routing id. - Routing_Entry *re = 0; - - if (this->routing_table_->find (*routing_key, re) != -1) - { - // Check to see if there are any destinations. - if (re->destinations ()->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active destinations for this message currently\n")); - - else // There are destinations, so forward the message. - { - Routing_Entry::ENTRY_SET *esp = re->destinations (); - Routing_Entry::ENTRY_ITERATOR si (*esp); - - for (Channel **channel = 0; si.next (channel) != 0; si.advance ()) - { - // Only process active channels. - if ((*channel)->active ()) - { - // Clone the message portion (should be doing reference counting here...) - ACE_Message_Block *newmsg = data->clone (); - - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ())); - - if ((*channel)->put (newmsg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping messages")); - else - ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n", - "put", (*channel)->id ())); - - // Caller is responsible for freeing a ACE_Message_Block if failures occur. - delete newmsg; - } - } - } - // Will become superfluous once we have reference counting... - delete route_addr; - return 0; - } - } - delete route_addr; - // Failure return. - ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", - routing_key->conn_id_, routing_key->logical_id_, routing_key->payload_)); - return 0; -} - -#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Map_Manager<Peer_Addr, Routing_Entry *, MUTEX>; -template class ACE_Map_Iterator<Peer_Addr, Routing_Entry *, MUTEX>; -template class ACE_Map_Entry<Peer_Addr, Routing_Entry *>; -#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Channel.h b/apps/Gateway/Gateway/Channel.h deleted file mode 100644 index 339716bc55a..00000000000 --- a/apps/Gateway/Gateway/Channel.h +++ /dev/null @@ -1,280 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Channel.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_CHANNEL) -#define _CHANNEL - -#include "ace/Service_Config.h" -#include "ace/INET_Addr.h" -#include "ace/SOCK_Connector.h" -#include "ace/Svc_Handler.h" -#include "Routing_Table.h" -#include "Routing_Entry.h" -#include "Peer_Message.h" - -// The following typedefs are used in order to parameterize the -// synchronization policies without changing the source code! - -// If we don't have threads then use the single-threaded synchronization. -#if !defined (ACE_HAS_THREADS) -#define SYNCH ACE_NULL_SYNCH -typedef ACE_Null_Mutex MUTEX; -#define CHANNEL_PEER_STREAM ACE_SOCK_STREAM -#define CHANNEL_PEER_CONNECTOR ACE_SOCK_CONNECTOR -#else /* ACE_HAS_THREADS */ - -// Select communication mechanisms. -#if 0 // defined (ACE_HAS_TLI) -// Note that due to inconsistencies between the semantics of sockets -// and TLI with respect to establishing non-blocking connections it's -// not a good idea to use TLI... -#include "ace/TLI_Connector.h" -#define CHANNEL_PEER_STREAM ACE_TLI_STREAM -#define CHANNEL_PEER_CONNECTOR ACE_TLI_CONNECTOR -#else -#define CHANNEL_PEER_STREAM ACE_SOCK_STREAM -#define CHANNEL_PEER_CONNECTOR ACE_SOCK_CONNECTOR -#endif /* 0 */ - -// Note that we only need to make the ACE_Task thread-safe if we -// are using the multi-threaded Thr_Output_Channel... -#if defined (USE_OUTPUT_MT) -#define SYNCH ACE_MT_SYNCH -#else -#define SYNCH ACE_NULL_SYNCH -#endif /* USE_OUTPUT_MT || USE_INPUT_MT */ - -// Note that we only need to make the ACE_Map_Manager thread-safe if -// we are using the multi-threaded Thr_Input_Channel... -#if defined (USE_INPUT_MT) -typedef ACE_RW_Mutex MUTEX; -#else -typedef ACE_Null_Mutex MUTEX; -#endif /* USE_INPUT_MT */ -#endif /* ACE_HAS_THREADS */ - -// Typedef for the routing table. -typedef Routing_Table<Peer_Addr, Routing_Entry, MUTEX> - ROUTING_TABLE; - -// Forward declaration. -class Channel_Connector; - -class Channel : public ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH> - // = TITLE - // Channel contains info about connection state and addressing. - // - // = DESCRIPTION - // The Channel classes process messages sent from the peers to the - // gateway. These classes works as follows: - // - // 1. Channel_Connector creates a number of connections with the set of - // peers specified in a configuration file. - // - // 2. For each peer that connects successfully, Channel_Connector - // creates an Channel object. Each object assigns a unique routing - // id to its associated peer. The Channels are used by gatewayd - // that to receive, route, and forward messages from source peer(s) - // to destination peer(s). -{ -public: - Channel (ROUTING_TABLE *, - Channel_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int open (void * = 0); - // Initialize and activate a single-threaded Channel (called by - // ACE_Connector::handle_output()). - - int bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - CONN_ID); - // Set the peer's addressing and routing information. - - ACE_INET_Addr &remote_addr (void); - // Returns the peer's routing address. - - ACE_INET_Addr &local_addr (void); - // Returns our local address. - - // = Set/get routing id. - CONN_ID id (void); - void id (CONN_ID); - - // = Set/get the current state of the Channel. - enum State - { - IDLE = 1, // Prior to initialization. - CONNECTING, // During connection establishment. - ESTABLISHED, // Channel is established and active. - DISCONNECTING, // Channel is in the process of connecting. - FAILED // Channel has failed. - }; - - // = Set/get the current state. - State state (void); - void state (State); - - // = Set/get the current retry timeout delay. - int timeout (void); - void timeout (int); - - // = Set/get the maximum retry timeout delay. - int max_timeout (void); - void max_timeout (int); - - // = Set/get Channel activity status. - int active (void); - void active (int); - - // = Set/get direction (necessary for error checking). - char direction (void); - void direction (char); - - // = The total number of bytes sent/received on this channel. - size_t total_bytes (void); - void total_bytes (size_t bytes); - // Increment count by <bytes>. - - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based Channel reconnection. - -protected: - enum - { - MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. - }; - - int initialize_connection (void); - // Perform the first-time initiation of a connection to the peer. - - int reinitiate_connection (void); - // Reinitiate a connection asynchronously when peers fail. - - void socket_queue_size (void); - // Set the socket queue size. - - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); - // Perform Channel termination. - - ROUTING_TABLE *routing_table_; - // Pointer to table that maps a Peer_Addr - // to a Set of Channel *'s for output. - - ACE_INET_Addr remote_addr_; - // Address of peer. - - ACE_INET_Addr local_addr_; - // Address of us. - - CONN_ID id_; - // The assigned routing ID of this entry. - - size_t total_bytes_; - // The total number of bytes sent/received on this channel. - - State state_; - // The current state of the channel. - - Channel_Connector *connector_; - // Back pointer to Channel_Connector to reestablish broken - // connections. - - int timeout_; - // Amount of time to wait between reconnection attempts. - - int max_timeout_; - // Maximum amount of time to wait between reconnection attempts. - - char direction_; - // Indicates which direction data flows through the channel ('O' == - // output and 'I' == input). - - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). -}; - -class Input_Channel : public Channel - // = TITLE - // Handle reception of Peer messages arriving as events. -{ -public: - Input_Channel (ROUTING_TABLE *, - Channel_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - // Constructor sets the routing table pointer. - - virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - // Receive and process peer messages. - -protected: - virtual int recv_peer (ACE_Message_Block *&); - // Receive a message from a peer. - - int route_message (ACE_Message_Block *); - // Action that receives messages from peerd. - - ACE_Message_Block *msg_frag_; - // Keep track of message fragment to handle non-blocking recv's from - // peers. - - virtual int svc (void); - // This method is not used since we are single-threaded. - -private: - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // This methods should not be called to handle input. -}; - -class Output_Channel : public Channel - // = TITLE - // Handle transmission of messages to other Peers using a - // single-threaded approach. -{ -public: - Output_Channel (ROUTING_TABLE *, - Channel_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - // Send a message to a gateway (may be queued if necessary). - -protected: - // = We'll allow up to 16 megabytes to be queued per-output - // channel. - enum {QUEUE_SIZE = 1024 * 1024 * 16}; - - virtual int handle_input (ACE_HANDLE); - // Receive and process shutdowns from peer. - - virtual int handle_output (ACE_HANDLE); - // Finish sending a message when flow control conditions abate. - - int nonblk_put (ACE_Message_Block *mb); - // Perform a non-blocking put(). - - virtual int send_peer (ACE_Message_Block *); - // Send a message to a peer. - - virtual int svc (void); - // This method is not used since we are single-threaded. -}; - -#endif /* _CHANNEL */ diff --git a/apps/Gateway/Gateway/Channel_Connector.cpp b/apps/Gateway/Gateway/Channel_Connector.cpp deleted file mode 100644 index a5394e8b013..00000000000 --- a/apps/Gateway/Gateway/Channel_Connector.cpp +++ /dev/null @@ -1,92 +0,0 @@ -#include "Channel_Connector.h" -// $Id$ - - -Channel_Connector::Channel_Connector (void) -{ -} - -// Override the connection-failure method to add timer support. -// Note that these timers perform "expoential backoff" to -// avoid rapidly trying to reestablish connections when a link -// goes down. - -int -Channel_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) -{ - ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR>::AST *stp = 0; - - // Locate the ACE_Svc_Handler corresponding to the socket descriptor. - if (this->handler_map_.find (sd, stp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", - sd, "find"), -1); - - Channel *channel = stp->svc_handler (); - - // Schedule a reconnection request at some point in the future - // (note that channel uses an exponential backoff scheme). - if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, - channel->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - return 0; -} - -// Initiate (or reinitiate) a connection to the Channel. - -int -Channel_Connector::initiate_connection (Channel *channel, - ACE_Synch_Options &synch_options) -{ - char buf[MAXHOSTNAMELEN]; - - // Mark ourselves as idle so that the various iterators - // will ignore us until we are reconnected. - channel->state (Channel::IDLE); - - if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 - || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "can't obtain peer's address"), -1); - - // Try to connect to the Peer. - - if (this->connect (channel, channel->remote_addr (), - synch_options, channel->local_addr ()) == -1) - { - if (errno != EWOULDBLOCK) - { - channel->state (Channel::FAILED); - ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", - "connect", buf)); - - // Reschedule ourselves to try and connect again. - if (synch_options[ACE_Synch_Options::USE_REACTOR]) - { - if (ACE_Service_Config::reactor ()->schedule_timer - (channel, 0, channel->timeout ()) == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - else - // Failures on synchronous connects are reported as errors - // so that the caller can decide how to proceed. - return -1; - } - else - { - channel->state (Channel::CONNECTING); - ACE_DEBUG ((LM_DEBUG, - "(%t) in the process of connecting %s to %s\n", - synch_options[ACE_Synch_Options::USE_REACTOR] - ? "asynchronously" : "synchronously", buf)); - } - } - else - { - channel->state (Channel::ESTABLISHED); - ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - buf, channel->get_handle ())); - } - return 0; -} diff --git a/apps/Gateway/Gateway/Channel_Connector.h b/apps/Gateway/Gateway/Channel_Connector.h deleted file mode 100644 index 3e27f37355a..00000000000 --- a/apps/Gateway/Gateway/Channel_Connector.h +++ /dev/null @@ -1,41 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Channel_Connector.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_CHANNEL_CONNECTOR) -#define _CHANNEL_CONNECTOR - -#include "ace/Connector.h" -#include "Thr_Channel.h" - -class Channel_Connector : public ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR> - // = TITLE - // A concrete factory class that setups connections to peerds - // and produces a new Channel object to do the dirty work... -{ -public: - Channel_Connector (void); - - // Initiate (or reinitiate) a connection on the Channel. - int initiate_connection (Channel *, - ACE_Synch_Options & = ACE_Synch_Options::synch); - -protected: - // Override the connection-failure method to add timer support. - virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); -}; - -#endif /* _CHANNEL_CONNECTOR */ diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h deleted file mode 100644 index 28e59a4b2e6..00000000000 --- a/apps/Gateway/Gateway/Concurrency_Strategies.h +++ /dev/null @@ -1,74 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Concurrency_strategies.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_CONCURRENCY_STRATEGIES) -#define _CONCURRENCY_STRATEGIES - -#include "ace/Synch.h" - -// The following typedefs are used in order to parameterize the -// synchronization policies without changing the source code! - -// If we don't have threads then use the single-threaded synchronization. -#if !defined (ACE_HAS_THREADS) -#define SYNCH_STRATEGY ACE_NULL_SYNCH -typedef ACE_Null_Mutex MAP_MUTEX; -#else /* ACE_HAS_THREADS */ - -// Note that we only need to make the ACE_Task thread-safe if we are -// using the multi-threaded Thr_Consumer_Proxy... -#if defined (USE_OUTPUT_MT) -#define SYNCH_STRATEGY ACE_MT_SYNCH -#else -#define SYNCH_STRATEGY ACE_NULL_SYNCH -#endif /* USE_OUTPUT_MT || USE_INPUT_MT */ - -// Note that we only need to make the ACE_Map_Manager thread-safe if -// we are using the multi-threaded Thr_Supplier_Proxy. In this -// case, we use an RW_Mutex since we'll lookup Consumers far more -// often than we'll update them. -#if defined (USE_INPUT_MT) -typedef ACE_RW_Mutex MAP_MUTEX; -#else -typedef ACE_Null_Mutex MAP_MUTEX; -#endif /* USE_INPUT_MT */ -#endif /* ACE_HAS_THREADS */ - -// = Forward decls -class Thr_Consumer_Proxy; -class Thr_Supplier_Proxy; -class Consumer_Proxy; -class Supplier_Proxy; - -#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) -#if defined (USE_OUTPUT_MT) -typedef Thr_Consumer_Proxy CONSUMER_PROXY; -#else -typedef Consumer_Proxy CONSUMER_PROXY; -#endif /* USE_OUTPUT_MT */ - -#if defined (USE_INPUT_MT) -typedef Thr_Supplier_Proxy SUPPLIER_PROXY; -#else -typedef Supplier_Proxy SUPPLIER_PROXY; -#endif /* USE_INPUT_MT */ -#else -// Instantiate a non-multi-threaded Gateway. -typedef Supplier_Proxy SUPPLIER_PROXY; -typedef Consumer_Proxy CONSUMER_PROXY; -#endif /* ACE_HAS_THREADS */ - -#endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp deleted file mode 100644 index c3dcd96ebbf..00000000000 --- a/apps/Gateway/Gateway/Consumer_Entry.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// Defines an entry in the Consumer Map. -// $Id$ - -#include "Consumer_Entry.h" - -Consumer_Entry::Consumer_Entry (void) -{ - ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET); -} - -Consumer_Entry::~Consumer_Entry (void) -{ - delete this->destinations_; -} - -// Get the associated set of destinations. - -Consumer_Entry::ENTRY_SET * -Consumer_Entry::destinations (void) -{ - return this->destinations_; -} - -// Set the associated set of destinations. - -void -Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s) -{ - this->destinations_ = s; -} - diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h deleted file mode 100644 index fe502991514..00000000000 --- a/apps/Gateway/Gateway/Consumer_Entry.h +++ /dev/null @@ -1,45 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Consumer_Entry.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_ROUTING_ENTRY) -#define _ROUTING_ENTRY - -#include "ace/Set.h" - -// Forward reference. -class IO_Handler; - -class Consumer_Entry -{ - // = TITLE - // Defines an entry in the Consumer_Map. -public: - Consumer_Entry (void); - ~Consumer_Entry (void); - - typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET; - typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR; - - // = Set/get the associated set of destinations. - ENTRY_SET *destinations (void); - void destinations (ENTRY_SET *); - -protected: - ENTRY_SET *destinations_; - // The set of destinations; -}; - -#endif /* _ROUTING_ENTRY */ diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp deleted file mode 100644 index 6d16601f949..00000000000 --- a/apps/Gateway/Gateway/Consumer_Map.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -#if !defined (_CONSUMER_MAP_C) -#define _CONSUMER_MAP_C - -#include "Consumer_Map.h" - -// Bind the Event_Addr to the INT_ID. - -int -Consumer_Map::bind (Event_Addr event_addr, - Consumer_Entry *Consumer_Entry) -{ - return this->map_.bind (event_addr, Consumer_Entry); -} - -// Find the Consumer_Entry corresponding to the Event_Addr. - -int -Consumer_Map::find (Event_Addr event_addr, - Consumer_Entry *&Consumer_Entry) -{ - return this->map_.find (event_addr, Consumer_Entry); -} - -// Unbind (remove) the Event_Addr from the map. - -int -Consumer_Map::unbind (Event_Addr event_addr) -{ - return this->map_.unbind (event_addr); -} - -Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt) - : map_iter_ (rt.map_) -{ -} - -int -Consumer_Map_Iterator::next (Consumer_Entry *&ss) -{ - // Loop in order to skip over inactive entries if necessary. - - for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0; - this->map_iter_.next (temp) != 0; - this->advance ()) - { - // Otherwise, return the next item. - ss = temp->int_id_; - return 1; - } - return 0; -} - -int -Consumer_Map_Iterator::advance (void) -{ - return this->map_iter_.advance (); -} -#endif /* _CONSUMER_MAP_C */ diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h deleted file mode 100644 index fd392afaf6e..00000000000 --- a/apps/Gateway/Gateway/Consumer_Map.h +++ /dev/null @@ -1,62 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Consumer_Map.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_CONSUMER_MAP_H) -#define _CONSUMER_MAP_H - -#include "ace/Map_Manager.h" -#include "Concurrency_Strategies.h" -#include "Event.h" -#include "Consumer_Entry.h" - -class Consumer_Map -{ - // = TITLE - // Define a generic consumer map based on the ACE Map_Manager. - // - // = DESCRIPTION - // This class makes it easier to use the Map_Manager. -public: - int bind (Event_Addr event, Consumer_Entry *Consumer_Entry); - // Associate Event with the Consumer_Entry. - - int find (Event_Addr event, Consumer_Entry *&Consumer_Entry); - // Break any association of EXID. - - int unbind (Event_Addr event); - // Locate EXID and pass out parameter via INID. If found, - // return 0, else -1. - -public: - ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_; - // Map that associates Event Addrs (external ids) with Consumer_Entry *'s - // <internal IDs>. -}; - -class Consumer_Map_Iterator -{ - // = TITLE - // Define an iterator for the Consumer Map. -public: - Consumer_Map_Iterator (Consumer_Map &mm); - int next (Consumer_Entry *&); - int advance (void); - -private: - ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_; - // Map we are iterating over. -}; -#endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h deleted file mode 100644 index a867f1ca5ff..00000000000 --- a/apps/Gateway/Gateway/Dispatch_Set.h +++ /dev/null @@ -1,28 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Dispatch_Set.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_DISPATCH_SET) -#define _DISPATCH_SET - -#include "ace/Set.h" - -// Forward reference. -class Proxy_Handler; - -typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set; -typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator; - -#endif /* _DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/IO_Handler.cpp b/apps/Gateway/Gateway/IO_Handler.cpp deleted file mode 100644 index ba1b355b3ba..00000000000 --- a/apps/Gateway/Gateway/IO_Handler.cpp +++ /dev/null @@ -1,710 +0,0 @@ -// $Id$ - -#include "Consumer_Entry.h" -#include "IO_Handler_Connector.h" - -// Convenient short-hands. -#define CO CONDITION -#define MU MAP_MUTEX - -// The total number of bytes sent/received on this channel. - -size_t -IO_Handler::total_bytes (void) -{ - return this->total_bytes_; -} - -void -IO_Handler::total_bytes (size_t bytes) -{ - this->total_bytes_ += bytes; -} - -IO_Handler::IO_Handler (Consumer_Map *consumer_map, - IO_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), - consumer_map_ (consumer_map), - id_ (-1), - total_bytes_ (0), - state_ (IO_Handler::IDLE), - connector_ (ioc), - timeout_ (1), - max_timeout_ (IO_Handler::MAX_RETRY_TIMEOUT), - socket_queue_size_ (socket_queue_size) -{ -} - -// Set the associated channel. - -void -IO_Handler::active (int a) -{ - this->state (a == 0 ? IO_Handler::IDLE : IO_Handler::ESTABLISHED); -} - -// Get the associated channel. - -int -IO_Handler::active (void) -{ - return this->state () == IO_Handler::ESTABLISHED; -} - -// Set the direction. - -void -IO_Handler::direction (char d) -{ - this->direction_ = d; -} - -// Get the direction. - -char -IO_Handler::direction (void) -{ - return this->direction_; -} - -// Sets the timeout delay. - -void -IO_Handler::timeout (int to) -{ - if (to > this->max_timeout_) - to = this->max_timeout_; - - this->timeout_ = to; -} - -// Recalculate the current retry timeout delay using exponential -// backoff. Returns the original timeout (i.e., before the -// recalculation). - -int -IO_Handler::timeout (void) -{ - int old_timeout = this->timeout_; - this->timeout_ *= 2; - - if (this->timeout_ > this->max_timeout_) - this->timeout_ = this->max_timeout_; - - return old_timeout; -} - -// Sets the max timeout delay. - -void -IO_Handler::max_timeout (int mto) -{ - this->max_timeout_ = mto; -} - -// Gets the max timeout delay. - -int -IO_Handler::max_timeout (void) -{ - return this->max_timeout_; -} - -// Restart connection asynchronously when timeout occurs. - -int -IO_Handler::handle_timeout (const ACE_Time_Value &, const void *) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) attempting to reconnect IO_Handler %d with timeout = %d\n", - this->id (), this->timeout_)); - return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); -} - -// Restart connection (blocking_semantics dicates whether we -// restart synchronously or asynchronously). - -int -IO_Handler::reinitiate_connection (void) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != -1) - { - // Make sure to close down peer to reclaim descriptor. - this->peer ().close (); - -#if 0 -// if (this->state () == FAILED) -// { - // Reinitiate timeout to improve reconnection time. -// this->timeout (1); -#endif - - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of IO_Handler %d\n", - this->id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; -} - -// Handle shutdown of the IO_Handler object. - -int -IO_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down IO_Handler %d on handle %d\n", - this->id (), this->get_handle ())); - - return this->reinitiate_connection (); -} - -// Set the state of the channel. - -void -IO_Handler::state (IO_Handler::State s) -{ - this->state_ = s; -} - -// Perform the first-time initiation of a connection to the peer. - -int -IO_Handler::initialize_connection (void) -{ - this->state_ = IO_Handler::ESTABLISHED; - - // Restart the timeout to 1. - this->timeout (1); - -#if defined (ASSIGN_SUPPLIER_ID) - // Action that sends the route id to the peerd. - - CONN_ID id = htons (this->id ()); - - ssize_t n = this->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "gatewayd has closed down unexpectedly" : "send"), - -1); -#endif /* ASSIGN_SUPPLIER_ID */ - return 0; -} - -// Set the size of the socket queue. - -void -IO_Handler::socket_queue_size (void) -{ - if (this->socket_queue_size_ > 0) - { - int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; - - if (this->peer ().set_option (SOL_SOCKET, option, - &this->socket_queue_size_, sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - } -} - -// Upcall from the ACE_Acceptor::handle_input() that -// delegates control to our application-specific IO_Handler. - -int -IO_Handler::open (void *a) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) IO_Handler's fd = %d\n", - this->peer ().get_handle ())); - - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn on non-blocking I/O. - if (this->peer ().enable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Call down to the base class to activate and register this handler. - if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); - - return this->initialize_connection (); -} - -// Return the current state of the channel. - -IO_Handler::State -IO_Handler::state (void) -{ - return this->state_; -} - -void -IO_Handler::id (CONN_ID id) -{ - this->id_ = id; -} - -CONN_ID -IO_Handler::id (void) -{ - return this->id_; -} - -// Set the peer's address information. -int -IO_Handler::bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - CONN_ID id) -{ - this->remote_addr_ = remote_addr; - this->local_addr_ = local_addr; - this->id_ = id; - return 0; -} - -ACE_INET_Addr & -IO_Handler::remote_addr (void) -{ - return this->remote_addr_; -} - -ACE_INET_Addr & -IO_Handler::local_addr (void) -{ - return this->local_addr_; -} - -// Constructor sets the consumer map pointer. - -Consumer_Handler::Consumer_Handler (Consumer_Map *consumer_map, - IO_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) -{ - this->direction_ = 'C'; - this->msg_queue ()->high_water_mark (Consumer_Handler::QUEUE_SIZE); -} - -// This method should be called only when the peer shuts down -// unexpectedly. This method simply marks the IO_Handler as -// having failed so that handle_close () can reconnect. - -int -Consumer_Handler::handle_input (ACE_HANDLE) -{ - char buf[1]; - - this->state (IO_Handler::FAILED); - - switch (this->peer ().recv (buf, sizeof buf)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has failed unexpectedly for Output IO_Handler %d\n", - this->id ()), -1); - /* NOTREACHED */ - case 0: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has shutdown unexpectedly for Output IO_Handler %d\n", - this->id ()), -1); - /* NOTREACHED */ - default: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer is sending input on Output IO_Handler %d\n", - this->id ()), -1); - /* NOTREACHED */ - } -} - -// Perform a non-blocking put() of event MB. If we are unable to -// send the entire event the remainder is re-queued at the *front* of -// the Event_List. - -int -Consumer_Handler::nonblk_put (ACE_Message_Block *mb) -{ - // Try to send the event. If we don't send it all (e.g., due to - // flow control), then re-queue the remainder at the head of the - // Event_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. - - ssize_t n = this->send (mb); - - if (n == -1) - { - // Things have gone wrong, let's try to close down and set up a new reconnection. - this->state (IO_Handler::FAILED); - this->handle_close (); - return -1; - } - else if (errno == EWOULDBLOCK) // Didn't manage to send everything. - { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", - this->get_handle (), this->id ())); - - // ACE_Queue in *front* of the list to preserve order. - if (this->msg_queue ()->enqueue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); - - // Tell ACE_Reactor to call us back when we can send again. - else if (ACE_Service_Config::reactor ()-> - schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); - return 0; - } - else - return n; -} - -ssize_t -Consumer_Handler::send (ACE_Message_Block *mb) -{ - ssize_t len = mb->length (); - ssize_t n = this->peer ().send (mb->rd_ptr (), len); - - if (n <= 0) - return errno == EWOULDBLOCK ? 0 : n; - else if (n < len) - // Re-adjust pointer to skip over the part we did send. - mb->rd_ptr (n); - else /* if (n == length) */ - { - // The whole event is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - delete mb; - errno = 0; - } - this->total_bytes (n); - return n; -} - -// Finish sending an event when flow control conditions abate. -// This method is automatically called by the ACE_Reactor. - -int -Consumer_Handler::handle_output (ACE_HANDLE) -{ - ACE_Message_Block *mb = 0; - - ACE_DEBUG ((LM_DEBUG, - "(%t) in handle_output on handle %d\n", - this->get_handle ())); - // The list had better not be empty, otherwise there's a bug! - - if (this->msg_queue ()->dequeue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) - { - switch (this->nonblk_put (mb)) - { - case 0: // Partial send. - ACE_ASSERT (errno == EWOULDBLOCK); - // Didn't write everything this time, come back later... - break; - - case -1: - // Caller is responsible for freeing a ACE_Message_Block if failures occur. - delete mb; - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); - - /* FALLTHROUGH */ - default: // Sent the whole thing. - - // If we succeed in writing the entire event (or we did not - // fail due to EWOULDBLOCK) then check if there are more - // events on the Event_List. If there aren't, tell the - // ACE_Reactor not to notify us anymore (at least until - // there are new events queued up). - - if (this->msg_queue ()->is_empty ()) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) queueing deactivated on handle %d to routing id %d\n", - this->get_handle (), this->id ())); - - - if (ACE_Service_Config::reactor ()-> - cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); - } - } - } - else - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); - return 0; -} - -// Send an event to a peer (may queue if necessary). - -int -Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - if (this->msg_queue ()->is_empty ()) - // Try to send the event *without* blocking! - return this->nonblk_put (mb); - else - // If we have queued up events due to flow control then just - // enqueue and return. - return this->msg_queue ()->enqueue_tail - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Constructor sets the consumer map pointer and the connector -// pointer. - -Supplier_Handler::Supplier_Handler (Consumer_Map *consumer_map, - IO_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : msg_frag_ (0), - IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) -{ - this->direction_ = 'S'; - this->msg_queue ()->high_water_mark (0); -} - -// Receive a Peer event from peerd. Handles fragmentation. -// -// The routing event returned from recv consists of two parts: -// 1. The Address part, contains the virtual routing id. -// 2. The Data part, which contains the actual data to be routed. -// -// The reason for having two parts is to shield the higher layers -// of software from knowledge of the event structure. - -int -Supplier_Handler::recv (ACE_Message_Block *&forward_addr) -{ - Event *event; - ssize_t len; - ssize_t n = 0; - size_t offset = 0; - - if (this->msg_frag_ == 0) - // No existing fragment... - ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Event)), - -1); - - event = (Event *) this->msg_frag_->rd_ptr (); - - const ssize_t HEADER_SIZE = sizeof (Event_Header); - ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); - - if (header_bytes_left_to_read > 0) - { - n = this->peer ().recv (this->msg_frag_->wr_ptr (), - header_bytes_left_to_read); - - if (n == -1 /* error */ - || n == 0 /* EOF */) - { - ACE_ERROR ((LM_ERROR, "%p\n", - "Recv error during header read ")); - ACE_DEBUG ((LM_DEBUG, - "attempted to read %d\n", - header_bytes_left_to_read)); - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - } - - // Bump the write pointer by the amount read. - this->msg_frag_->wr_ptr (n); - - // At this point we may or may not have the ENTIRE header. - if (this->msg_frag_->length () < HEADER_SIZE) - { - ACE_DEBUG ((LM_DEBUG, - "Partial header received: only %d bytes\n", - this->msg_frag_->length ())); - // Notify the caller that we didn't get an entire event. - errno = EWOULDBLOCK; - return -1; - } - } - - // At this point there is a complete, valid header in msg_frag_ - len = sizeof event->buf_ + HEADER_SIZE - this->msg_frag_->length (); - - ssize_t m = this->peer ().recv (event->buf_ + offset, len); - - // Try to receive the remainder of the event - - switch (m) - { - case -1: - if (errno == EWOULDBLOCK) - { - // This shouldn't happen since the ACE_Reactor - // just triggered us to handle pending I/O! - ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n")); - errno = EWOULDBLOCK; - return -1; - } - else - /* FALLTHROUGH */; - - case 0: // Premature EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; - return 0; - - default: - if (m != len) - // Re-adjust pointer to skip over the part we've read. - { - this->msg_frag_->wr_ptr (m); - errno = EWOULDBLOCK; - return -1; // Inform caller that we didn't get the whole event. - } - else - { - // Set the write pointer at 1 past the end of the event. - this->msg_frag_->wr_ptr (m); - - // Set the read pointer to the beginning of the event. - this->msg_frag_->rd_ptr (this->msg_frag_->base ()); - - // Allocate an event forwarding header and chain the data - // portion onto its continuation field. - ACE_NEW_RETURN (forward_addr, - ACE_Message_Block (sizeof (Event_Addr), - ACE_Message_Block::MB_PROTO, - this->msg_frag_), - -1); - - Event_Addr event_addr (this->id (), event->header_.routing_id_, 0); - // Copy the forwarding address from the Event_Addr into - // forward_addr. - forward_addr->copy ((char *) &event_addr, sizeof (Event)); - - // Reset the pointer to indicate we've got an entire event. - this->msg_frag_ = 0; - } - this->total_bytes (m + n); -#if defined (VERBOSE) - ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s", - event_addr.conn_id_, event->header_.routing_id_, event->header_.len_, - event->header_.len_, event->buf_)); -#else - ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n", - event->header_.routing_id_, event->header_.len_, this->total_bytes ())); -#endif - return m + n; - } -} - -// Receive various types of input (e.g., Peer event from the -// gatewayd, as well as stdio). - -int -Supplier_Handler::handle_input (ACE_HANDLE) -{ - ACE_Message_Block *forward_addr = 0; - - switch (this->recv (forward_addr)) - { - case 0: - // Note that a peer should never initiate a shutdown. - this->state (IO_Handler::FAILED); - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has closed down unexpectedly for Input IO_Handler %d\n", - this->id ()), -1); - /* NOTREACHED */ - case -1: - if (errno == EWOULDBLOCK) - // A short-read, we'll come back and finish it up later on! - return 0; - else // A weird problem occurred, shut down and start again. - { - this->state (IO_Handler::FAILED); - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input IO_Handler %d\n", - "Peer has failed unexpectedly", - this->id ()), -1); - } - /* NOTREACHED */ - default: - return this->forward (forward_addr); - } -} - -// Route an event to its appropriate destination. - -int -Supplier_Handler::forward (ACE_Message_Block *forward_addr) -{ - // We got a valid event, so determine its virtual routing id, - // which is stored in the first of the two event blocks chained - // together. - - Event_Addr *forwarding_key = (Event_Addr *) forward_addr->rd_ptr (); - - // Skip over the address portion. - const ACE_Message_Block *const data = forward_addr->cont (); - - // RE points to the routing entry located for this routing id. - Consumer_Entry *re = 0; - - if (this->consumer_map_->find (*forwarding_key, re) != -1) - { - // Check to see if there are any destinations. - if (re->destinations ()->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active destinations for this event currently\n")); - - else // There are destinations, so forward the event. - { - Consumer_Entry::ENTRY_SET *esp = re->destinations (); - Consumer_Entry::ENTRY_ITERATOR si (*esp); - - for (IO_Handler **channel = 0; si.next (channel) != 0; si.advance ()) - { - // Only process active channels. - if ((*channel)->active ()) - { - // Clone the event portion (should be doing reference counting here...) - ACE_Message_Block *newmsg = data->clone (); - - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ())); - - if ((*channel)->put (newmsg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n", - "put", (*channel)->id ())); - - // Caller is responsible for freeing a ACE_Message_Block if failures occur. - delete newmsg; - } - } - } - // Will become superfluous once we have reference counting... - delete forward_addr; - return 0; - } - } - delete forward_addr; - // Failure return. - ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", - forwarding_key->conn_id_, forwarding_key->logical_id_, forwarding_key->payload_)); - return 0; -} - -#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX>; -template class ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX>; -template class ACE_Map_Entry<Event_Addr, Consumer_Entry *>; -#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h deleted file mode 100644 index 7bda073f09b..00000000000 --- a/apps/Gateway/Gateway/IO_Handler.h +++ /dev/null @@ -1,224 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// IO_Handler.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_IO_HANDLER) -#define _IO_HANDLER - -#include "ace/Service_Config.h" -#include "ace/SOCK_Connector.h" -#include "ace/Svc_Handler.h" -#include "Consumer_Map.h" -#include "Consumer_Entry.h" -#include "Event.h" - -// Forward declaration. -class IO_Handler_Connector; - -class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> - // = TITLE - // IO_Handler contains info about connection state and addressing. - // - // = DESCRIPTION - // The IO_Handler classes process events sent from the peers to the - // gateway. These classes works as follows: - // - // 1. IO_Handler_Connector creates a number of connections with the set of - // peers specified in a configuration file. - // - // 2. For each peer that connects successfully, IO_Handler_Connector - // creates an IO_Handler object. Each object assigns a unique routing - // id to its associated peer. The Handlers are used by gatewayd - // that to receive, route, and forward events from source peer(s) - // to destination peer(s). -{ -public: - IO_Handler (Consumer_Map *, - IO_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int open (void * = 0); - // Initialize and activate a single-threaded IO_Handler (called by - // ACE_Connector::handle_output()). - - int bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - CONN_ID); - // Set the peer's addressing and routing information. - - ACE_INET_Addr &remote_addr (void); - // Returns the peer's routing address. - - ACE_INET_Addr &local_addr (void); - // Returns our local address. - - // = Set/get routing id. - CONN_ID id (void); - void id (CONN_ID); - - // = Set/get the current state of the IO_Handler. - enum State - { - IDLE = 1, // Prior to initialization. - CONNECTING, // During connection establishment. - ESTABLISHED, // IO_Handler is established and active. - DISCONNECTING, // IO_Handler is in the process of connecting. - FAILED // IO_Handler has failed. - }; - - // = Set/get the current state. - State state (void); - void state (State); - - // = Set/get the current retry timeout delay. - int timeout (void); - void timeout (int); - - // = Set/get the maximum retry timeout delay. - int max_timeout (void); - void max_timeout (int); - - // = Set/get IO_Handler activity status. - int active (void); - void active (int); - - // = Set/get direction (necessary for error checking). - char direction (void); - void direction (char); - - // = The total number of bytes sent/received on this channel. - size_t total_bytes (void); - void total_bytes (size_t bytes); - // Increment count by <bytes>. - - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based IO_Handler reconnection. - -protected: - enum - { - MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. - }; - - int initialize_connection (void); - // Perform the first-time initiation of a connection to the peer. - - int reinitiate_connection (void); - // Reinitiate a connection asynchronously when peers fail. - - void socket_queue_size (void); - // Set the socket queue size. - - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); - // Perform IO_Handler termination. - - Consumer_Map *consumer_map_; - // Pointer to table that maps an event - // to a Set of IO_Handler *'s for output. - - ACE_INET_Addr remote_addr_; - // Address of peer. - - ACE_INET_Addr local_addr_; - // Address of us. - - CONN_ID id_; - // The assigned routing ID of this entry. - - size_t total_bytes_; - // The total number of bytes sent/received on this channel. - - State state_; - // The current state of the channel. - - IO_Handler_Connector *connector_; - // Back pointer to IO_Handler_Connector to reestablish broken - // connections. - - int timeout_; - // Amount of time to wait between reconnection attempts. - - int max_timeout_; - // Maximum amount of time to wait between reconnection attempts. - - char direction_; - // Indicates which direction data flows through the channel ('O' == - // output and 'I' == input). - - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). -}; - -class Supplier_Handler : public IO_Handler - // = TITLE - // Handle reception of Peer events arriving as events. -{ -public: - Supplier_Handler (Consumer_Map *, - IO_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - // Constructor sets the consumer map pointer. - - virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - // Receive and process peer events. - -protected: - virtual int recv (ACE_Message_Block *&); - // Receive an event from a Supplier. - - int forward (ACE_Message_Block *event); - // Forward the Event to a Consumer. - - ACE_Message_Block *msg_frag_; - // Keep track of event fragment to handle non-blocking recv's from - // Suppliers. -}; - -class Consumer_Handler : public IO_Handler - // = TITLE - // Handle transmission of events to other Peers using a - // single-threaded approach. -{ -public: - Consumer_Handler (Consumer_Map *, - IO_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - // Send an event to a Consumer (may be queued if necessary). - -protected: - // = We'll allow up to 16 megabytes to be queued per-output - // channel. - enum {QUEUE_SIZE = 1024 * 1024 * 16}; - - virtual int handle_input (ACE_HANDLE); - // Receive and process shutdowns from a Consumer. - - virtual int handle_output (ACE_HANDLE); - // Finish sending event when flow control conditions abate. - - int nonblk_put (ACE_Message_Block *mb); - // Perform a non-blocking put(). - - virtual ssize_t send (ACE_Message_Block *); - // Send an event to a Consumer. -}; - -#endif /* _IO_HANDLER */ diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp deleted file mode 100644 index 712b348951d..00000000000 --- a/apps/Gateway/Gateway/IO_Handler_Connector.cpp +++ /dev/null @@ -1,92 +0,0 @@ -#include "IO_Handler_Connector.h" -// $Id$ - - -IO_Handler_Connector::IO_Handler_Connector (void) -{ -} - -// Override the connection-failure method to add timer support. -// Note that these timers perform "expoential backoff" to -// avoid rapidly trying to reestablish connections when a link -// goes down. - -int -IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) -{ - ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0; - - // Locate the ACE_Svc_Handler corresponding to the socket descriptor. - if (this->handler_map_.find (sd, stp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", - sd, "find"), -1); - - IO_Handler *channel = stp->svc_handler (); - - // Schedule a reconnection request at some point in the future - // (note that channel uses an exponential backoff scheme). - if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, - channel->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - return 0; -} - -// Initiate (or reinitiate) a connection to the IO_Handler. - -int -IO_Handler_Connector::initiate_connection (IO_Handler *channel, - ACE_Synch_Options &synch_options) -{ - char buf[MAXHOSTNAMELEN]; - - // Mark ourselves as idle so that the various iterators - // will ignore us until we are reconnected. - channel->state (IO_Handler::IDLE); - - if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 - || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "can't obtain peer's address"), -1); - - // Try to connect to the Peer. - - if (this->connect (channel, channel->remote_addr (), - synch_options, channel->local_addr ()) == -1) - { - if (errno != EWOULDBLOCK) - { - channel->state (IO_Handler::FAILED); - ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", - "connect", buf)); - - // Reschedule ourselves to try and connect again. - if (synch_options[ACE_Synch_Options::USE_REACTOR]) - { - if (ACE_Service_Config::reactor ()->schedule_timer - (channel, 0, channel->timeout ()) == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - else - // Failures on synchronous connects are reported as errors - // so that the caller can decide how to proceed. - return -1; - } - else - { - channel->state (IO_Handler::CONNECTING); - ACE_DEBUG ((LM_DEBUG, - "(%t) in the process of connecting %s to %s\n", - synch_options[ACE_Synch_Options::USE_REACTOR] - ? "asynchronously" : "synchronously", buf)); - } - } - else - { - channel->state (IO_Handler::ESTABLISHED); - ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - buf, channel->get_handle ())); - } - return 0; -} diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h deleted file mode 100644 index 585428c88ee..00000000000 --- a/apps/Gateway/Gateway/IO_Handler_Connector.h +++ /dev/null @@ -1,40 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// IO_Handler_Connector.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_IO_HANDLER_CONNECTOR) -#define _IO_HANDLER_CONNECTOR - -#include "ace/Connector.h" -#include "Thr_IO_Handler.h" - -class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR> - // = TITLE - // A concrete factory class that setups connections to peerds - // and produces a new IO_Handler object to do the dirty work... -{ -public: - IO_Handler_Connector (void); - - // Initiate (or reinitiate) a connection on the IO_Handler. - int initiate_connection (IO_Handler *, - ACE_Synch_Options & = ACE_Synch_Options::synch); - -protected: - // Override the connection-failure method to add timer support. - virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); -}; - -#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/apps/Gateway/Gateway/Peer_Message.h b/apps/Gateway/Gateway/Peer_Message.h deleted file mode 100644 index d9e65650095..00000000000 --- a/apps/Gateway/Gateway/Peer_Message.h +++ /dev/null @@ -1,89 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Peer_Message.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (PEER_MESSAGE) -#define PEER_MESSAGE - -// This is the unique connection identifier that denotes a particular -// Channel in the Gateway. -typedef short CONN_ID; - -class Peer_Addr - // = TITLE - // Peer address is used to identify the source/destination of a - // routing message. -{ -public: - Peer_Addr (CONN_ID cid = -1, u_char lid = 0, u_char pay = 0) - : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {} - - int operator== (const Peer_Addr &pa) const - { - return this->conn_id_ == pa.conn_id_ - && this->logical_id_ == pa.logical_id_ - && this->payload_ == pa.payload_; - } - - CONN_ID conn_id_; - // Unique connection identifier that denotes a particular Channel. - - u_char logical_id_; - // Logical ID. - - u_char payload_; - // Payload type. -}; - - -class Peer_Header - // = TITLE - // Fixed sized header. -{ -public: - typedef u_short ROUTING_ID; - // Type used to route messages from gatewayd. - - enum - { - INVALID_ID = -1 // No peer can validly use this number. - }; - - ROUTING_ID routing_id_; - // Source ID. - - size_t len_; - // Length of the message in bytes. -}; - -class Peer_Message - // = TITLE - // Variable-sized message (buf_ may be variable-sized between - // 0 and MAX_PAYLOAD_SIZE). -{ -public: - enum { MAX_PAYLOAD_SIZE = 1024 }; - // The maximum size of an Peer message (see Peer protocol specs for - // exact #). - - Peer_Header header_; - // Message header. - - char buf_[MAX_PAYLOAD_SIZE]; - // Message payload. -}; - -#endif /* PEER_MESSAGE */ diff --git a/apps/Gateway/Gateway/Routing_Entry.cpp b/apps/Gateway/Gateway/Routing_Entry.cpp deleted file mode 100644 index cc270cfac3a..00000000000 --- a/apps/Gateway/Gateway/Routing_Entry.cpp +++ /dev/null @@ -1,47 +0,0 @@ -// Defines an entry in the Routing Table. -// $Id$ - -#include "Routing_Entry.h" - -Routing_Entry::Routing_Entry (int validity_interval) - : validity_interval_ (validity_interval) -{ - ACE_NEW (this->destinations_, Routing_Entry::ENTRY_SET); -} - -Routing_Entry::~Routing_Entry (void) -{ - delete this->destinations_; -} - -// Get the associated set of destinations. - -Routing_Entry::ENTRY_SET * -Routing_Entry::destinations (void) -{ - return this->destinations_; -} - -// Set the associated set of destinations. - -void -Routing_Entry::destinations (Routing_Entry::ENTRY_SET *s) -{ - this->destinations_ = s; -} - -// Get the current validity interval for this route. - -int -Routing_Entry::validity_interval (void) -{ - return this->validity_interval_; -} - -// Set the current validity interval for this route. - -void -Routing_Entry::validity_interval (int vi) -{ - this->validity_interval_ = vi; -} diff --git a/apps/Gateway/Gateway/Routing_Entry.h b/apps/Gateway/Gateway/Routing_Entry.h deleted file mode 100644 index ab8e0eee53d..00000000000 --- a/apps/Gateway/Gateway/Routing_Entry.h +++ /dev/null @@ -1,53 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Routing_Entry.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_ROUTING_ENTRY) -#define _ROUTING_ENTRY - -#include "ace/Set.h" - -// Forward reference. -class Channel; - -class Routing_Entry -{ - // = TITLE - // Defines an entry in the Routing_Table. -public: - Routing_Entry (int validity_interval = 0); - ~Routing_Entry (void); - - typedef ACE_Unbounded_Set<Channel *> ENTRY_SET; - typedef ACE_Unbounded_Set_Iterator<Channel *> ENTRY_ITERATOR; - - // = Set/get the associated set of destinations. - ENTRY_SET *destinations (void); - void destinations (ENTRY_SET *); - - // = Set/get current validity interval for this routing entry. - int validity_interval (void); - void validity_interval (int); - -protected: - ENTRY_SET *destinations_; - // The set of destinations; - - int validity_interval_; - // The current validity interval of this link. -}; - -#endif /* _ROUTING_ENTRY */ diff --git a/apps/Gateway/Gateway/Routing_Table.cpp b/apps/Gateway/Gateway/Routing_Table.cpp deleted file mode 100644 index 3ef2f21bc1f..00000000000 --- a/apps/Gateway/Gateway/Routing_Table.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -#if !defined (_ROUTING_TABLE_C) -#define _ROUTING_TABLE_C - - -#include "Routing_Table.h" - -/* Bind the EXT_ID to the INT_ID. */ - -template <class EXT_ID, class INT_ID, class LOCK> int -Routing_Table<EXT_ID, INT_ID, LOCK>::bind (EXT_ID ext_id, INT_ID *int_id) -{ - return this->map_.bind (ext_id, int_id); -} - -/* Find the INT_ID corresponding to the EXT_ID. */ - -template <class EXT_ID, class INT_ID, class LOCK> int -Routing_Table<EXT_ID, INT_ID, LOCK>::find (EXT_ID ext_id, INT_ID *&int_id) -{ - return this->map_.find (ext_id, int_id); -} - -/* Unbind (remove) the EXT_ID from the map. */ - -template <class EXT_ID, class INT_ID, class LOCK> int -Routing_Table<EXT_ID, INT_ID, LOCK>::unbind (EXT_ID ext_id) -{ - return this->map_.unbind (ext_id); -} - -template <class EXT_ID, class INT_ID, class LOCK> -Routing_Iterator<EXT_ID, INT_ID, LOCK>::Routing_Iterator (Routing_Table<EXT_ID, - INT_ID, LOCK> &rt, - int ignore_inactive) - : map_iter_ (rt.map_), - ignore_inactive_ (ignore_inactive) -{ -} - -template <class EXT_ID, class INT_ID, class LOCK> int -Routing_Iterator<EXT_ID, INT_ID, LOCK>::next (INT_ID *&ss) -{ - // Loop in order to skip over inactive entries if necessary. - - for (ACE_Map_Entry<EXT_ID, INT_ID *> *temp = 0; - this->map_iter_.next (temp) != 0; - this->advance ()) - { - // Skip over inactive entries if necessary. - if (temp->int_id_->active () == 0 && this->ignore_inactive_) - continue; - - // Otherwise, return the next item. - ss = temp->int_id_; - return 1; - } - return 0; -} - -template <class EXT_ID, class INT_ID, class LOCK> int -Routing_Iterator<EXT_ID, INT_ID, LOCK>::advance (void) -{ - return this->map_iter_.advance (); -} -#endif /* _ROUTING_TABLE_C */ diff --git a/apps/Gateway/Gateway/Routing_Table.h b/apps/Gateway/Gateway/Routing_Table.h deleted file mode 100644 index 84194f13e49..00000000000 --- a/apps/Gateway/Gateway/Routing_Table.h +++ /dev/null @@ -1,67 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Routing_Table.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_ROUTING_TABLE_H) -#define _ROUTING_TABLE_H - -#include "ace/Map_Manager.h" - -template <class EXT_ID, class INT_ID, class LOCK> -class Routing_Table -{ - // = TITLE - // Define a generic routing table based on the ACE Map_Manager. - // - // = DESCRIPTION - // We need to have this table, rather than just using the Map_Manager - // directly in order to ignore "inactive" routing entries... -public: - int bind (EXT_ID ext_id, INT_ID *int_id); - // Associate EXT_ID with the INT_ID. - - int find (EXT_ID ext_id, INT_ID *&int_id); - // Break any association of EXID. - - int unbind (EXT_ID ext_id); - // Locate EXID and pass out parameter via INID. If found, - // return 0, else -1. - -public: - ACE_Map_Manager<EXT_ID, INT_ID *, LOCK> map_; - // Map external IDs to internal IDs. -}; - -template <class EXT_ID, class INT_ID, class LOCK> -class Routing_Iterator -{ - // = TITLE - // Define an iterator for the Routing Table. -public: - Routing_Iterator (Routing_Table<EXT_ID, INT_ID, LOCK> &mm, - int ignore_inactive = 1); - int next (INT_ID *&); - int advance (void); - -private: - ACE_Map_Iterator<EXT_ID, INT_ID *, LOCK> map_iter_; - int ignore_inactive_; -}; - -#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) -#include "Routing_Table.cpp" -#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ -#endif /* _ROUTING_TABLE_H */ diff --git a/apps/Gateway/Gateway/Thr_Channel.cpp b/apps/Gateway/Gateway/Thr_Channel.cpp deleted file mode 100644 index 26e385e2727..00000000000 --- a/apps/Gateway/Gateway/Thr_Channel.cpp +++ /dev/null @@ -1,204 +0,0 @@ -#include "Thr_Channel.h" -// $Id$ - -#include "Channel_Connector.h" - -#if defined (ACE_HAS_THREADS) -Thr_Output_Channel::Thr_Output_Channel (ROUTING_TABLE *rt, - Channel_Connector *cc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Output_Channel (rt, cc, thr_mgr, socket_queue_size) -{ -} - -// This method should be called only when the peer shuts down -// unexpectedly. This method marks the Channel as having failed and -// deactivates the ACE_Message_Queue (to wake up the thread blocked on -// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will -// eventually try to reconnect... - -int -Thr_Output_Channel::handle_input (ACE_HANDLE h) -{ - this->Output_Channel::handle_input (h); - ACE_Service_Config::reactor ()->remove_handler (h, - ACE_Event_Handler::RWE_MASK - | ACE_Event_Handler::DONT_CALL); - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - return 0; -} - -// Initialize the threaded Output_Channel object and spawn a new -// thread. - -int -Thr_Output_Channel::open (void *) -{ - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Register ourselves to receive input events (which indicate that - // the Peer has shut down unexpectedly). - if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // messages to peers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// ACE_Queue up a message for transmission (must not block since all -// Input_Channels are single-threaded). - -int -Thr_Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - // Perform non-blocking enqueue. - return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Transmit messages to the peer (note simplification resulting from -// threads...) - -int -Thr_Output_Channel::svc (void) -{ - for (;;) - { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Output_Channel's fd = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread it is OK to block on - // output. - - for (ACE_Message_Block *mb = 0; - this->msg_queue ()->dequeue_head (mb) != -1; ) - if (this->send_peer (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); - - ACE_ASSERT (errno == ESHUTDOWN); - - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Output_Channel %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - ACE_OS::sleep (tv); - } - } - - return 0; -} - -Thr_Input_Channel::Thr_Input_Channel (ROUTING_TABLE *rt, - Channel_Connector *cc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Input_Channel (rt, cc, thr_mgr, socket_queue_size) -{ -} - -int -Thr_Input_Channel::open (void *) -{ - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // messages to peers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// Receive messages from a Peer in a separate thread (note reuse of -// existing code!). - -int -Thr_Input_Channel::svc (void) -{ - for (;;) - { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Input_Channel's fd = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread and processes - // messages for one connection it is OK to block on input and - // output. - - while (this->handle_input () != -1) - continue; - - ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Input_Channel %d on handle %d\n", - this->id (), - this->get_handle ())); - - this->peer ().close (); - - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", tv.sec ())); - ACE_OS::sleep (tv); - } - } - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_Channel.h b/apps/Gateway/Gateway/Thr_Channel.h deleted file mode 100644 index a1dc91b1619..00000000000 --- a/apps/Gateway/Gateway/Thr_Channel.h +++ /dev/null @@ -1,65 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Thr_Channel.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_THR_CHANNEL) -#define _THR_CHANNEL - -#include "Channel.h" - -#if defined (ACE_HAS_THREADS) -class Thr_Output_Channel : public Output_Channel - // = TITLE - // Runs each Output Channel in a separate thread. -{ -public: - Thr_Output_Channel (ROUTING_TABLE *, - Channel_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); - - virtual int open (void *); - // Initialize the threaded Output_Channel object and spawn a new - // thread. - - virtual int handle_input (ACE_HANDLE); - // Called when Peer shutdown unexpectedly. - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - // Send a message to a peer. - - virtual int svc (void); - // Transmit peer messages. -}; - -class Thr_Input_Channel : public Input_Channel - // = TITLE - // Runs each Input Channel in a separate thread. -{ -public: - Thr_Input_Channel (ROUTING_TABLE *, - Channel_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); - - virtual int open (void *); - // Initialize the object and spawn a new thread. - - virtual int svc (void); - // Transmit peer messages. -}; -#endif /* ACE_HAS_THREADS */ -#endif /* _THR_CHANNEL */ diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp deleted file mode 100644 index 109cfad9c3f..00000000000 --- a/apps/Gateway/Gateway/Thr_IO_Handler.cpp +++ /dev/null @@ -1,204 +0,0 @@ -#include "Thr_IO_Handler.h" -// $Id$ - -#include "IO_Handler_Connector.h" - -#if defined (ACE_HAS_THREADS) -Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map, - IO_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) -{ -} - -// This method should be called only when the peer shuts down -// unexpectedly. This method marks the IO_Handler as having failed and -// deactivates the ACE_Message_Queue (to wake up the thread blocked on -// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will -// eventually try to reconnect... - -int -Thr_Consumer_Handler::handle_input (ACE_HANDLE h) -{ - this->Consumer_Handler::handle_input (h); - ACE_Service_Config::reactor ()->remove_handler (h, - ACE_Event_Handler::RWE_MASK - | ACE_Event_Handler::DONT_CALL); - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - return 0; -} - -// Initialize the threaded Consumer_Handler object and spawn a new -// thread. - -int -Thr_Consumer_Handler::open (void *) -{ - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Register ourselves to receive input events (which indicate that - // the Peer has shut down unexpectedly). - if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // messages to peers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// ACE_Queue up a message for transmission (must not block since all -// Supplier_Handlers are single-threaded). - -int -Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - // Perform non-blocking enqueue. - return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Transmit messages to the peer (note simplification resulting from -// threads...) - -int -Thr_Consumer_Handler::svc (void) -{ - for (;;) - { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread it is OK to block on - // output. - - for (ACE_Message_Block *mb = 0; - this->msg_queue ()->dequeue_head (mb) != -1; ) - if (this->send (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); - - ACE_ASSERT (errno == ESHUTDOWN); - - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - ACE_OS::sleep (tv); - } - } - - return 0; -} - -Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map, - IO_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) -{ -} - -int -Thr_Supplier_Handler::open (void *) -{ - // Set the size of the socket queue. - this->socket_queue_size (); - - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // messages to peers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// Receive messages from a Peer in a separate thread (note reuse of -// existing code!). - -int -Thr_Supplier_Handler::svc (void) -{ - for (;;) - { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread and processes - // messages for one connection it is OK to block on input and - // output. - - while (this->handle_input () != -1) - continue; - - ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", - this->id (), - this->get_handle ())); - - this->peer ().close (); - - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", tv.sec ())); - ACE_OS::sleep (tv); - } - } - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h deleted file mode 100644 index ee056b35361..00000000000 --- a/apps/Gateway/Gateway/Thr_IO_Handler.h +++ /dev/null @@ -1,64 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Thr_IO_Handler.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_THR_IO_HANDLER) -#define _THR_IO_HANDLER - -#include "IO_Handler.h" - -#if defined (ACE_HAS_THREADS) -class Thr_Consumer_Handler : public Consumer_Handler - // = TITLE - // Runs each Output IO_Handler in a separate thread. -{ -public: - Thr_Consumer_Handler (Consumer_Map *, - IO_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); - - virtual int open (void *); - // Initialize the threaded Consumer_Handler object and spawn a new - // thread. - - virtual int handle_input (ACE_HANDLE); - // Called when Peer shutdown unexpectedly. - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - // Send a message to a peer. - - virtual int svc (void); - // Transmit peer messages. -}; - -class Thr_Supplier_Handler : public Supplier_Handler - // = TITLE - // Runs each Input IO_Handler in a separate thread. -{ -public: - Thr_Supplier_Handler (Consumer_Map *, - IO_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); - - virtual int open (void *); - // Initialize the object and spawn a new thread. - - virtual int svc (void); - // Transmit peer messages. -}; -#endif /* ACE_HAS_THREADS */ -#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp deleted file mode 100644 index f316e4e82bf..00000000000 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp +++ /dev/null @@ -1,211 +0,0 @@ -// $Id$ - -#include "Event_Channel.h" -#include "Thr_Proxy_Handler.h" - -#if defined (ACE_HAS_THREADS) -Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec, - const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 conn_id) - : Consumer_Proxy (ec, remote_addr, local_addr, conn_id) -{ -} - -// This method should be called only when the Consumer shuts down -// unexpectedly. This method marks the Proxy_Handler as having failed -// and deactivates the ACE_Message_Queue (to wake up the thread -// blocked on <dequeue_head> in svc()). -// Thr_Output_Handler::handle_close () will eventually try to -// reconnect... - -int -Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) -{ - // Call down to the <Consumer_Proxy> to handle this first. - this->Consumer_Proxy::handle_input (h); - - ACE_Service_Config::reactor ()->remove_handler - (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); - - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - return 0; -} - -// Initialize the threaded Consumer_Proxy object and spawn a new -// thread. - -int -Thr_Consumer_Proxy::open (void *) -{ - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Call back to the <Event_Channel> to complete our initialization. - else if (this->event_channel_.complete_proxy_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); - - // Register ourselves to receive input events (which indicate that - // the Consumer has shut down unexpectedly). - else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // events to Consumers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// Queue up an event for transmission (must not block since -// Supplier_Proxys may be single-threaded). - -int -Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - // Perform non-blocking enqueue. - return this->msg_queue ()->enqueue_tail - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Transmit events to the peer (note simplification resulting from -// threads...) - -int -Thr_Consumer_Proxy::svc (void) -{ - - for (;;) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) connected! Thr_Consumer_Proxy's handle = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread it is OK to block on - // output. - - for (ACE_Message_Block *mb = 0; - this->msg_queue ()->dequeue_head (mb) != -1; - ) - { - if (this->send (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); - } - - ACE_ASSERT (errno == ESHUTDOWN); - - ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->event_channel_.initiate_proxy_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - - ACE_OS::sleep (tv); - } - } - - return 0; -} - -Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec, - const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 conn_id) - : Supplier_Proxy (ec, remote_addr, local_addr, conn_id) -{ -} - -int -Thr_Supplier_Proxy::open (void *) -{ - // Turn off non-blocking I/O. - if (this->peer ().disable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - - // Call back to the <Event_Channel> to complete our initialization. - else if (this->event_channel_.complete_proxy_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); - - // Reactivate message queue. If it was active then this is the - // first time in and we need to spawn a thread, otherwise the queue - // was inactive due to some problem and we've already got a thread. - else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) - { - ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); - // Become an active object by spawning a new thread to transmit - // events to peers. - return this->activate (THR_NEW_LWP | THR_DETACHED); - } - else - { - ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); - return 0; - } -} - -// Receive events from a Peer in a separate thread (note reuse of -// existing code!). - -int -Thr_Supplier_Proxy::svc (void) -{ - for (;;) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) connected! Thr_Supplier_Proxy's handle = %d\n", - this->peer ().get_handle ())); - - // Since this method runs in its own thread and processes events - // for one connection it is OK to call down to the - // <Supplier_Proxy::handle_input> method, which blocks on input. - - while (this->handle_input () != -1) - continue; - - ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - // Deactivate the queue while we try to get reconnected. - this->msg_queue ()->deactivate (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->event_channel_.initiate_proxy_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - ACE_OS::sleep (tv); - } - } - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h deleted file mode 100644 index 275bc87b320..00000000000 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.h +++ /dev/null @@ -1,66 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Thr_Proxy_Handler.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_THR_IO_HANDLER) -#define _THR_IO_HANDLER - -#include "Proxy_Handler.h" - -#if defined (ACE_HAS_THREADS) -class Thr_Consumer_Proxy : public Consumer_Proxy - // = TITLE - // Runs each Output Proxy_Handler in a separate thread. -{ -public: - Thr_Consumer_Proxy (ACE_Event_Channel &, - const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 conn_id); - - virtual int open (void *); - // Initialize the threaded Consumer_Proxy object and spawn a new - // thread. - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - // Send a message to a peer. - -protected: - virtual int handle_input (ACE_HANDLE); - // Called when Peer shutdown unexpectedly. - - virtual int svc (void); - // Transmit peer messages. -}; - -class Thr_Supplier_Proxy : public Supplier_Proxy - // = TITLE - // Runs each Input Proxy_Handler in a separate thread. -{ -public: - Thr_Supplier_Proxy (ACE_Event_Channel &, - const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 conn_id); - - virtual int open (void *); - // Initialize the object and spawn a new thread. - -protected: - virtual int svc (void); - // Transmit peer messages. -}; -#endif /* ACE_HAS_THREADS */ -#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/cc_config b/apps/Gateway/Gateway/cc_config deleted file mode 100644 index 96f9ebdedd7..00000000000 --- a/apps/Gateway/Gateway/cc_config +++ /dev/null @@ -1,10 +0,0 @@ -# Conn ID Hostname Remote Port Direction Max Retry Delay Local Port -# ------- -------- ---- --------- --------------- ---------- - 1 tango.cs 10004 I 32 20000 -# 2 tango.cs 10004 O 32 - 3 merengue.cs 10004 O 32 20001 -# 4 mambo.cs 10004 O 32 20000 -# 5 lambada.cs 10004 O 32 20000 -# 6 tango.cs 10004 O 32 20000 -# 7 tango.cs 5001 I 32 -# 8 tango.cs 5002 O 32 diff --git a/apps/Gateway/Gateway/rt_config b/apps/Gateway/Gateway/rt_config deleted file mode 100644 index e951a0f09be..00000000000 --- a/apps/Gateway/Gateway/rt_config +++ /dev/null @@ -1,7 +0,0 @@ -# Conn ID Logical ID Payload Destinations -# ------- ---------- ------- ------------ -# 1 1 0 3,4,5 - 1 1 0 3 - 3 1 0 3 -# 4 1 0 4 -# 5 1 0 5 diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h deleted file mode 100644 index 5e288edf910..00000000000 --- a/apps/Gateway/Peer/Event.h +++ /dev/null @@ -1,125 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Event.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (EVENT) -#define EVENT - -#include "ace/OS.h" - -// This is the unique connection identifier that denotes a particular -// Proxy_Handler in the Gateway. -typedef ACE_INT32 ACE_INT32; - -class Event_Key - // = TITLE - // Address used to identify the source/destination of an event. - // - // = DESCRIPTION - // This is really a "virtual forwarding address" thatis used to - // decouple the filtering and forwarding logic of the Event - // Channel from the format of the data. -{ -public: - Event_Key (ACE_INT32 cid = -1, - u_char sid = 0, - u_char type = 0) - : conn_id_ (cid), - supplier_id_ (sid), - type_ (type) {} - - int operator== (const Event_Key &event_addr) const - { - return this->conn_id_ == event_addr.conn_id_ - && this->supplier_id_ == event_addr.supplier_id_ - && this->type_ == event_addr.type_; - } - - ACE_INT32 conn_id_; - // Unique connection identifier that denotes a particular - // Proxy_Handler. - - ACE_INT32 supplier_id_; - // Logical ID. - - ACE_INT32 type_; - // Event type. -}; - -class Event_Header - // = TITLE - // Fixed sized header. - // - // = DESCRIPTION - // This is designed to have a sizeof (16) to avoid alignment - // problems on most platforms. -{ -public: - typedef ACE_INT32 SUPPLIER_ID; - // Type used to forward events from gatewayd. - - enum - { - INVALID_ID = -1 // No peer can validly use this number. - }; - - void decode (void) - { - this->len_ = ntohl (this->len_); - this->supplier_id_ = ntohl (this->supplier_id_); - this->type_ = ntohl (this->type_); - this->priority_ = ntohl (this->priority_); - } - // Decode from network byte order to host byte order. - - void encode (void) - { - this->len_ = htonl (this->len_); - this->supplier_id_ = htonl (this->supplier_id_); - this->type_ = htonl (this->type_); - this->priority_ = htonl (this->priority_); - } - // Encode from host byte order to network byte order. - - size_t len_; - // Length of the data_ payload, in bytes. - - SUPPLIER_ID supplier_id_; - // Source ID. - - ACE_INT32 type_; - // Event type. - - ACE_INT32 priority_; - // Event priority. -}; - -class Event - // = TITLE - // Variable-sized event (data_ may be variable-sized between - // 0 and MAX_PAYLOAD_SIZE). -{ -public: - enum { MAX_PAYLOAD_SIZE = 1024 }; - // The maximum size of an Event. - - Event_Header header_; - // Event header. - - char data_[MAX_PAYLOAD_SIZE]; - // Event data. -}; - -#endif /* EVENT */ diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp deleted file mode 100644 index cfc9a7dad6f..00000000000 --- a/apps/Gateway/Peer/Gateway_Handler.cpp +++ /dev/null @@ -1,652 +0,0 @@ -#include "ace/Get_Opt.h" -// $Id$ - - -#include "Gateway_Handler.h" - -Gateway_Handler::Gateway_Handler (ACE_Thread_Manager *) - : routing_id_ (0), - msg_frag_ (0), - total_bytes_ (0) -{ - this->msg_queue ()->high_water_mark (Gateway_Handler::QUEUE_SIZE); -} - -int -Gateway_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - - // Shut down the main event loop. - ACE_Service_Config::end_reactor_event_loop (); - return 0; -} - -// Cache a binding to the HANDLER_MAP. - -void -Gateway_Handler::map (HANDLER_MAP *m) -{ - this->map_ = m; -} - -// Upcall from the ACE_Acceptor::handle_input() that turns control -// over to our application-specific Gateway handler. - -int -Gateway_Handler::open (void *a) -{ - ACE_DEBUG ((LM_DEBUG, "Gateway handler's fd = %d\n", - this->peer ().get_handle ())); - - // Call down to the base class to activate and register this - // handler. - if (this->inherited::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - - if (this->peer ().enable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), -1); - - Gateway_Handler *this_ = this; - - // Add ourselves to the map so we can be removed later on. - if (this->map_->bind (this->get_handle (), this_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind"), -1); - - char *to = ACE_OS::getenv ("TIMEOUT"); - int timeout = to == 0 ? 100000 : ACE_OS::atoi (to); - - // Schedule the time between disconnects. This should really be a - // "tunable" parameter. - if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, timeout) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "schedule_timer")); - - // If there are messages left in the queue, make sure we - // enable the ACE_Reactor appropriately to get them sent out. - if (this->msg_queue ()->is_empty () == 0 - && ACE_Service_Config::reactor ()->schedule_wakeup (this, - ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1); - - // First action is to wait to be notified of our routing id. - this->do_action_ = &Gateway_Handler::await_route_id; - return 0; -} - -// Read messages from stdin and send them to the gatewayd. - -int -Gateway_Handler::xmit_stdin (void) -{ - if (this->routing_id_ != -1) - { - ssize_t n; - ACE_Message_Block *mb; - - ACE_NEW_RETURN (mb, - ACE_Message_Block (sizeof (Event)), - -1); - - Event *peer_msg = (Event *) mb->rd_ptr (); - peer_msg->header_.routing_id_ = this->routing_id_; - - n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_); - - switch (n) - { - case 0: - ACE_DEBUG ((LM_DEBUG, "stdin closing down\n")); - - // Take stdin out of the ACE_Reactor so we stop trying to - // send messages. - if (ACE_Service_Config::reactor ()->remove_handler - (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "remove_handler")); - delete mb; - break; - case -1: - delete mb; - ACE_ERROR ((LM_ERROR, "%p\n", "read")); - break; - default: - peer_msg->header_.len_ = htonl (n); - mb->wr_ptr (sizeof (Peer_Header) + n); - - if (this->put (mb) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "%p\n", - "gateway is flow controlled, so we're dropping messages")); - else - ACE_ERROR ((LM_ERROR, "%p\n", "transmission failure in xmit_stdin")); - - // Caller is responsible for freeing a ACE_Message_Block - // if failures occur. - delete mb; - } - } - } - return 0; -} - -// Perform a non-blocking put() of message MB. If we are unable to -// send the entire message the remainder is re-Taskd at the *front* of -// the Message_List. - -int -Gateway_Handler::nonblk_put (ACE_Message_Block *mb) -{ - // Try to send the message. If we don't send it all (e.g., due to - // flow control), then re-ACE_Task the remainder at the head of the - // Message_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. - - ssize_t n; - - if ((n = this->send_peer (mb)) == -1) - return -1; - else if (errno == EWOULDBLOCK) // Didn't manage to send everything. - { - ACE_DEBUG ((LM_DEBUG, - "queueing activated on handle %d to routing id %d\n", - this->get_handle (), this->routing_id_)); - - // ACE_Queue in *front* of the list to preserve order. - if (this->msg_queue ()->enqueue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enqueue_head"), -1); - - // Tell ACE_Reactor to call us back when we can send again. - if (ACE_Service_Config::reactor ()->schedule_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1); - return 0; - } - else - return n; -} - -// Finish sending a message when flow control conditions abate. This -// method is automatically called by the ACE_Reactor. - -int -Gateway_Handler::handle_output (ACE_HANDLE) -{ - ACE_Message_Block *mb = 0; - - ACE_DEBUG ((LM_DEBUG, "in handle_output\n")); - // The list had better not be empty, otherwise there's a bug! - - if (this->msg_queue ()->dequeue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) - { - switch (this->nonblk_put (mb)) - { - case 0: // Partial send. - ACE_ASSERT (errno == EWOULDBLOCK); - // Didn't write everything this time, come back later... - break; - - case -1: - // Caller is responsible for freeing a ACE_Message_Block if - // failures occur. - delete mb; - ACE_ERROR ((LM_ERROR, "%p\n", - "transmission failure in handle_output")); - - /* FALLTHROUGH */ - default: // Sent the whole thing. - - // If we succeed in writing the entire message (or we did - // not fail due to EWOULDBLOCK) then check if there are more - // messages on the Message_List. If there aren't, tell the - // ACE_Reactor not to notify us anymore (at least until - // there are new messages queued up). - - if (this->msg_queue ()->is_empty ()) - { - ACE_DEBUG ((LM_DEBUG, - "queue now empty on handle %d to routing id %d\n", - this->get_handle (), - this->routing_id_)); - - if (ACE_Service_Config::reactor ()->cancel_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "cancel_wakeup")); - } - } - } - else - ACE_ERROR ((LM_ERROR, "%p\n", "dequeue_head")); - return 0; -} - -// Send a message to a peer (may ACE_Task if necessary). - -int -Gateway_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - if (this->msg_queue ()->is_empty ()) - // Try to send the message *without* blocking! - return this->nonblk_put (mb); - else - // If we have queued up messages due to flow control then just - // enqueue and return. - return this->msg_queue ()->enqueue_tail - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Send an Peer message to gatewayd. - -int -Gateway_Handler::send_peer (ACE_Message_Block *mb) -{ - ssize_t n; - size_t len = mb->length (); - - if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0) - return errno == EWOULDBLOCK ? 0 : n; - else if (n < (ssize_t) len) - { - // Re-adjust pointer to skip over the part we did send. - mb->rd_ptr (n); - this->total_bytes_ += n; - } - else /* if (n == length) */ - { - // The whole message is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - this->total_bytes_ += n; - delete mb; - errno = 0; - } - ACE_DEBUG ((LM_DEBUG, "sent %d bytes, total bytes sent = %d\n", - n, this->total_bytes_)); - return n; -} - -// Receive an Peer message from gatewayd. Handles fragmentation. - -int -Gateway_Handler::recv_peer (ACE_Message_Block *&mb) -{ - Event *peer_msg; - size_t len; - ssize_t n; - size_t offset = 0; - - if (this->msg_frag_ == 0) - { - ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Event)), - -1); - - // No existing fragment... - if (this->msg_frag_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1); - - peer_msg = (Event *) this->msg_frag_->rd_ptr (); - - switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header))) - { - case sizeof (Peer_Header): - len = ntohl (peer_msg->header_.len_); - if (len <= sizeof peer_msg->buf_) - { - this->msg_frag_->wr_ptr (sizeof (Peer_Header)); - break; // The message is within the maximum size range. - } - else - ACE_ERROR ((LM_ERROR, "message too long = %d\n", len)); - /* FALLTHROUGH */ - default: - ACE_ERROR ((LM_ERROR, "invalid length = %d\n", n)); - n = -1; - /* FALLTHROUGH */ - case -1: - /* FALLTHROUGH */ - case 0: - // Make sure to free up memory on error returns. - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - } - } - else - { - offset = this->msg_frag_->length () - sizeof (Peer_Header); - len = peer_msg->header_.len_ - offset; - } - - switch (n = this->peer ().recv (peer_msg->buf_ + offset, len)) - { - case -1: - if (errno == EWOULDBLOCK) - { - // This shouldn't happen since the ACE_Reactor - // just triggered us to handle pending I/O! - ACE_DEBUG ((LM_DEBUG, "unexpected recv failure\n")); - // Since ACE_DEBUG might change errno, we need to reset it - // here. - errno = EWOULDBLOCK; - return -1; - } - else - /* FALLTHROUGH */; - - case 0: // EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - - default: - if (n != (ssize_t) len) - // Re-adjust pointer to skip over the part we've read. - { - this->msg_frag_->wr_ptr (n); - errno = EWOULDBLOCK; - // Inform caller that we didn't get the whole message. - return -1; - } - else - { - // Set the write pointer at 1 past the end of the message. - this->msg_frag_->wr_ptr (n); - - // Set the read pointer to the beginning of the message. - this->msg_frag_->rd_ptr (this->msg_frag_->base ()); - - mb = this->msg_frag_; - - // Reset the pointer to indicate we've got an entire - // message. - this->msg_frag_ = 0; - } - return n; - } -} - -// Receive various types of input (e.g., Peer message from the -// gatewayd, as well as stdio). - -int -Gateway_Handler::handle_input (ACE_HANDLE sd) -{ - ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd)); - if (sd == ACE_STDIN) // Handle message from stdin. - return this->xmit_stdin (); - else - // Perform the appropriate action depending on the state we are - // in. - return (this->*do_action_) (); -} - -// Action that receives the route id. - -int -Gateway_Handler::await_route_id (void) -{ - ssize_t n = this->peer ().recv (&this->routing_id_, - sizeof this->routing_id_); - - if (n != sizeof this->routing_id_) - { - if (n == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "gatewayd has closed down unexpectedly\n"), -1); - else - ACE_ERROR_RETURN ((LM_ERROR, - "%p, bytes received on handle %d = %d\n", - "recv", this->get_handle (), n), -1); - } - else - ACE_DEBUG ((LM_DEBUG, "assigned routing id %d\n", - this->routing_id_)); - - // Transition to the action that waits for Peer messages. - this->do_action_ = &Gateway_Handler::await_messages; - - // Reset standard input. - ACE_OS::rewind (stdin); - - // Register this handler to receive test messages on stdin. - if (ACE_Service_Config::reactor ()->register_handler - (ACE_STDIN, this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); - return 0; -} - -// Action that receives messages. - -int -Gateway_Handler::await_messages (void) -{ - ACE_Message_Block *mb = 0; - ssize_t n = this->recv_peer (mb); - - switch (n) - { - case 0: - ACE_ERROR_RETURN ((LM_ERROR, "gatewayd has closed down\n"), -1); - /* NOTREACHED */ - case -1: - if (errno == EWOULDBLOCK) - // A short-read, we'll come back and finish it up later on! - return 0; - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv_peer"), -1); - /* NOTREACHED */ - default: - { - // We got a valid message, so let's process it now! At the - // moment, we just print out the message contents... - - Event *peer_msg = (Event *) mb->rd_ptr (); - this->total_bytes_ += mb->length (); - -#if defined (VERBOSE) - ACE_DEBUG ((LM_DEBUG, - "route id = %d, len = %d, payload = %*s", - peer_msg->header_.routing_id_, peer_msg->header_.len_, - peer_msg->header_.len_, peer_msg->buf_)); -#else - ACE_DEBUG ((LM_DEBUG, - "route id = %d, cur len = %d, total len = %d\n", - peer_msg->header_.routing_id_, - peer_msg->header_.len_, - this->total_bytes_)); -#endif - delete mb; - return 0; - } - } -} - -// Periodically send messages via ACE_Reactor timer mechanism. - -int -Gateway_Handler::handle_timeout (const ACE_Time_Value &, const void *) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != -1) - { - // Unbind ourselves from the map. - if (this->map_->unbind (this->get_handle ()) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "unbind")); - - // Shut down the handler. - this->handle_close (); - } - return 0; -} - -// Handle shutdown of the Gateway_Handler object. - -int -Gateway_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - if (this->get_handle () != ACE_INVALID_HANDLE) - { - ACE_DEBUG ((LM_DEBUG, "shutting down Gateway_Handler on handle %d\n", - this->get_handle ())); - - // Explicitly remove ourselves for handle 0 (the ACE_Reactor - // removes this->handle (), note that - // ACE_Event_Handler::DONT_CALL instructs the ACE_Reactor *not* - // to call this->handle_close(), which would otherwise lead to - // recursion!). - if (ACE_Service_Config::reactor ()->remove_handler - (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, "handle = %d: %p\n", - 0, "remove_handler")); - - // Deregister this handler with the ACE_Reactor. - if (ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::RWE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "handle = %d: %p\n", - this->get_handle (), "remove_handler"), -1); - - // Close down the peer. - this->peer ().close (); - } - return 0; -} - -Gateway_Acceptor::Gateway_Acceptor (Gateway_Handler *handler) - : gateway_handler_ (handler) -{ - this->gateway_handler_->map (&this->map_); -} - -// Note how this method just passes back the pre-allocated -// Gateway_Handler instead of having the ACE_Acceptor allocate a new -// one each time! - -Gateway_Handler * -Gateway_Acceptor::make_svc_handler (void) -{ - return this->gateway_handler_; -} - -int -Gateway_Acceptor::handle_signal (int signum, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "signal %S occurred\n", signum)); - return 0; -} - -/* Returns information on the currently active service. */ - -int -Gateway_Acceptor::info (char **strp, size_t length) const -{ - char buf[BUFSIZ]; - char addr_str[BUFSIZ]; - - ACE_INET_Addr addr; - - if (this->acceptor ().get_local_addr (addr) == -1) - return -1; - else if (addr.addr_to_string (addr_str, sizeof addr) == -1) - return -1; - - ACE_OS::sprintf (buf, "%s\t %s/%s %s", - "Gateway peer daemon", addr_str, "tcp", - "# IRIDIUM SRP traffic generator and data sink\n"); - - if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) - return -1; - else - ACE_OS::strncpy (*strp, buf, length); - return ACE_OS::strlen (buf); -} - -// Hook called by the explicit dynamic linking facility to terminate -// the peer. - -int -Gateway_Acceptor::fini (void) -{ - HANDLER_ITERATOR mi (this->map_); - - for (MAP_ENTRY *me = 0; - mi.next (me) != 0; - mi.advance ()) - { - if (me->int_id_->get_handle () != -1) - { - ACE_DEBUG ((LM_DEBUG, "closing down handle %d\n", - me->int_id_->get_handle ())); - me->int_id_->handle_close (); - } - else - ACE_DEBUG ((LM_DEBUG, "already closed %d\n")); - me->int_id_->destroy (); // Will trigger a delete. - } - - this->gateway_handler_->destroy (); // Will trigger a delete. - return inherited::fini (); -} - -// Hook called by the explicit dynamic linking facility to initialize -// the peer. - -int -Gateway_Acceptor::init (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "dp:", 0); - ACE_INET_Addr addr; - - for (int c; (c = get_opt ()) != -1; ) - { - switch (c) - { - case 'p': - addr.set (ACE_OS::atoi (get_opt.optarg)); - break; - case 'd': - break; - default: - break; - } - } - - if (ACE_Service_Config::reactor ()->register_handler (SIGPIPE, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); - - if (this->open (addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "registering service with ACE_Reactor\n"), -1); - - ACE_Sig_Set sig_set; - sig_set.sig_add (SIGINT); - sig_set.sig_add (SIGQUIT); - - // Register ourselves to receive SIGINT and SIGQUIT so we can shut - // down gracefully via signals. - - if (ACE_Service_Config::reactor ()->register_handler (sig_set, - this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - return 0; -} - -// Dynamically linked factory function that dynamically allocates a -// new Gateway_Acceptor object. - -ACE_Service_Object * -_alloc_peerd (void) -{ - // This function illustrates how we can associate a ACE_Svc_Handler - // with the ACE_Acceptor at initialization time. - Gateway_Handler *handler; - - ACE_NEW_RETURN (handler, Gateway_Handler, 0); - ACE_Service_Object *temp; - - ACE_NEW_RETURN (temp, Gateway_Acceptor (handler), 0); - return temp; -} diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h deleted file mode 100644 index 6dc4539e6b7..00000000000 --- a/apps/Gateway/Peer/Gateway_Handler.h +++ /dev/null @@ -1,154 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -/* These Gateway handler classes process Peer messages sent from the - communication gateway daemon (gatewayd) to its various peers, e.g., - CF and ETS, (represented collectively in this prototype as peerd). - . These classes works as follows: - - 1. Gateway_Acceptor creates a listener endpoint and waits passively - for gatewayd to connect with it. - - 2. When gatewayd connects, Gateway_Acceptor creates an - Gateway_Handler object that sends/receives messages from - gatewayd. - - 3. Gateway_Handler waits for gatewayd to inform it of its routing - ID, which is prepended to all outgoing messages send from peerd. - - 4. Once the routing ID is set, peerd periodically sends messages to - gatewayd. Peerd also receives and "processes" messages - forwarded to it from gatewayd. In this program, peerd - "processes" messages by writing them to stdout. */ - -#if !defined (GATEWAY_HANDLER) -#define GATEWAY_HANDLER - -#include "ace/Service_Config.h" -#include "ace/Svc_Handler.h" -#include "ace/Acceptor.h" -#include "ace/SOCK_Stream.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/INET_Addr.h" -#include "ace/Map_Manager.h" -#include "Peer_Message.h" - -// Forward declaration. -class Gateway_Handler; - -// Maps a ACE_HANDLE onto a Gateway_Handler *. -typedef ACE_Map_Manager <ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_MAP; -typedef ACE_Map_Iterator<ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_ITERATOR; -typedef ACE_Map_Entry <ACE_HANDLE, Gateway_Handler *> MAP_ENTRY; - -// Handle Peer messages arriving as events. - -class Gateway_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> -{ -public: - Gateway_Handler (ACE_Thread_Manager * = 0); - - virtual int open (void * = 0); - // Initialize the handler (called by ACE_Acceptor::handle_input()) - - virtual int handle_input (ACE_HANDLE); - // Receive and process peer messages. - - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Send a message to a gateway (may be queued if necessary). - - virtual int handle_output (ACE_HANDLE); - // Finish sending a message when flow control conditions abate. - - virtual int handle_timeout (const ACE_Time_Value &, - const void *arg); - // Periodically send messages via ACE_Reactor timer mechanism. - - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); - // Perform object termination. - - void map (HANDLER_MAP *); - // Cache a binding to the HANDLER_MAP. - -protected: - typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited; - - // We'll allow up to 16 megabytes to be queued per-output - // channel!!!! This is clearly a policy in search of refinement... - enum { QUEUE_SIZE = 1024 * 1024 * 16 }; - - int handle_signal (int signum, siginfo_t *, ucontext_t *); - - Peer_Header::ROUTING_ID routing_id_; - // Routing ID of the peer (obtained from gatewayd). - - virtual int nonblk_put (ACE_Message_Block *mb); - // Perform a non-blocking put(). - - virtual int recv_peer (ACE_Message_Block *&); - // Receive an Peer message from gatewayd. - - virtual int send_peer (ACE_Message_Block *); - // Send an Peer message to gatewayd. - - int xmit_stdin (void); - // Receive a message from stdin and send it to the gateway. - - int (Gateway_Handler::*do_action_) (void); - // Pointer-to-member-function for the current action to run in this state. - - int await_route_id (void); - // Action that receives the route id. - - int await_messages (void); - // Action that receives messages. - - ACE_Message_Block *msg_frag_; - // Keep track of message fragment to handle non-blocking recv's from gateway. - - size_t total_bytes_; - // The total number of bytes sent/received to the gateway. - - HANDLER_MAP *map_; - // Maps the ACE_HANDLE onto the Gateway_Handler *. -}; - -// A factory class that accept connections from gatewayd and -// dynamically creates a new Gateway_Handler object to do the dirty work. - -class Gateway_Acceptor : public ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> -{ -public: - // = Initialization methods, called when dynamically linked. - Gateway_Acceptor (Gateway_Handler *handler); - virtual int init (int argc, char *argv[]); - // Initialize the acceptor. - - virtual int info (char **, size_t) const; - // Return info about this service. - - virtual int fini (void); - // Perform termination. - - virtual Gateway_Handler *make_svc_handler (void); - // Factory method that creates the Gateway_Handler once. - - virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle various signals (e.g., SIGPIPE) - - HANDLER_MAP map_; - // Maps the ACE_HANDLE onto the Gateway_Handler *. - - Gateway_Handler *gateway_handler_; - // Pointer to memory allocated exactly once. - - typedef ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> inherited; -}; - -// Factory function that allocates a new Peer daemon. -extern "C" ACE_Service_Object *_alloc_peerd (void); - -#endif /* GATEWAY_HANDLER */ - diff --git a/apps/Gateway/Peer/Peer_Message.h b/apps/Gateway/Peer/Peer_Message.h deleted file mode 100644 index 67f57f148cb..00000000000 --- a/apps/Gateway/Peer/Peer_Message.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// Define the Peer message schema (this may change). - -#if !defined (PEER_MESSAGE) -#define PEER_MESSAGE - -// Fixed sized header. - -class Peer_Header -{ -public: -// Type used to route messages from gatewayd. - typedef short ROUTING_ID; - - enum - { - INVALID_ID = -1 // No peer may use this number. - }; - - // Source ID. - ROUTING_ID routing_id_; - - // Length of the message in bytes. - size_t len_; -}; - -// Variable-sized message (buf_ may be variable-sized between -// 0 and MAX_PAYLOAD_SIZE). - -class Peer_Message -{ -public: - // The maximum size of an Peer message (see Peer protocol specs for exact #). - enum { MAX_PAYLOAD_SIZE = 1024 }; - - Peer_Header header_; - - // Message payload - char buf_[MAX_PAYLOAD_SIZE]; -}; - -#endif /* PEER_MESSAGE */ |