summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway')
-rw-r--r--apps/Gateway/Gateway/Channel.cpp710
-rw-r--r--apps/Gateway/Gateway/Channel.h280
-rw-r--r--apps/Gateway/Gateway/Channel_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/Channel_Connector.h41
-rw-r--r--apps/Gateway/Gateway/Concurrency_Strategies.h74
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.cpp31
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.h45
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.cpp61
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.h62
-rw-r--r--apps/Gateway/Gateway/Dispatch_Set.h28
-rw-r--r--apps/Gateway/Gateway/IO_Handler.cpp710
-rw-r--r--apps/Gateway/Gateway/IO_Handler.h224
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.h40
-rw-r--r--apps/Gateway/Gateway/Peer_Message.h89
-rw-r--r--apps/Gateway/Gateway/Routing_Entry.cpp47
-rw-r--r--apps/Gateway/Gateway/Routing_Entry.h53
-rw-r--r--apps/Gateway/Gateway/Routing_Table.cpp69
-rw-r--r--apps/Gateway/Gateway/Routing_Table.h67
-rw-r--r--apps/Gateway/Gateway/Thr_Channel.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_Channel.h65
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.h64
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.cpp211
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.h66
-rw-r--r--apps/Gateway/Gateway/cc_config10
-rw-r--r--apps/Gateway/Gateway/rt_config7
27 files changed, 0 insertions, 3646 deletions
diff --git a/apps/Gateway/Gateway/Channel.cpp b/apps/Gateway/Gateway/Channel.cpp
deleted file mode 100644
index 99699a6ee87..00000000000
--- a/apps/Gateway/Gateway/Channel.cpp
+++ /dev/null
@@ -1,710 +0,0 @@
-
-// $Id$
-
-#include "Routing_Entry.h"
-#include "Channel_Connector.h"
-
-// Convenient short-hands.
-#define CO CONDITION
-#define MU MUTEX
-
-// = The total number of bytes sent/received on this channel.
-size_t
-Channel::total_bytes (void)
-{
- return this->total_bytes_;
-}
-
-void
-Channel::total_bytes (size_t bytes)
-{
- this->total_bytes_ += bytes;
-}
-
-Channel::Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH> (thr_mgr),
- routing_table_ (rt),
- id_ (-1),
- total_bytes_ (0),
- state_ (Channel::IDLE),
- connector_ (cc),
- timeout_ (1),
- max_timeout_ (Channel::MAX_RETRY_TIMEOUT),
- socket_queue_size_ (socket_queue_size)
-{
-}
-
-// Set the associated channel.
-
-void
-Channel::active (int a)
-{
- this->state (a == 0 ? Channel::IDLE : Channel::ESTABLISHED);
-}
-
-// Get the associated channel.
-
-int
-Channel::active (void)
-{
- return this->state () == Channel::ESTABLISHED;
-}
-
-// Set the direction.
-
-void
-Channel::direction (char d)
-{
- this->direction_ = d;
-}
-
-// Get the direction.
-
-char
-Channel::direction (void)
-{
- return this->direction_;
-}
-
-// Sets the timeout delay.
-
-void
-Channel::timeout (int to)
-{
- if (to > this->max_timeout_)
- to = this->max_timeout_;
-
- this->timeout_ = to;
-}
-
-// Recalculate the current retry timeout delay using exponential
-// backoff. Returns the original timeout (i.e., before the
-// recalculation).
-
-int
-Channel::timeout (void)
-{
- int old_timeout = this->timeout_;
- this->timeout_ *= 2;
-
- if (this->timeout_ > this->max_timeout_)
- this->timeout_ = this->max_timeout_;
-
- return old_timeout;
-}
-
-// Sets the max timeout delay.
-
-void
-Channel::max_timeout (int mto)
-{
- this->max_timeout_ = mto;
-}
-
-// Gets the max timeout delay.
-
-int
-Channel::max_timeout (void)
-{
- return this->max_timeout_;
-}
-
-// Restart connection asynchronously when timeout occurs.
-
-int
-Channel::handle_timeout (const ACE_Time_Value &, const void *)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) attempting to reconnect Channel %d with timeout = %d\n",
- this->id (), this->timeout_));
- return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
-}
-
-// Restart connection (blocking_semantics dicates whether we
-// restart synchronously or asynchronously).
-
-int
-Channel::reinitiate_connection (void)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != -1)
- {
- // Make sure to close down peer to reclaim descriptor.
- this->peer ().close ();
-
-#if 0
-// if (this->state () == FAILED)
-// {
- // Reinitiate timeout to improve reconnection time.
-// this->timeout (1);
-#endif
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Channel %d\n",
- this->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer (this, 0,
- this->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
-}
-
-// Handle shutdown of the Channel object.
-
-int
-Channel::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down Channel %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- return this->reinitiate_connection ();
-}
-
-// Set the state of the channel.
-
-void
-Channel::state (Channel::State s)
-{
- this->state_ = s;
-}
-
-// Perform the first-time initiation of a connection to the peer.
-
-int
-Channel::initialize_connection (void)
-{
- this->state_ = Channel::ESTABLISHED;
-
- // Restart the timeout to 1.
- this->timeout (1);
-
-#if defined (ASSIGN_ROUTING_ID)
- // Action that sends the route id to the peerd.
-
- CONN_ID id = htons (this->id ());
-
- ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "gatewayd has closed down unexpectedly" : "send"), -1);
-#endif /* ASSIGN_ROUTING_ID */
- return 0;
-}
-
-// Set the size of the socket queue.
-
-void
-Channel::socket_queue_size (void)
-{
- if (this->socket_queue_size_ > 0)
- {
- int option = this->direction_ == 'I' ? SO_RCVBUF : SO_SNDBUF;
-
- if (this->peer ().set_option (SOL_SOCKET, option,
- &this->socket_queue_size_, sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- }
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that
-// delegates control to our application-specific Channel.
-
-int
-Channel::open (void *a)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) Channel's fd = %d\n", this->peer ().get_handle ()));
-
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn on non-blocking I/O.
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call down to the base class to activate and register this handler.
- if (this->ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH>::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
-
- return this->initialize_connection ();
-}
-
-// Return the current state of the channel.
-
-Channel::State
-Channel::state (void)
-{
- return this->state_;
-}
-
-void
-Channel::id (CONN_ID id)
-{
- this->id_ = id;
-}
-
-CONN_ID
-Channel::id (void)
-{
- return this->id_;
-}
-
-// Set the peer's address information.
-int
-Channel::bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- CONN_ID id)
-{
- this->remote_addr_ = remote_addr;
- this->local_addr_ = local_addr;
- this->id_ = id;
- return 0;
-}
-
-ACE_INET_Addr &
-Channel::remote_addr (void)
-{
- return this->remote_addr_;
-}
-
-ACE_INET_Addr &
-Channel::local_addr (void)
-{
- return this->local_addr_;
-}
-
-// Constructor sets the routing table pointer.
-
-Output_Channel::Output_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Channel (rt, cc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'O';
- this->msg_queue ()->high_water_mark (Output_Channel::QUEUE_SIZE);
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method simply marks the Channel as
-// having failed so that handle_close () can reconnect.
-
-int
-Output_Channel::handle_input (ACE_HANDLE)
-{
- char buf[1];
-
- this->state (Channel::FAILED);
-
- switch (this->peer ().recv (buf, sizeof buf))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer is sending input on Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- }
-}
-
-int
-Output_Channel::svc (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Output_Channel!\n"), -1);
-}
-
-// Perform a non-blocking put() of message MB. If we are unable to
-// send the entire message the remainder is re-queued at the *front* of
-// the Message_List.
-
-int
-Output_Channel::nonblk_put (ACE_Message_Block *mb)
-{
- // Try to send the message. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // Message_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
-
- ssize_t n;
-
- if ((n = this->send_peer (mb)) == -1)
- {
- // Things have gone wrong, let's try to close down and set up a new reconnection.
- this->state (Channel::FAILED);
- this->handle_close ();
- return -1;
- }
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- else if (ACE_Service_Config::reactor ()->
- schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-int
-Output_Channel::send_peer (ACE_Message_Block *mb)
-{
- ssize_t n;
- size_t len = mb->length ();
-
- if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < len)
- // Re-adjust pointer to skip over the part we did send.
- mb->rd_ptr (n);
- else /* if (n == length) */
- {
- // The whole message is sent, we can now safely deallocate the buffer.
- // Note that this should decrement a reference count...
- delete mb;
- errno = 0;
- }
- this->total_bytes (n);
- return n;
-}
-
-// Finish sending a message when flow control conditions abate.
-// This method is automatically called by the ACE_Reactor.
-
-int
-Output_Channel::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *mb = 0;
- int status = 0;
-
- ACE_DEBUG ((LM_DEBUG, "(%t) in handle_output on handle %d\n", this->get_handle ()));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (mb))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete mb;
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire message (or we did not fail
- // due to EWOULDBLOCK) then check if there are more messages on the Message_List.
- // If there aren't, tell the ACE_Reactor not to notify us anymore (at least
- // until there are new messages queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing deactivated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
-
- if (ACE_Service_Config::reactor ()->
- cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
- return 0;
-}
-
-// Send a message to a peer (may queue if necessary).
-
-int
-Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the message *without* blocking!
- return this->nonblk_put (mb);
- else
- // If we have queued up messages due to flow control
- // then just enqueue and return.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Constructor sets the routing table pointer and the connector pointer.
-
-Input_Channel::Input_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : msg_frag_ (0),
- Channel (rt, cc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'I';
- this->msg_queue ()->high_water_mark (0);
-}
-
-int
-Input_Channel::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) put should not be called on Input_Channel!\n"), -1);
-}
-
-int
-Input_Channel::svc (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Input_Channel!\n"), -1);
-}
-
-// Receive a Peer message from peerd. Handles fragmentation.
-//
-// The routing message returned from recv_peer consists of two parts:
-// 1. The Address part, contains the virtual routing id.
-// 2. The Data part, which contains the actual data to be routed.
-//
-// The reason for having two parts is to shield the higher layers
-// of software from knowledge of the message structure.
-
-int
-Input_Channel::recv_peer (ACE_Message_Block *&route_addr)
-{
- Peer_Message *peer_msg;
- size_t len;
- ssize_t n = 0;
- ssize_t m = 0;
- size_t offset = 0;
-
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Peer_Message)),
- -1);
-
- peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr ();
-
- const ssize_t HEADER_SIZE = sizeof (Peer_Header);
- ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- n = this->peer ().recv (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
-
- if (n == -1 /* error */
- || n == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR, "%p\n", "Recv error during header read "));
- ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read));
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return n;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (n);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG, "Partial header received: only %d bytes\n",
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire message.
- errno = EWOULDBLOCK;
- return -1;
- }
- }
-
- // At this point there is a complete, valid header in msg_frag_
- len = sizeof peer_msg->buf_ + HEADER_SIZE - this->msg_frag_->length ();
-
- // Try to receive the remainder of the message
-
- switch (m = this->peer ().recv (peer_msg->buf_ + offset, len))
- {
- case -1:
- if (errno == EWOULDBLOCK)
- {
- // This shouldn't happen since the ACE_Reactor
- // just triggered us to handle pending I/O!
- ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n"));
- errno = EWOULDBLOCK;
- return -1;
- }
- else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return 0;
-
- default:
- if (m != len)
- // Re-adjust pointer to skip over the part we've read.
- {
- this->msg_frag_->wr_ptr (m);
- errno = EWOULDBLOCK;
- return -1; // Inform caller that we didn't get the whole message.
- }
- else
- {
- // Set the write pointer at 1 past the end of the message.
- this->msg_frag_->wr_ptr (m);
-
- // Set the read pointer to the beginning of the message.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- // Allocate a routing message header and chain the data portion
- // onto its continuation field.
- ACE_NEW_RETURN (route_addr,
- ACE_Message_Block (sizeof (Peer_Addr),
- ACE_Message_Block::MB_PROTO,
- this->msg_frag_),
- -1);
-
- Peer_Addr peer_addr (this->id (), peer_msg->header_.routing_id_, 0);
- // Copy the routing address from the Peer_Message into routing_addr.
- route_addr->copy ((char *) &peer_addr, sizeof (Peer_Addr));
-
- // Reset the pointer to indicate we've got an entire message.
- this->msg_frag_ = 0;
- }
- this->total_bytes (m + n);
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s",
- peer_addr.conn_id_, peer_msg->header_.routing_id_, peer_msg->header_.len_,
- peer_msg->header_.len_, peer_msg->buf_));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n",
- peer_msg->header_.routing_id_, peer_msg->header_.len_, this->total_bytes ()));
-#endif
- return m + n;
- }
-}
-
-// Receive various types of input (e.g., Peer message from the
-// gatewayd, as well as stdio).
-
-int
-Input_Channel::handle_input (ACE_HANDLE)
-{
- ACE_Message_Block *route_addr = 0;
-
- switch (this->recv_peer (route_addr))
- {
- case 0:
- // Note that a peer should never initiate a shutdown.
- this->state (Channel::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else // A weird problem occurred, shut down and start again.
- {
- this->state (Channel::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Channel %d\n",
- "Peer has failed unexpectedly",
- this->id ()), -1);
- }
- /* NOTREACHED */
- default:
- return this->route_message (route_addr);
- }
-}
-
-// Route a message to its appropriate destination.
-
-int
-Input_Channel::route_message (ACE_Message_Block *route_addr)
-{
- // We got a valid message, so determine its virtual routing id,
- // which is stored in the first of the two message blocks chained together.
-
- Peer_Addr *routing_key = (Peer_Addr *) route_addr->rd_ptr ();
-
- // Skip over the address portion.
- const ACE_Message_Block *const data = route_addr->cont ();
-
- // RE points to the routing entry located for this routing id.
- Routing_Entry *re = 0;
-
- if (this->routing_table_->find (*routing_key, re) != -1)
- {
- // Check to see if there are any destinations.
- if (re->destinations ()->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active destinations for this message currently\n"));
-
- else // There are destinations, so forward the message.
- {
- Routing_Entry::ENTRY_SET *esp = re->destinations ();
- Routing_Entry::ENTRY_ITERATOR si (*esp);
-
- for (Channel **channel = 0; si.next (channel) != 0; si.advance ())
- {
- // Only process active channels.
- if ((*channel)->active ())
- {
- // Clone the message portion (should be doing reference counting here...)
- ACE_Message_Block *newmsg = data->clone ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ()));
-
- if ((*channel)->put (newmsg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping messages"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n",
- "put", (*channel)->id ()));
-
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete newmsg;
- }
- }
- }
- // Will become superfluous once we have reference counting...
- delete route_addr;
- return 0;
- }
- }
- delete route_addr;
- // Failure return.
- ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
- routing_key->conn_id_, routing_key->logical_id_, routing_key->payload_));
- return 0;
-}
-
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Peer_Addr, Routing_Entry *, MUTEX>;
-template class ACE_Map_Iterator<Peer_Addr, Routing_Entry *, MUTEX>;
-template class ACE_Map_Entry<Peer_Addr, Routing_Entry *>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Channel.h b/apps/Gateway/Gateway/Channel.h
deleted file mode 100644
index 339716bc55a..00000000000
--- a/apps/Gateway/Gateway/Channel.h
+++ /dev/null
@@ -1,280 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Channel.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CHANNEL)
-#define _CHANNEL
-
-#include "ace/Service_Config.h"
-#include "ace/INET_Addr.h"
-#include "ace/SOCK_Connector.h"
-#include "ace/Svc_Handler.h"
-#include "Routing_Table.h"
-#include "Routing_Entry.h"
-#include "Peer_Message.h"
-
-// The following typedefs are used in order to parameterize the
-// synchronization policies without changing the source code!
-
-// If we don't have threads then use the single-threaded synchronization.
-#if !defined (ACE_HAS_THREADS)
-#define SYNCH ACE_NULL_SYNCH
-typedef ACE_Null_Mutex MUTEX;
-#define CHANNEL_PEER_STREAM ACE_SOCK_STREAM
-#define CHANNEL_PEER_CONNECTOR ACE_SOCK_CONNECTOR
-#else /* ACE_HAS_THREADS */
-
-// Select communication mechanisms.
-#if 0 // defined (ACE_HAS_TLI)
-// Note that due to inconsistencies between the semantics of sockets
-// and TLI with respect to establishing non-blocking connections it's
-// not a good idea to use TLI...
-#include "ace/TLI_Connector.h"
-#define CHANNEL_PEER_STREAM ACE_TLI_STREAM
-#define CHANNEL_PEER_CONNECTOR ACE_TLI_CONNECTOR
-#else
-#define CHANNEL_PEER_STREAM ACE_SOCK_STREAM
-#define CHANNEL_PEER_CONNECTOR ACE_SOCK_CONNECTOR
-#endif /* 0 */
-
-// Note that we only need to make the ACE_Task thread-safe if we
-// are using the multi-threaded Thr_Output_Channel...
-#if defined (USE_OUTPUT_MT)
-#define SYNCH ACE_MT_SYNCH
-#else
-#define SYNCH ACE_NULL_SYNCH
-#endif /* USE_OUTPUT_MT || USE_INPUT_MT */
-
-// Note that we only need to make the ACE_Map_Manager thread-safe if
-// we are using the multi-threaded Thr_Input_Channel...
-#if defined (USE_INPUT_MT)
-typedef ACE_RW_Mutex MUTEX;
-#else
-typedef ACE_Null_Mutex MUTEX;
-#endif /* USE_INPUT_MT */
-#endif /* ACE_HAS_THREADS */
-
-// Typedef for the routing table.
-typedef Routing_Table<Peer_Addr, Routing_Entry, MUTEX>
- ROUTING_TABLE;
-
-// Forward declaration.
-class Channel_Connector;
-
-class Channel : public ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH>
- // = TITLE
- // Channel contains info about connection state and addressing.
- //
- // = DESCRIPTION
- // The Channel classes process messages sent from the peers to the
- // gateway. These classes works as follows:
- //
- // 1. Channel_Connector creates a number of connections with the set of
- // peers specified in a configuration file.
- //
- // 2. For each peer that connects successfully, Channel_Connector
- // creates an Channel object. Each object assigns a unique routing
- // id to its associated peer. The Channels are used by gatewayd
- // that to receive, route, and forward messages from source peer(s)
- // to destination peer(s).
-{
-public:
- Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
-
- virtual int open (void * = 0);
- // Initialize and activate a single-threaded Channel (called by
- // ACE_Connector::handle_output()).
-
- int bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- CONN_ID);
- // Set the peer's addressing and routing information.
-
- ACE_INET_Addr &remote_addr (void);
- // Returns the peer's routing address.
-
- ACE_INET_Addr &local_addr (void);
- // Returns our local address.
-
- // = Set/get routing id.
- CONN_ID id (void);
- void id (CONN_ID);
-
- // = Set/get the current state of the Channel.
- enum State
- {
- IDLE = 1, // Prior to initialization.
- CONNECTING, // During connection establishment.
- ESTABLISHED, // Channel is established and active.
- DISCONNECTING, // Channel is in the process of connecting.
- FAILED // Channel has failed.
- };
-
- // = Set/get the current state.
- State state (void);
- void state (State);
-
- // = Set/get the current retry timeout delay.
- int timeout (void);
- void timeout (int);
-
- // = Set/get the maximum retry timeout delay.
- int max_timeout (void);
- void max_timeout (int);
-
- // = Set/get Channel activity status.
- int active (void);
- void active (int);
-
- // = Set/get direction (necessary for error checking).
- char direction (void);
- void direction (char);
-
- // = The total number of bytes sent/received on this channel.
- size_t total_bytes (void);
- void total_bytes (size_t bytes);
- // Increment count by <bytes>.
-
- virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based Channel reconnection.
-
-protected:
- enum
- {
- MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
- };
-
- int initialize_connection (void);
- // Perform the first-time initiation of a connection to the peer.
-
- int reinitiate_connection (void);
- // Reinitiate a connection asynchronously when peers fail.
-
- void socket_queue_size (void);
- // Set the socket queue size.
-
- virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
- // Perform Channel termination.
-
- ROUTING_TABLE *routing_table_;
- // Pointer to table that maps a Peer_Addr
- // to a Set of Channel *'s for output.
-
- ACE_INET_Addr remote_addr_;
- // Address of peer.
-
- ACE_INET_Addr local_addr_;
- // Address of us.
-
- CONN_ID id_;
- // The assigned routing ID of this entry.
-
- size_t total_bytes_;
- // The total number of bytes sent/received on this channel.
-
- State state_;
- // The current state of the channel.
-
- Channel_Connector *connector_;
- // Back pointer to Channel_Connector to reestablish broken
- // connections.
-
- int timeout_;
- // Amount of time to wait between reconnection attempts.
-
- int max_timeout_;
- // Maximum amount of time to wait between reconnection attempts.
-
- char direction_;
- // Indicates which direction data flows through the channel ('O' ==
- // output and 'I' == input).
-
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
-};
-
-class Input_Channel : public Channel
- // = TITLE
- // Handle reception of Peer messages arriving as events.
-{
-public:
- Input_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
- // Constructor sets the routing table pointer.
-
- virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- // Receive and process peer messages.
-
-protected:
- virtual int recv_peer (ACE_Message_Block *&);
- // Receive a message from a peer.
-
- int route_message (ACE_Message_Block *);
- // Action that receives messages from peerd.
-
- ACE_Message_Block *msg_frag_;
- // Keep track of message fragment to handle non-blocking recv's from
- // peers.
-
- virtual int svc (void);
- // This method is not used since we are single-threaded.
-
-private:
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // This methods should not be called to handle input.
-};
-
-class Output_Channel : public Channel
- // = TITLE
- // Handle transmission of messages to other Peers using a
- // single-threaded approach.
-{
-public:
- Output_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a gateway (may be queued if necessary).
-
-protected:
- // = We'll allow up to 16 megabytes to be queued per-output
- // channel.
- enum {QUEUE_SIZE = 1024 * 1024 * 16};
-
- virtual int handle_input (ACE_HANDLE);
- // Receive and process shutdowns from peer.
-
- virtual int handle_output (ACE_HANDLE);
- // Finish sending a message when flow control conditions abate.
-
- int nonblk_put (ACE_Message_Block *mb);
- // Perform a non-blocking put().
-
- virtual int send_peer (ACE_Message_Block *);
- // Send a message to a peer.
-
- virtual int svc (void);
- // This method is not used since we are single-threaded.
-};
-
-#endif /* _CHANNEL */
diff --git a/apps/Gateway/Gateway/Channel_Connector.cpp b/apps/Gateway/Gateway/Channel_Connector.cpp
deleted file mode 100644
index a5394e8b013..00000000000
--- a/apps/Gateway/Gateway/Channel_Connector.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-#include "Channel_Connector.h"
-// $Id$
-
-
-Channel_Connector::Channel_Connector (void)
-{
-}
-
-// Override the connection-failure method to add timer support.
-// Note that these timers perform "expoential backoff" to
-// avoid rapidly trying to reestablish connections when a link
-// goes down.
-
-int
-Channel_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
-{
- ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR>::AST *stp = 0;
-
- // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
- if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
- sd, "find"), -1);
-
- Channel *channel = stp->svc_handler ();
-
- // Schedule a reconnection request at some point in the future
- // (note that channel uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
- channel->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- return 0;
-}
-
-// Initiate (or reinitiate) a connection to the Channel.
-
-int
-Channel_Connector::initiate_connection (Channel *channel,
- ACE_Synch_Options &synch_options)
-{
- char buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- channel->state (Channel::IDLE);
-
- if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
- || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (channel, channel->remote_addr (),
- synch_options, channel->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- channel->state (Channel::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", buf));
-
- // Reschedule ourselves to try and connect again.
- if (synch_options[ACE_Synch_Options::USE_REACTOR])
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (channel, 0, channel->timeout ()) == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- else
- // Failures on synchronous connects are reported as errors
- // so that the caller can decide how to proceed.
- return -1;
- }
- else
- {
- channel->state (Channel::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting %s to %s\n",
- synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", buf));
- }
- }
- else
- {
- channel->state (Channel::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- buf, channel->get_handle ()));
- }
- return 0;
-}
diff --git a/apps/Gateway/Gateway/Channel_Connector.h b/apps/Gateway/Gateway/Channel_Connector.h
deleted file mode 100644
index 3e27f37355a..00000000000
--- a/apps/Gateway/Gateway/Channel_Connector.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Channel_Connector.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CHANNEL_CONNECTOR)
-#define _CHANNEL_CONNECTOR
-
-#include "ace/Connector.h"
-#include "Thr_Channel.h"
-
-class Channel_Connector : public ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new Channel object to do the dirty work...
-{
-public:
- Channel_Connector (void);
-
- // Initiate (or reinitiate) a connection on the Channel.
- int initiate_connection (Channel *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
-
-protected:
- // Override the connection-failure method to add timer support.
- virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
-};
-
-#endif /* _CHANNEL_CONNECTOR */
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h
deleted file mode 100644
index 28e59a4b2e6..00000000000
--- a/apps/Gateway/Gateway/Concurrency_Strategies.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Concurrency_strategies.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONCURRENCY_STRATEGIES)
-#define _CONCURRENCY_STRATEGIES
-
-#include "ace/Synch.h"
-
-// The following typedefs are used in order to parameterize the
-// synchronization policies without changing the source code!
-
-// If we don't have threads then use the single-threaded synchronization.
-#if !defined (ACE_HAS_THREADS)
-#define SYNCH_STRATEGY ACE_NULL_SYNCH
-typedef ACE_Null_Mutex MAP_MUTEX;
-#else /* ACE_HAS_THREADS */
-
-// Note that we only need to make the ACE_Task thread-safe if we are
-// using the multi-threaded Thr_Consumer_Proxy...
-#if defined (USE_OUTPUT_MT)
-#define SYNCH_STRATEGY ACE_MT_SYNCH
-#else
-#define SYNCH_STRATEGY ACE_NULL_SYNCH
-#endif /* USE_OUTPUT_MT || USE_INPUT_MT */
-
-// Note that we only need to make the ACE_Map_Manager thread-safe if
-// we are using the multi-threaded Thr_Supplier_Proxy. In this
-// case, we use an RW_Mutex since we'll lookup Consumers far more
-// often than we'll update them.
-#if defined (USE_INPUT_MT)
-typedef ACE_RW_Mutex MAP_MUTEX;
-#else
-typedef ACE_Null_Mutex MAP_MUTEX;
-#endif /* USE_INPUT_MT */
-#endif /* ACE_HAS_THREADS */
-
-// = Forward decls
-class Thr_Consumer_Proxy;
-class Thr_Supplier_Proxy;
-class Consumer_Proxy;
-class Supplier_Proxy;
-
-#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
-#if defined (USE_OUTPUT_MT)
-typedef Thr_Consumer_Proxy CONSUMER_PROXY;
-#else
-typedef Consumer_Proxy CONSUMER_PROXY;
-#endif /* USE_OUTPUT_MT */
-
-#if defined (USE_INPUT_MT)
-typedef Thr_Supplier_Proxy SUPPLIER_PROXY;
-#else
-typedef Supplier_Proxy SUPPLIER_PROXY;
-#endif /* USE_INPUT_MT */
-#else
-// Instantiate a non-multi-threaded Gateway.
-typedef Supplier_Proxy SUPPLIER_PROXY;
-typedef Consumer_Proxy CONSUMER_PROXY;
-#endif /* ACE_HAS_THREADS */
-
-#endif /* _CONCURRENCY_STRATEGIES */
diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp
deleted file mode 100644
index c3dcd96ebbf..00000000000
--- a/apps/Gateway/Gateway/Consumer_Entry.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-// Defines an entry in the Consumer Map.
-// $Id$
-
-#include "Consumer_Entry.h"
-
-Consumer_Entry::Consumer_Entry (void)
-{
- ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET);
-}
-
-Consumer_Entry::~Consumer_Entry (void)
-{
- delete this->destinations_;
-}
-
-// Get the associated set of destinations.
-
-Consumer_Entry::ENTRY_SET *
-Consumer_Entry::destinations (void)
-{
- return this->destinations_;
-}
-
-// Set the associated set of destinations.
-
-void
-Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s)
-{
- this->destinations_ = s;
-}
-
diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h
deleted file mode 100644
index fe502991514..00000000000
--- a/apps/Gateway/Gateway/Consumer_Entry.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Consumer_Entry.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_ENTRY)
-#define _ROUTING_ENTRY
-
-#include "ace/Set.h"
-
-// Forward reference.
-class IO_Handler;
-
-class Consumer_Entry
-{
- // = TITLE
- // Defines an entry in the Consumer_Map.
-public:
- Consumer_Entry (void);
- ~Consumer_Entry (void);
-
- typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET;
- typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR;
-
- // = Set/get the associated set of destinations.
- ENTRY_SET *destinations (void);
- void destinations (ENTRY_SET *);
-
-protected:
- ENTRY_SET *destinations_;
- // The set of destinations;
-};
-
-#endif /* _ROUTING_ENTRY */
diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp
deleted file mode 100644
index 6d16601f949..00000000000
--- a/apps/Gateway/Gateway/Consumer_Map.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#if !defined (_CONSUMER_MAP_C)
-#define _CONSUMER_MAP_C
-
-#include "Consumer_Map.h"
-
-// Bind the Event_Addr to the INT_ID.
-
-int
-Consumer_Map::bind (Event_Addr event_addr,
- Consumer_Entry *Consumer_Entry)
-{
- return this->map_.bind (event_addr, Consumer_Entry);
-}
-
-// Find the Consumer_Entry corresponding to the Event_Addr.
-
-int
-Consumer_Map::find (Event_Addr event_addr,
- Consumer_Entry *&Consumer_Entry)
-{
- return this->map_.find (event_addr, Consumer_Entry);
-}
-
-// Unbind (remove) the Event_Addr from the map.
-
-int
-Consumer_Map::unbind (Event_Addr event_addr)
-{
- return this->map_.unbind (event_addr);
-}
-
-Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt)
- : map_iter_ (rt.map_)
-{
-}
-
-int
-Consumer_Map_Iterator::next (Consumer_Entry *&ss)
-{
- // Loop in order to skip over inactive entries if necessary.
-
- for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0;
- this->map_iter_.next (temp) != 0;
- this->advance ())
- {
- // Otherwise, return the next item.
- ss = temp->int_id_;
- return 1;
- }
- return 0;
-}
-
-int
-Consumer_Map_Iterator::advance (void)
-{
- return this->map_iter_.advance ();
-}
-#endif /* _CONSUMER_MAP_C */
diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h
deleted file mode 100644
index fd392afaf6e..00000000000
--- a/apps/Gateway/Gateway/Consumer_Map.h
+++ /dev/null
@@ -1,62 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Consumer_Map.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONSUMER_MAP_H)
-#define _CONSUMER_MAP_H
-
-#include "ace/Map_Manager.h"
-#include "Concurrency_Strategies.h"
-#include "Event.h"
-#include "Consumer_Entry.h"
-
-class Consumer_Map
-{
- // = TITLE
- // Define a generic consumer map based on the ACE Map_Manager.
- //
- // = DESCRIPTION
- // This class makes it easier to use the Map_Manager.
-public:
- int bind (Event_Addr event, Consumer_Entry *Consumer_Entry);
- // Associate Event with the Consumer_Entry.
-
- int find (Event_Addr event, Consumer_Entry *&Consumer_Entry);
- // Break any association of EXID.
-
- int unbind (Event_Addr event);
- // Locate EXID and pass out parameter via INID. If found,
- // return 0, else -1.
-
-public:
- ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_;
- // Map that associates Event Addrs (external ids) with Consumer_Entry *'s
- // <internal IDs>.
-};
-
-class Consumer_Map_Iterator
-{
- // = TITLE
- // Define an iterator for the Consumer Map.
-public:
- Consumer_Map_Iterator (Consumer_Map &mm);
- int next (Consumer_Entry *&);
- int advance (void);
-
-private:
- ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_;
- // Map we are iterating over.
-};
-#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h
deleted file mode 100644
index a867f1ca5ff..00000000000
--- a/apps/Gateway/Gateway/Dispatch_Set.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Dispatch_Set.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_DISPATCH_SET)
-#define _DISPATCH_SET
-
-#include "ace/Set.h"
-
-// Forward reference.
-class Proxy_Handler;
-
-typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set;
-typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator;
-
-#endif /* _DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/IO_Handler.cpp b/apps/Gateway/Gateway/IO_Handler.cpp
deleted file mode 100644
index ba1b355b3ba..00000000000
--- a/apps/Gateway/Gateway/IO_Handler.cpp
+++ /dev/null
@@ -1,710 +0,0 @@
-// $Id$
-
-#include "Consumer_Entry.h"
-#include "IO_Handler_Connector.h"
-
-// Convenient short-hands.
-#define CO CONDITION
-#define MU MAP_MUTEX
-
-// The total number of bytes sent/received on this channel.
-
-size_t
-IO_Handler::total_bytes (void)
-{
- return this->total_bytes_;
-}
-
-void
-IO_Handler::total_bytes (size_t bytes)
-{
- this->total_bytes_ += bytes;
-}
-
-IO_Handler::IO_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr),
- consumer_map_ (consumer_map),
- id_ (-1),
- total_bytes_ (0),
- state_ (IO_Handler::IDLE),
- connector_ (ioc),
- timeout_ (1),
- max_timeout_ (IO_Handler::MAX_RETRY_TIMEOUT),
- socket_queue_size_ (socket_queue_size)
-{
-}
-
-// Set the associated channel.
-
-void
-IO_Handler::active (int a)
-{
- this->state (a == 0 ? IO_Handler::IDLE : IO_Handler::ESTABLISHED);
-}
-
-// Get the associated channel.
-
-int
-IO_Handler::active (void)
-{
- return this->state () == IO_Handler::ESTABLISHED;
-}
-
-// Set the direction.
-
-void
-IO_Handler::direction (char d)
-{
- this->direction_ = d;
-}
-
-// Get the direction.
-
-char
-IO_Handler::direction (void)
-{
- return this->direction_;
-}
-
-// Sets the timeout delay.
-
-void
-IO_Handler::timeout (int to)
-{
- if (to > this->max_timeout_)
- to = this->max_timeout_;
-
- this->timeout_ = to;
-}
-
-// Recalculate the current retry timeout delay using exponential
-// backoff. Returns the original timeout (i.e., before the
-// recalculation).
-
-int
-IO_Handler::timeout (void)
-{
- int old_timeout = this->timeout_;
- this->timeout_ *= 2;
-
- if (this->timeout_ > this->max_timeout_)
- this->timeout_ = this->max_timeout_;
-
- return old_timeout;
-}
-
-// Sets the max timeout delay.
-
-void
-IO_Handler::max_timeout (int mto)
-{
- this->max_timeout_ = mto;
-}
-
-// Gets the max timeout delay.
-
-int
-IO_Handler::max_timeout (void)
-{
- return this->max_timeout_;
-}
-
-// Restart connection asynchronously when timeout occurs.
-
-int
-IO_Handler::handle_timeout (const ACE_Time_Value &, const void *)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) attempting to reconnect IO_Handler %d with timeout = %d\n",
- this->id (), this->timeout_));
- return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
-}
-
-// Restart connection (blocking_semantics dicates whether we
-// restart synchronously or asynchronously).
-
-int
-IO_Handler::reinitiate_connection (void)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != -1)
- {
- // Make sure to close down peer to reclaim descriptor.
- this->peer ().close ();
-
-#if 0
-// if (this->state () == FAILED)
-// {
- // Reinitiate timeout to improve reconnection time.
-// this->timeout (1);
-#endif
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of IO_Handler %d\n",
- this->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
-}
-
-// Handle shutdown of the IO_Handler object.
-
-int
-IO_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down IO_Handler %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- return this->reinitiate_connection ();
-}
-
-// Set the state of the channel.
-
-void
-IO_Handler::state (IO_Handler::State s)
-{
- this->state_ = s;
-}
-
-// Perform the first-time initiation of a connection to the peer.
-
-int
-IO_Handler::initialize_connection (void)
-{
- this->state_ = IO_Handler::ESTABLISHED;
-
- // Restart the timeout to 1.
- this->timeout (1);
-
-#if defined (ASSIGN_SUPPLIER_ID)
- // Action that sends the route id to the peerd.
-
- CONN_ID id = htons (this->id ());
-
- ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "gatewayd has closed down unexpectedly" : "send"),
- -1);
-#endif /* ASSIGN_SUPPLIER_ID */
- return 0;
-}
-
-// Set the size of the socket queue.
-
-void
-IO_Handler::socket_queue_size (void)
-{
- if (this->socket_queue_size_ > 0)
- {
- int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF;
-
- if (this->peer ().set_option (SOL_SOCKET, option,
- &this->socket_queue_size_, sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- }
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that
-// delegates control to our application-specific IO_Handler.
-
-int
-IO_Handler::open (void *a)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) IO_Handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn on non-blocking I/O.
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call down to the base class to activate and register this handler.
- if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
-
- return this->initialize_connection ();
-}
-
-// Return the current state of the channel.
-
-IO_Handler::State
-IO_Handler::state (void)
-{
- return this->state_;
-}
-
-void
-IO_Handler::id (CONN_ID id)
-{
- this->id_ = id;
-}
-
-CONN_ID
-IO_Handler::id (void)
-{
- return this->id_;
-}
-
-// Set the peer's address information.
-int
-IO_Handler::bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- CONN_ID id)
-{
- this->remote_addr_ = remote_addr;
- this->local_addr_ = local_addr;
- this->id_ = id;
- return 0;
-}
-
-ACE_INET_Addr &
-IO_Handler::remote_addr (void)
-{
- return this->remote_addr_;
-}
-
-ACE_INET_Addr &
-IO_Handler::local_addr (void)
-{
- return this->local_addr_;
-}
-
-// Constructor sets the consumer map pointer.
-
-Consumer_Handler::Consumer_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'C';
- this->msg_queue ()->high_water_mark (Consumer_Handler::QUEUE_SIZE);
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method simply marks the IO_Handler as
-// having failed so that handle_close () can reconnect.
-
-int
-Consumer_Handler::handle_input (ACE_HANDLE)
-{
- char buf[1];
-
- this->state (IO_Handler::FAILED);
-
- switch (this->peer ().recv (buf, sizeof buf))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Output IO_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Output IO_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer is sending input on Output IO_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- }
-}
-
-// Perform a non-blocking put() of event MB. If we are unable to
-// send the entire event the remainder is re-queued at the *front* of
-// the Event_List.
-
-int
-Consumer_Handler::nonblk_put (ACE_Message_Block *mb)
-{
- // Try to send the event. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // Event_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
-
- ssize_t n = this->send (mb);
-
- if (n == -1)
- {
- // Things have gone wrong, let's try to close down and set up a new reconnection.
- this->state (IO_Handler::FAILED);
- this->handle_close ();
- return -1;
- }
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- else if (ACE_Service_Config::reactor ()->
- schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-ssize_t
-Consumer_Handler::send (ACE_Message_Block *mb)
-{
- ssize_t len = mb->length ();
- ssize_t n = this->peer ().send (mb->rd_ptr (), len);
-
- if (n <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < len)
- // Re-adjust pointer to skip over the part we did send.
- mb->rd_ptr (n);
- else /* if (n == length) */
- {
- // The whole event is sent, we can now safely deallocate the
- // buffer. Note that this should decrement a reference count...
- delete mb;
- errno = 0;
- }
- this->total_bytes (n);
- return n;
-}
-
-// Finish sending an event when flow control conditions abate.
-// This method is automatically called by the ACE_Reactor.
-
-int
-Consumer_Handler::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *mb = 0;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in handle_output on handle %d\n",
- this->get_handle ()));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (mb))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete mb;
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire event (or we did not
- // fail due to EWOULDBLOCK) then check if there are more
- // events on the Event_List. If there aren't, tell the
- // ACE_Reactor not to notify us anymore (at least until
- // there are new events queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing deactivated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
-
- if (ACE_Service_Config::reactor ()->
- cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
- return 0;
-}
-
-// Send an event to a peer (may queue if necessary).
-
-int
-Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the event *without* blocking!
- return this->nonblk_put (mb);
- else
- // If we have queued up events due to flow control then just
- // enqueue and return.
- return this->msg_queue ()->enqueue_tail
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Constructor sets the consumer map pointer and the connector
-// pointer.
-
-Supplier_Handler::Supplier_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : msg_frag_ (0),
- IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'S';
- this->msg_queue ()->high_water_mark (0);
-}
-
-// Receive a Peer event from peerd. Handles fragmentation.
-//
-// The routing event returned from recv consists of two parts:
-// 1. The Address part, contains the virtual routing id.
-// 2. The Data part, which contains the actual data to be routed.
-//
-// The reason for having two parts is to shield the higher layers
-// of software from knowledge of the event structure.
-
-int
-Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
-{
- Event *event;
- ssize_t len;
- ssize_t n = 0;
- size_t offset = 0;
-
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- event = (Event *) this->msg_frag_->rd_ptr ();
-
- const ssize_t HEADER_SIZE = sizeof (Event_Header);
- ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- n = this->peer ().recv (this->msg_frag_->wr_ptr (),
- header_bytes_left_to_read);
-
- if (n == -1 /* error */
- || n == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR, "%p\n",
- "Recv error during header read "));
- ACE_DEBUG ((LM_DEBUG,
- "attempted to read %d\n",
- header_bytes_left_to_read));
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return n;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (n);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG,
- "Partial header received: only %d bytes\n",
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire event.
- errno = EWOULDBLOCK;
- return -1;
- }
- }
-
- // At this point there is a complete, valid header in msg_frag_
- len = sizeof event->buf_ + HEADER_SIZE - this->msg_frag_->length ();
-
- ssize_t m = this->peer ().recv (event->buf_ + offset, len);
-
- // Try to receive the remainder of the event
-
- switch (m)
- {
- case -1:
- if (errno == EWOULDBLOCK)
- {
- // This shouldn't happen since the ACE_Reactor
- // just triggered us to handle pending I/O!
- ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n"));
- errno = EWOULDBLOCK;
- return -1;
- }
- else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return 0;
-
- default:
- if (m != len)
- // Re-adjust pointer to skip over the part we've read.
- {
- this->msg_frag_->wr_ptr (m);
- errno = EWOULDBLOCK;
- return -1; // Inform caller that we didn't get the whole event.
- }
- else
- {
- // Set the write pointer at 1 past the end of the event.
- this->msg_frag_->wr_ptr (m);
-
- // Set the read pointer to the beginning of the event.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- // Allocate an event forwarding header and chain the data
- // portion onto its continuation field.
- ACE_NEW_RETURN (forward_addr,
- ACE_Message_Block (sizeof (Event_Addr),
- ACE_Message_Block::MB_PROTO,
- this->msg_frag_),
- -1);
-
- Event_Addr event_addr (this->id (), event->header_.routing_id_, 0);
- // Copy the forwarding address from the Event_Addr into
- // forward_addr.
- forward_addr->copy ((char *) &event_addr, sizeof (Event));
-
- // Reset the pointer to indicate we've got an entire event.
- this->msg_frag_ = 0;
- }
- this->total_bytes (m + n);
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s",
- event_addr.conn_id_, event->header_.routing_id_, event->header_.len_,
- event->header_.len_, event->buf_));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.routing_id_, event->header_.len_, this->total_bytes ()));
-#endif
- return m + n;
- }
-}
-
-// Receive various types of input (e.g., Peer event from the
-// gatewayd, as well as stdio).
-
-int
-Supplier_Handler::handle_input (ACE_HANDLE)
-{
- ACE_Message_Block *forward_addr = 0;
-
- switch (this->recv (forward_addr))
- {
- case 0:
- // Note that a peer should never initiate a shutdown.
- this->state (IO_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input IO_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else // A weird problem occurred, shut down and start again.
- {
- this->state (IO_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input IO_Handler %d\n",
- "Peer has failed unexpectedly",
- this->id ()), -1);
- }
- /* NOTREACHED */
- default:
- return this->forward (forward_addr);
- }
-}
-
-// Route an event to its appropriate destination.
-
-int
-Supplier_Handler::forward (ACE_Message_Block *forward_addr)
-{
- // We got a valid event, so determine its virtual routing id,
- // which is stored in the first of the two event blocks chained
- // together.
-
- Event_Addr *forwarding_key = (Event_Addr *) forward_addr->rd_ptr ();
-
- // Skip over the address portion.
- const ACE_Message_Block *const data = forward_addr->cont ();
-
- // RE points to the routing entry located for this routing id.
- Consumer_Entry *re = 0;
-
- if (this->consumer_map_->find (*forwarding_key, re) != -1)
- {
- // Check to see if there are any destinations.
- if (re->destinations ()->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active destinations for this event currently\n"));
-
- else // There are destinations, so forward the event.
- {
- Consumer_Entry::ENTRY_SET *esp = re->destinations ();
- Consumer_Entry::ENTRY_ITERATOR si (*esp);
-
- for (IO_Handler **channel = 0; si.next (channel) != 0; si.advance ())
- {
- // Only process active channels.
- if ((*channel)->active ())
- {
- // Clone the event portion (should be doing reference counting here...)
- ACE_Message_Block *newmsg = data->clone ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ()));
-
- if ((*channel)->put (newmsg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n",
- "put", (*channel)->id ()));
-
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete newmsg;
- }
- }
- }
- // Will become superfluous once we have reference counting...
- delete forward_addr;
- return 0;
- }
- }
- delete forward_addr;
- // Failure return.
- ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
- forwarding_key->conn_id_, forwarding_key->logical_id_, forwarding_key->payload_));
- return 0;
-}
-
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX>;
-template class ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX>;
-template class ACE_Map_Entry<Event_Addr, Consumer_Entry *>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h
deleted file mode 100644
index 7bda073f09b..00000000000
--- a/apps/Gateway/Gateway/IO_Handler.h
+++ /dev/null
@@ -1,224 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// IO_Handler.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_IO_HANDLER)
-#define _IO_HANDLER
-
-#include "ace/Service_Config.h"
-#include "ace/SOCK_Connector.h"
-#include "ace/Svc_Handler.h"
-#include "Consumer_Map.h"
-#include "Consumer_Entry.h"
-#include "Event.h"
-
-// Forward declaration.
-class IO_Handler_Connector;
-
-class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
- // = TITLE
- // IO_Handler contains info about connection state and addressing.
- //
- // = DESCRIPTION
- // The IO_Handler classes process events sent from the peers to the
- // gateway. These classes works as follows:
- //
- // 1. IO_Handler_Connector creates a number of connections with the set of
- // peers specified in a configuration file.
- //
- // 2. For each peer that connects successfully, IO_Handler_Connector
- // creates an IO_Handler object. Each object assigns a unique routing
- // id to its associated peer. The Handlers are used by gatewayd
- // that to receive, route, and forward events from source peer(s)
- // to destination peer(s).
-{
-public:
- IO_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
-
- virtual int open (void * = 0);
- // Initialize and activate a single-threaded IO_Handler (called by
- // ACE_Connector::handle_output()).
-
- int bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- CONN_ID);
- // Set the peer's addressing and routing information.
-
- ACE_INET_Addr &remote_addr (void);
- // Returns the peer's routing address.
-
- ACE_INET_Addr &local_addr (void);
- // Returns our local address.
-
- // = Set/get routing id.
- CONN_ID id (void);
- void id (CONN_ID);
-
- // = Set/get the current state of the IO_Handler.
- enum State
- {
- IDLE = 1, // Prior to initialization.
- CONNECTING, // During connection establishment.
- ESTABLISHED, // IO_Handler is established and active.
- DISCONNECTING, // IO_Handler is in the process of connecting.
- FAILED // IO_Handler has failed.
- };
-
- // = Set/get the current state.
- State state (void);
- void state (State);
-
- // = Set/get the current retry timeout delay.
- int timeout (void);
- void timeout (int);
-
- // = Set/get the maximum retry timeout delay.
- int max_timeout (void);
- void max_timeout (int);
-
- // = Set/get IO_Handler activity status.
- int active (void);
- void active (int);
-
- // = Set/get direction (necessary for error checking).
- char direction (void);
- void direction (char);
-
- // = The total number of bytes sent/received on this channel.
- size_t total_bytes (void);
- void total_bytes (size_t bytes);
- // Increment count by <bytes>.
-
- virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based IO_Handler reconnection.
-
-protected:
- enum
- {
- MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
- };
-
- int initialize_connection (void);
- // Perform the first-time initiation of a connection to the peer.
-
- int reinitiate_connection (void);
- // Reinitiate a connection asynchronously when peers fail.
-
- void socket_queue_size (void);
- // Set the socket queue size.
-
- virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
- // Perform IO_Handler termination.
-
- Consumer_Map *consumer_map_;
- // Pointer to table that maps an event
- // to a Set of IO_Handler *'s for output.
-
- ACE_INET_Addr remote_addr_;
- // Address of peer.
-
- ACE_INET_Addr local_addr_;
- // Address of us.
-
- CONN_ID id_;
- // The assigned routing ID of this entry.
-
- size_t total_bytes_;
- // The total number of bytes sent/received on this channel.
-
- State state_;
- // The current state of the channel.
-
- IO_Handler_Connector *connector_;
- // Back pointer to IO_Handler_Connector to reestablish broken
- // connections.
-
- int timeout_;
- // Amount of time to wait between reconnection attempts.
-
- int max_timeout_;
- // Maximum amount of time to wait between reconnection attempts.
-
- char direction_;
- // Indicates which direction data flows through the channel ('O' ==
- // output and 'I' == input).
-
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
-};
-
-class Supplier_Handler : public IO_Handler
- // = TITLE
- // Handle reception of Peer events arriving as events.
-{
-public:
- Supplier_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
- // Constructor sets the consumer map pointer.
-
- virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- // Receive and process peer events.
-
-protected:
- virtual int recv (ACE_Message_Block *&);
- // Receive an event from a Supplier.
-
- int forward (ACE_Message_Block *event);
- // Forward the Event to a Consumer.
-
- ACE_Message_Block *msg_frag_;
- // Keep track of event fragment to handle non-blocking recv's from
- // Suppliers.
-};
-
-class Consumer_Handler : public IO_Handler
- // = TITLE
- // Handle transmission of events to other Peers using a
- // single-threaded approach.
-{
-public:
- Consumer_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send an event to a Consumer (may be queued if necessary).
-
-protected:
- // = We'll allow up to 16 megabytes to be queued per-output
- // channel.
- enum {QUEUE_SIZE = 1024 * 1024 * 16};
-
- virtual int handle_input (ACE_HANDLE);
- // Receive and process shutdowns from a Consumer.
-
- virtual int handle_output (ACE_HANDLE);
- // Finish sending event when flow control conditions abate.
-
- int nonblk_put (ACE_Message_Block *mb);
- // Perform a non-blocking put().
-
- virtual ssize_t send (ACE_Message_Block *);
- // Send an event to a Consumer.
-};
-
-#endif /* _IO_HANDLER */
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp
deleted file mode 100644
index 712b348951d..00000000000
--- a/apps/Gateway/Gateway/IO_Handler_Connector.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-#include "IO_Handler_Connector.h"
-// $Id$
-
-
-IO_Handler_Connector::IO_Handler_Connector (void)
-{
-}
-
-// Override the connection-failure method to add timer support.
-// Note that these timers perform "expoential backoff" to
-// avoid rapidly trying to reestablish connections when a link
-// goes down.
-
-int
-IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
-{
- ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0;
-
- // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
- if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
- sd, "find"), -1);
-
- IO_Handler *channel = stp->svc_handler ();
-
- // Schedule a reconnection request at some point in the future
- // (note that channel uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
- channel->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- return 0;
-}
-
-// Initiate (or reinitiate) a connection to the IO_Handler.
-
-int
-IO_Handler_Connector::initiate_connection (IO_Handler *channel,
- ACE_Synch_Options &synch_options)
-{
- char buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- channel->state (IO_Handler::IDLE);
-
- if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
- || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (channel, channel->remote_addr (),
- synch_options, channel->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- channel->state (IO_Handler::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", buf));
-
- // Reschedule ourselves to try and connect again.
- if (synch_options[ACE_Synch_Options::USE_REACTOR])
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (channel, 0, channel->timeout ()) == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- else
- // Failures on synchronous connects are reported as errors
- // so that the caller can decide how to proceed.
- return -1;
- }
- else
- {
- channel->state (IO_Handler::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting %s to %s\n",
- synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", buf));
- }
- }
- else
- {
- channel->state (IO_Handler::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- buf, channel->get_handle ()));
- }
- return 0;
-}
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h
deleted file mode 100644
index 585428c88ee..00000000000
--- a/apps/Gateway/Gateway/IO_Handler_Connector.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// IO_Handler_Connector.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_IO_HANDLER_CONNECTOR)
-#define _IO_HANDLER_CONNECTOR
-
-#include "ace/Connector.h"
-#include "Thr_IO_Handler.h"
-
-class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new IO_Handler object to do the dirty work...
-{
-public:
- IO_Handler_Connector (void);
-
- // Initiate (or reinitiate) a connection on the IO_Handler.
- int initiate_connection (IO_Handler *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
-
-protected:
- // Override the connection-failure method to add timer support.
- virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
-};
-
-#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/apps/Gateway/Gateway/Peer_Message.h b/apps/Gateway/Gateway/Peer_Message.h
deleted file mode 100644
index d9e65650095..00000000000
--- a/apps/Gateway/Gateway/Peer_Message.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Peer_Message.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (PEER_MESSAGE)
-#define PEER_MESSAGE
-
-// This is the unique connection identifier that denotes a particular
-// Channel in the Gateway.
-typedef short CONN_ID;
-
-class Peer_Addr
- // = TITLE
- // Peer address is used to identify the source/destination of a
- // routing message.
-{
-public:
- Peer_Addr (CONN_ID cid = -1, u_char lid = 0, u_char pay = 0)
- : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {}
-
- int operator== (const Peer_Addr &pa) const
- {
- return this->conn_id_ == pa.conn_id_
- && this->logical_id_ == pa.logical_id_
- && this->payload_ == pa.payload_;
- }
-
- CONN_ID conn_id_;
- // Unique connection identifier that denotes a particular Channel.
-
- u_char logical_id_;
- // Logical ID.
-
- u_char payload_;
- // Payload type.
-};
-
-
-class Peer_Header
- // = TITLE
- // Fixed sized header.
-{
-public:
- typedef u_short ROUTING_ID;
- // Type used to route messages from gatewayd.
-
- enum
- {
- INVALID_ID = -1 // No peer can validly use this number.
- };
-
- ROUTING_ID routing_id_;
- // Source ID.
-
- size_t len_;
- // Length of the message in bytes.
-};
-
-class Peer_Message
- // = TITLE
- // Variable-sized message (buf_ may be variable-sized between
- // 0 and MAX_PAYLOAD_SIZE).
-{
-public:
- enum { MAX_PAYLOAD_SIZE = 1024 };
- // The maximum size of an Peer message (see Peer protocol specs for
- // exact #).
-
- Peer_Header header_;
- // Message header.
-
- char buf_[MAX_PAYLOAD_SIZE];
- // Message payload.
-};
-
-#endif /* PEER_MESSAGE */
diff --git a/apps/Gateway/Gateway/Routing_Entry.cpp b/apps/Gateway/Gateway/Routing_Entry.cpp
deleted file mode 100644
index cc270cfac3a..00000000000
--- a/apps/Gateway/Gateway/Routing_Entry.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-// Defines an entry in the Routing Table.
-// $Id$
-
-#include "Routing_Entry.h"
-
-Routing_Entry::Routing_Entry (int validity_interval)
- : validity_interval_ (validity_interval)
-{
- ACE_NEW (this->destinations_, Routing_Entry::ENTRY_SET);
-}
-
-Routing_Entry::~Routing_Entry (void)
-{
- delete this->destinations_;
-}
-
-// Get the associated set of destinations.
-
-Routing_Entry::ENTRY_SET *
-Routing_Entry::destinations (void)
-{
- return this->destinations_;
-}
-
-// Set the associated set of destinations.
-
-void
-Routing_Entry::destinations (Routing_Entry::ENTRY_SET *s)
-{
- this->destinations_ = s;
-}
-
-// Get the current validity interval for this route.
-
-int
-Routing_Entry::validity_interval (void)
-{
- return this->validity_interval_;
-}
-
-// Set the current validity interval for this route.
-
-void
-Routing_Entry::validity_interval (int vi)
-{
- this->validity_interval_ = vi;
-}
diff --git a/apps/Gateway/Gateway/Routing_Entry.h b/apps/Gateway/Gateway/Routing_Entry.h
deleted file mode 100644
index ab8e0eee53d..00000000000
--- a/apps/Gateway/Gateway/Routing_Entry.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Routing_Entry.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_ENTRY)
-#define _ROUTING_ENTRY
-
-#include "ace/Set.h"
-
-// Forward reference.
-class Channel;
-
-class Routing_Entry
-{
- // = TITLE
- // Defines an entry in the Routing_Table.
-public:
- Routing_Entry (int validity_interval = 0);
- ~Routing_Entry (void);
-
- typedef ACE_Unbounded_Set<Channel *> ENTRY_SET;
- typedef ACE_Unbounded_Set_Iterator<Channel *> ENTRY_ITERATOR;
-
- // = Set/get the associated set of destinations.
- ENTRY_SET *destinations (void);
- void destinations (ENTRY_SET *);
-
- // = Set/get current validity interval for this routing entry.
- int validity_interval (void);
- void validity_interval (int);
-
-protected:
- ENTRY_SET *destinations_;
- // The set of destinations;
-
- int validity_interval_;
- // The current validity interval of this link.
-};
-
-#endif /* _ROUTING_ENTRY */
diff --git a/apps/Gateway/Gateway/Routing_Table.cpp b/apps/Gateway/Gateway/Routing_Table.cpp
deleted file mode 100644
index 3ef2f21bc1f..00000000000
--- a/apps/Gateway/Gateway/Routing_Table.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-#if !defined (_ROUTING_TABLE_C)
-#define _ROUTING_TABLE_C
-
-
-#include "Routing_Table.h"
-
-/* Bind the EXT_ID to the INT_ID. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::bind (EXT_ID ext_id, INT_ID *int_id)
-{
- return this->map_.bind (ext_id, int_id);
-}
-
-/* Find the INT_ID corresponding to the EXT_ID. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::find (EXT_ID ext_id, INT_ID *&int_id)
-{
- return this->map_.find (ext_id, int_id);
-}
-
-/* Unbind (remove) the EXT_ID from the map. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::unbind (EXT_ID ext_id)
-{
- return this->map_.unbind (ext_id);
-}
-
-template <class EXT_ID, class INT_ID, class LOCK>
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::Routing_Iterator (Routing_Table<EXT_ID,
- INT_ID, LOCK> &rt,
- int ignore_inactive)
- : map_iter_ (rt.map_),
- ignore_inactive_ (ignore_inactive)
-{
-}
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::next (INT_ID *&ss)
-{
- // Loop in order to skip over inactive entries if necessary.
-
- for (ACE_Map_Entry<EXT_ID, INT_ID *> *temp = 0;
- this->map_iter_.next (temp) != 0;
- this->advance ())
- {
- // Skip over inactive entries if necessary.
- if (temp->int_id_->active () == 0 && this->ignore_inactive_)
- continue;
-
- // Otherwise, return the next item.
- ss = temp->int_id_;
- return 1;
- }
- return 0;
-}
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::advance (void)
-{
- return this->map_iter_.advance ();
-}
-#endif /* _ROUTING_TABLE_C */
diff --git a/apps/Gateway/Gateway/Routing_Table.h b/apps/Gateway/Gateway/Routing_Table.h
deleted file mode 100644
index 84194f13e49..00000000000
--- a/apps/Gateway/Gateway/Routing_Table.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Routing_Table.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_TABLE_H)
-#define _ROUTING_TABLE_H
-
-#include "ace/Map_Manager.h"
-
-template <class EXT_ID, class INT_ID, class LOCK>
-class Routing_Table
-{
- // = TITLE
- // Define a generic routing table based on the ACE Map_Manager.
- //
- // = DESCRIPTION
- // We need to have this table, rather than just using the Map_Manager
- // directly in order to ignore "inactive" routing entries...
-public:
- int bind (EXT_ID ext_id, INT_ID *int_id);
- // Associate EXT_ID with the INT_ID.
-
- int find (EXT_ID ext_id, INT_ID *&int_id);
- // Break any association of EXID.
-
- int unbind (EXT_ID ext_id);
- // Locate EXID and pass out parameter via INID. If found,
- // return 0, else -1.
-
-public:
- ACE_Map_Manager<EXT_ID, INT_ID *, LOCK> map_;
- // Map external IDs to internal IDs.
-};
-
-template <class EXT_ID, class INT_ID, class LOCK>
-class Routing_Iterator
-{
- // = TITLE
- // Define an iterator for the Routing Table.
-public:
- Routing_Iterator (Routing_Table<EXT_ID, INT_ID, LOCK> &mm,
- int ignore_inactive = 1);
- int next (INT_ID *&);
- int advance (void);
-
-private:
- ACE_Map_Iterator<EXT_ID, INT_ID *, LOCK> map_iter_;
- int ignore_inactive_;
-};
-
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "Routing_Table.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-#endif /* _ROUTING_TABLE_H */
diff --git a/apps/Gateway/Gateway/Thr_Channel.cpp b/apps/Gateway/Gateway/Thr_Channel.cpp
deleted file mode 100644
index 26e385e2727..00000000000
--- a/apps/Gateway/Gateway/Thr_Channel.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-#include "Thr_Channel.h"
-// $Id$
-
-#include "Channel_Connector.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Output_Channel::Thr_Output_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Output_Channel (rt, cc, thr_mgr, socket_queue_size)
-{
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the Channel as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
-
-int
-Thr_Output_Channel::handle_input (ACE_HANDLE h)
-{
- this->Output_Channel::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Output_Channel object and spawn a new
-// thread.
-
-int
-Thr_Output_Channel::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// ACE_Queue up a message for transmission (must not block since all
-// Input_Channels are single-threaded).
-
-int
-Thr_Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit messages to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Output_Channel::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Output_Channel's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send_peer (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Output_Channel %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Input_Channel::Thr_Input_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Input_Channel (rt, cc, thr_mgr, socket_queue_size)
-{
-}
-
-int
-Thr_Input_Channel::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive messages from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Input_Channel::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Input_Channel's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Input_Channel %d on handle %d\n",
- this->id (),
- this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_Channel.h b/apps/Gateway/Gateway/Thr_Channel.h
deleted file mode 100644
index a1dc91b1619..00000000000
--- a/apps/Gateway/Gateway/Thr_Channel.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Thr_Channel.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_THR_CHANNEL)
-#define _THR_CHANNEL
-
-#include "Channel.h"
-
-#if defined (ACE_HAS_THREADS)
-class Thr_Output_Channel : public Output_Channel
- // = TITLE
- // Runs each Output Channel in a separate thread.
-{
-public:
- Thr_Output_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the threaded Output_Channel object and spawn a new
- // thread.
-
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a peer.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-
-class Thr_Input_Channel : public Input_Channel
- // = TITLE
- // Runs each Input Channel in a separate thread.
-{
-public:
- Thr_Input_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the object and spawn a new thread.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-#endif /* ACE_HAS_THREADS */
-#endif /* _THR_CHANNEL */
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp
deleted file mode 100644
index 109cfad9c3f..00000000000
--- a/apps/Gateway/Gateway/Thr_IO_Handler.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-#include "Thr_IO_Handler.h"
-// $Id$
-
-#include "IO_Handler_Connector.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the IO_Handler as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
-
-int
-Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
-{
- this->Consumer_Handler::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Consumer_Handler object and spawn a new
-// thread.
-
-int
-Thr_Consumer_Handler::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// ACE_Queue up a message for transmission (must not block since all
-// Supplier_Handlers are single-threaded).
-
-int
-Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit messages to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Consumer_Handler::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
-}
-
-int
-Thr_Supplier_Handler::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive messages from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Supplier_Handler::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
- this->id (),
- this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h
deleted file mode 100644
index ee056b35361..00000000000
--- a/apps/Gateway/Gateway/Thr_IO_Handler.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Thr_IO_Handler.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_THR_IO_HANDLER)
-#define _THR_IO_HANDLER
-
-#include "IO_Handler.h"
-
-#if defined (ACE_HAS_THREADS)
-class Thr_Consumer_Handler : public Consumer_Handler
- // = TITLE
- // Runs each Output IO_Handler in a separate thread.
-{
-public:
- Thr_Consumer_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the threaded Consumer_Handler object and spawn a new
- // thread.
-
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a peer.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-
-class Thr_Supplier_Handler : public Supplier_Handler
- // = TITLE
- // Runs each Input IO_Handler in a separate thread.
-{
-public:
- Thr_Supplier_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the object and spawn a new thread.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-#endif /* ACE_HAS_THREADS */
-#endif /* _THR_IO_HANDLER */
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
deleted file mode 100644
index f316e4e82bf..00000000000
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
+++ /dev/null
@@ -1,211 +0,0 @@
-// $Id$
-
-#include "Event_Channel.h"
-#include "Thr_Proxy_Handler.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Consumer_Proxy (ec, remote_addr, local_addr, conn_id)
-{
-}
-
-// This method should be called only when the Consumer shuts down
-// unexpectedly. This method marks the Proxy_Handler as having failed
-// and deactivates the ACE_Message_Queue (to wake up the thread
-// blocked on <dequeue_head> in svc()).
-// Thr_Output_Handler::handle_close () will eventually try to
-// reconnect...
-
-int
-Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
-{
- // Call down to the <Consumer_Proxy> to handle this first.
- this->Consumer_Proxy::handle_input (h);
-
- ACE_Service_Config::reactor ()->remove_handler
- (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Consumer_Proxy object and spawn a new
-// thread.
-
-int
-Thr_Consumer_Proxy::open (void *)
-{
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_.complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Consumer has shut down unexpectedly).
- else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // events to Consumers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Queue up an event for transmission (must not block since
-// Supplier_Proxys may be single-threaded).
-
-int
-Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit events to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Consumer_Proxy::svc (void)
-{
-
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) connected! Thr_Consumer_Proxy's handle = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1;
- )
- {
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
- }
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->event_channel_.initiate_proxy_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
-
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
-
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Supplier_Proxy (ec, remote_addr, local_addr, conn_id)
-{
-}
-
-int
-Thr_Supplier_Proxy::open (void *)
-{
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_.complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // events to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive events from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Supplier_Proxy::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) connected! Thr_Supplier_Proxy's handle = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes events
- // for one connection it is OK to call down to the
- // <Supplier_Proxy::handle_input> method, which blocks on input.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->event_channel_.initiate_proxy_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h
deleted file mode 100644
index 275bc87b320..00000000000
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Thr_Proxy_Handler.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_THR_IO_HANDLER)
-#define _THR_IO_HANDLER
-
-#include "Proxy_Handler.h"
-
-#if defined (ACE_HAS_THREADS)
-class Thr_Consumer_Proxy : public Consumer_Proxy
- // = TITLE
- // Runs each Output Proxy_Handler in a separate thread.
-{
-public:
- Thr_Consumer_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
- virtual int open (void *);
- // Initialize the threaded Consumer_Proxy object and spawn a new
- // thread.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a peer.
-
-protected:
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-
-class Thr_Supplier_Proxy : public Supplier_Proxy
- // = TITLE
- // Runs each Input Proxy_Handler in a separate thread.
-{
-public:
- Thr_Supplier_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
- virtual int open (void *);
- // Initialize the object and spawn a new thread.
-
-protected:
- virtual int svc (void);
- // Transmit peer messages.
-};
-#endif /* ACE_HAS_THREADS */
-#endif /* _THR_IO_HANDLER */
diff --git a/apps/Gateway/Gateway/cc_config b/apps/Gateway/Gateway/cc_config
deleted file mode 100644
index 96f9ebdedd7..00000000000
--- a/apps/Gateway/Gateway/cc_config
+++ /dev/null
@@ -1,10 +0,0 @@
-# Conn ID Hostname Remote Port Direction Max Retry Delay Local Port
-# ------- -------- ---- --------- --------------- ----------
- 1 tango.cs 10004 I 32 20000
-# 2 tango.cs 10004 O 32
- 3 merengue.cs 10004 O 32 20001
-# 4 mambo.cs 10004 O 32 20000
-# 5 lambada.cs 10004 O 32 20000
-# 6 tango.cs 10004 O 32 20000
-# 7 tango.cs 5001 I 32
-# 8 tango.cs 5002 O 32
diff --git a/apps/Gateway/Gateway/rt_config b/apps/Gateway/Gateway/rt_config
deleted file mode 100644
index e951a0f09be..00000000000
--- a/apps/Gateway/Gateway/rt_config
+++ /dev/null
@@ -1,7 +0,0 @@
-# Conn ID Logical ID Payload Destinations
-# ------- ---------- ------- ------------
-# 1 1 0 3,4,5
- 1 1 0 3
- 3 1 0 3
-# 4 1 0 4
-# 5 1 0 5