summaryrefslogtreecommitdiff
path: root/apps/Gateway
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway')
-rw-r--r--apps/Gateway/Gateway/Channel.cpp710
-rw-r--r--apps/Gateway/Gateway/Channel_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/Channel_Connector.h41
-rw-r--r--apps/Gateway/Gateway/Concurrency_Strategies.h74
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp165
-rw-r--r--apps/Gateway/Gateway/Config_Files.h90
-rw-r--r--apps/Gateway/Gateway/Consumer_Dispatch_Set.h28
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.cpp31
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.h45
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.cpp61
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.h62
-rw-r--r--apps/Gateway/Gateway/Dispatch_Set.h28
-rw-r--r--apps/Gateway/Gateway/Event.h125
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp377
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h125
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp59
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.h60
-rw-r--r--apps/Gateway/Gateway/File_Parser.cpp142
-rw-r--r--apps/Gateway/Gateway/File_Parser.h74
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp341
-rw-r--r--apps/Gateway/Gateway/Gateway.h25
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.h40
-rw-r--r--apps/Gateway/Gateway/Makefile454
-rw-r--r--apps/Gateway/Gateway/Peer_Message.h89
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.cpp581
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.h202
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.cpp93
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.h40
-rw-r--r--apps/Gateway/Gateway/README23
-rw-r--r--apps/Gateway/Gateway/Routing_Entry.cpp47
-rw-r--r--apps/Gateway/Gateway/Routing_Entry.h53
-rw-r--r--apps/Gateway/Gateway/Routing_Table.cpp69
-rw-r--r--apps/Gateway/Gateway/Routing_Table.h67
-rw-r--r--apps/Gateway/Gateway/Thr_Channel.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_Channel.h65
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.h64
-rw-r--r--apps/Gateway/Gateway/cc_config10
-rw-r--r--apps/Gateway/Gateway/consumer_config8
-rw-r--r--apps/Gateway/Gateway/gatewayd.cpp33
-rw-r--r--apps/Gateway/Gateway/rt_config7
-rw-r--r--apps/Gateway/Gateway/svc.conf3
-rw-r--r--apps/Gateway/Makefile26
-rw-r--r--apps/Gateway/Peer/Gateway_Handler.cpp652
-rw-r--r--apps/Gateway/Peer/Gateway_Handler.h154
-rw-r--r--apps/Gateway/Peer/Makefile116
-rw-r--r--apps/Gateway/Peer/Peer_Message.h44
-rw-r--r--apps/Gateway/Peer/peerd.cpp44
-rw-r--r--apps/Gateway/Peer/svc.conf3
-rw-r--r--apps/Gateway/README92
51 files changed, 0 insertions, 6334 deletions
diff --git a/apps/Gateway/Gateway/Channel.cpp b/apps/Gateway/Gateway/Channel.cpp
deleted file mode 100644
index 99699a6ee87..00000000000
--- a/apps/Gateway/Gateway/Channel.cpp
+++ /dev/null
@@ -1,710 +0,0 @@
-
-// $Id$
-
-#include "Routing_Entry.h"
-#include "Channel_Connector.h"
-
-// Convenient short-hands.
-#define CO CONDITION
-#define MU MUTEX
-
-// = The total number of bytes sent/received on this channel.
-size_t
-Channel::total_bytes (void)
-{
- return this->total_bytes_;
-}
-
-void
-Channel::total_bytes (size_t bytes)
-{
- this->total_bytes_ += bytes;
-}
-
-Channel::Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH> (thr_mgr),
- routing_table_ (rt),
- id_ (-1),
- total_bytes_ (0),
- state_ (Channel::IDLE),
- connector_ (cc),
- timeout_ (1),
- max_timeout_ (Channel::MAX_RETRY_TIMEOUT),
- socket_queue_size_ (socket_queue_size)
-{
-}
-
-// Set the associated channel.
-
-void
-Channel::active (int a)
-{
- this->state (a == 0 ? Channel::IDLE : Channel::ESTABLISHED);
-}
-
-// Get the associated channel.
-
-int
-Channel::active (void)
-{
- return this->state () == Channel::ESTABLISHED;
-}
-
-// Set the direction.
-
-void
-Channel::direction (char d)
-{
- this->direction_ = d;
-}
-
-// Get the direction.
-
-char
-Channel::direction (void)
-{
- return this->direction_;
-}
-
-// Sets the timeout delay.
-
-void
-Channel::timeout (int to)
-{
- if (to > this->max_timeout_)
- to = this->max_timeout_;
-
- this->timeout_ = to;
-}
-
-// Recalculate the current retry timeout delay using exponential
-// backoff. Returns the original timeout (i.e., before the
-// recalculation).
-
-int
-Channel::timeout (void)
-{
- int old_timeout = this->timeout_;
- this->timeout_ *= 2;
-
- if (this->timeout_ > this->max_timeout_)
- this->timeout_ = this->max_timeout_;
-
- return old_timeout;
-}
-
-// Sets the max timeout delay.
-
-void
-Channel::max_timeout (int mto)
-{
- this->max_timeout_ = mto;
-}
-
-// Gets the max timeout delay.
-
-int
-Channel::max_timeout (void)
-{
- return this->max_timeout_;
-}
-
-// Restart connection asynchronously when timeout occurs.
-
-int
-Channel::handle_timeout (const ACE_Time_Value &, const void *)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) attempting to reconnect Channel %d with timeout = %d\n",
- this->id (), this->timeout_));
- return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
-}
-
-// Restart connection (blocking_semantics dicates whether we
-// restart synchronously or asynchronously).
-
-int
-Channel::reinitiate_connection (void)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != -1)
- {
- // Make sure to close down peer to reclaim descriptor.
- this->peer ().close ();
-
-#if 0
-// if (this->state () == FAILED)
-// {
- // Reinitiate timeout to improve reconnection time.
-// this->timeout (1);
-#endif
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Channel %d\n",
- this->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer (this, 0,
- this->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
-}
-
-// Handle shutdown of the Channel object.
-
-int
-Channel::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down Channel %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- return this->reinitiate_connection ();
-}
-
-// Set the state of the channel.
-
-void
-Channel::state (Channel::State s)
-{
- this->state_ = s;
-}
-
-// Perform the first-time initiation of a connection to the peer.
-
-int
-Channel::initialize_connection (void)
-{
- this->state_ = Channel::ESTABLISHED;
-
- // Restart the timeout to 1.
- this->timeout (1);
-
-#if defined (ASSIGN_ROUTING_ID)
- // Action that sends the route id to the peerd.
-
- CONN_ID id = htons (this->id ());
-
- ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "gatewayd has closed down unexpectedly" : "send"), -1);
-#endif /* ASSIGN_ROUTING_ID */
- return 0;
-}
-
-// Set the size of the socket queue.
-
-void
-Channel::socket_queue_size (void)
-{
- if (this->socket_queue_size_ > 0)
- {
- int option = this->direction_ == 'I' ? SO_RCVBUF : SO_SNDBUF;
-
- if (this->peer ().set_option (SOL_SOCKET, option,
- &this->socket_queue_size_, sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- }
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that
-// delegates control to our application-specific Channel.
-
-int
-Channel::open (void *a)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) Channel's fd = %d\n", this->peer ().get_handle ()));
-
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn on non-blocking I/O.
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call down to the base class to activate and register this handler.
- if (this->ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH>::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
-
- return this->initialize_connection ();
-}
-
-// Return the current state of the channel.
-
-Channel::State
-Channel::state (void)
-{
- return this->state_;
-}
-
-void
-Channel::id (CONN_ID id)
-{
- this->id_ = id;
-}
-
-CONN_ID
-Channel::id (void)
-{
- return this->id_;
-}
-
-// Set the peer's address information.
-int
-Channel::bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- CONN_ID id)
-{
- this->remote_addr_ = remote_addr;
- this->local_addr_ = local_addr;
- this->id_ = id;
- return 0;
-}
-
-ACE_INET_Addr &
-Channel::remote_addr (void)
-{
- return this->remote_addr_;
-}
-
-ACE_INET_Addr &
-Channel::local_addr (void)
-{
- return this->local_addr_;
-}
-
-// Constructor sets the routing table pointer.
-
-Output_Channel::Output_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Channel (rt, cc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'O';
- this->msg_queue ()->high_water_mark (Output_Channel::QUEUE_SIZE);
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method simply marks the Channel as
-// having failed so that handle_close () can reconnect.
-
-int
-Output_Channel::handle_input (ACE_HANDLE)
-{
- char buf[1];
-
- this->state (Channel::FAILED);
-
- switch (this->peer ().recv (buf, sizeof buf))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer is sending input on Output Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- }
-}
-
-int
-Output_Channel::svc (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Output_Channel!\n"), -1);
-}
-
-// Perform a non-blocking put() of message MB. If we are unable to
-// send the entire message the remainder is re-queued at the *front* of
-// the Message_List.
-
-int
-Output_Channel::nonblk_put (ACE_Message_Block *mb)
-{
- // Try to send the message. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // Message_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
-
- ssize_t n;
-
- if ((n = this->send_peer (mb)) == -1)
- {
- // Things have gone wrong, let's try to close down and set up a new reconnection.
- this->state (Channel::FAILED);
- this->handle_close ();
- return -1;
- }
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- else if (ACE_Service_Config::reactor ()->
- schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-int
-Output_Channel::send_peer (ACE_Message_Block *mb)
-{
- ssize_t n;
- size_t len = mb->length ();
-
- if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < len)
- // Re-adjust pointer to skip over the part we did send.
- mb->rd_ptr (n);
- else /* if (n == length) */
- {
- // The whole message is sent, we can now safely deallocate the buffer.
- // Note that this should decrement a reference count...
- delete mb;
- errno = 0;
- }
- this->total_bytes (n);
- return n;
-}
-
-// Finish sending a message when flow control conditions abate.
-// This method is automatically called by the ACE_Reactor.
-
-int
-Output_Channel::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *mb = 0;
- int status = 0;
-
- ACE_DEBUG ((LM_DEBUG, "(%t) in handle_output on handle %d\n", this->get_handle ()));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (mb))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete mb;
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire message (or we did not fail
- // due to EWOULDBLOCK) then check if there are more messages on the Message_List.
- // If there aren't, tell the ACE_Reactor not to notify us anymore (at least
- // until there are new messages queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing deactivated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
-
- if (ACE_Service_Config::reactor ()->
- cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
- return 0;
-}
-
-// Send a message to a peer (may queue if necessary).
-
-int
-Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the message *without* blocking!
- return this->nonblk_put (mb);
- else
- // If we have queued up messages due to flow control
- // then just enqueue and return.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Constructor sets the routing table pointer and the connector pointer.
-
-Input_Channel::Input_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : msg_frag_ (0),
- Channel (rt, cc, thr_mgr, socket_queue_size)
-{
- this->direction_ = 'I';
- this->msg_queue ()->high_water_mark (0);
-}
-
-int
-Input_Channel::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) put should not be called on Input_Channel!\n"), -1);
-}
-
-int
-Input_Channel::svc (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Input_Channel!\n"), -1);
-}
-
-// Receive a Peer message from peerd. Handles fragmentation.
-//
-// The routing message returned from recv_peer consists of two parts:
-// 1. The Address part, contains the virtual routing id.
-// 2. The Data part, which contains the actual data to be routed.
-//
-// The reason for having two parts is to shield the higher layers
-// of software from knowledge of the message structure.
-
-int
-Input_Channel::recv_peer (ACE_Message_Block *&route_addr)
-{
- Peer_Message *peer_msg;
- size_t len;
- ssize_t n = 0;
- ssize_t m = 0;
- size_t offset = 0;
-
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Peer_Message)),
- -1);
-
- peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr ();
-
- const ssize_t HEADER_SIZE = sizeof (Peer_Header);
- ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- n = this->peer ().recv (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
-
- if (n == -1 /* error */
- || n == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR, "%p\n", "Recv error during header read "));
- ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read));
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return n;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (n);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG, "Partial header received: only %d bytes\n",
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire message.
- errno = EWOULDBLOCK;
- return -1;
- }
- }
-
- // At this point there is a complete, valid header in msg_frag_
- len = sizeof peer_msg->buf_ + HEADER_SIZE - this->msg_frag_->length ();
-
- // Try to receive the remainder of the message
-
- switch (m = this->peer ().recv (peer_msg->buf_ + offset, len))
- {
- case -1:
- if (errno == EWOULDBLOCK)
- {
- // This shouldn't happen since the ACE_Reactor
- // just triggered us to handle pending I/O!
- ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n"));
- errno = EWOULDBLOCK;
- return -1;
- }
- else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return 0;
-
- default:
- if (m != len)
- // Re-adjust pointer to skip over the part we've read.
- {
- this->msg_frag_->wr_ptr (m);
- errno = EWOULDBLOCK;
- return -1; // Inform caller that we didn't get the whole message.
- }
- else
- {
- // Set the write pointer at 1 past the end of the message.
- this->msg_frag_->wr_ptr (m);
-
- // Set the read pointer to the beginning of the message.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- // Allocate a routing message header and chain the data portion
- // onto its continuation field.
- ACE_NEW_RETURN (route_addr,
- ACE_Message_Block (sizeof (Peer_Addr),
- ACE_Message_Block::MB_PROTO,
- this->msg_frag_),
- -1);
-
- Peer_Addr peer_addr (this->id (), peer_msg->header_.routing_id_, 0);
- // Copy the routing address from the Peer_Message into routing_addr.
- route_addr->copy ((char *) &peer_addr, sizeof (Peer_Addr));
-
- // Reset the pointer to indicate we've got an entire message.
- this->msg_frag_ = 0;
- }
- this->total_bytes (m + n);
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s",
- peer_addr.conn_id_, peer_msg->header_.routing_id_, peer_msg->header_.len_,
- peer_msg->header_.len_, peer_msg->buf_));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n",
- peer_msg->header_.routing_id_, peer_msg->header_.len_, this->total_bytes ()));
-#endif
- return m + n;
- }
-}
-
-// Receive various types of input (e.g., Peer message from the
-// gatewayd, as well as stdio).
-
-int
-Input_Channel::handle_input (ACE_HANDLE)
-{
- ACE_Message_Block *route_addr = 0;
-
- switch (this->recv_peer (route_addr))
- {
- case 0:
- // Note that a peer should never initiate a shutdown.
- this->state (Channel::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input Channel %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else // A weird problem occurred, shut down and start again.
- {
- this->state (Channel::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Channel %d\n",
- "Peer has failed unexpectedly",
- this->id ()), -1);
- }
- /* NOTREACHED */
- default:
- return this->route_message (route_addr);
- }
-}
-
-// Route a message to its appropriate destination.
-
-int
-Input_Channel::route_message (ACE_Message_Block *route_addr)
-{
- // We got a valid message, so determine its virtual routing id,
- // which is stored in the first of the two message blocks chained together.
-
- Peer_Addr *routing_key = (Peer_Addr *) route_addr->rd_ptr ();
-
- // Skip over the address portion.
- const ACE_Message_Block *const data = route_addr->cont ();
-
- // RE points to the routing entry located for this routing id.
- Routing_Entry *re = 0;
-
- if (this->routing_table_->find (*routing_key, re) != -1)
- {
- // Check to see if there are any destinations.
- if (re->destinations ()->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active destinations for this message currently\n"));
-
- else // There are destinations, so forward the message.
- {
- Routing_Entry::ENTRY_SET *esp = re->destinations ();
- Routing_Entry::ENTRY_ITERATOR si (*esp);
-
- for (Channel **channel = 0; si.next (channel) != 0; si.advance ())
- {
- // Only process active channels.
- if ((*channel)->active ())
- {
- // Clone the message portion (should be doing reference counting here...)
- ACE_Message_Block *newmsg = data->clone ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ()));
-
- if ((*channel)->put (newmsg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping messages"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n",
- "put", (*channel)->id ()));
-
- // Caller is responsible for freeing a ACE_Message_Block if failures occur.
- delete newmsg;
- }
- }
- }
- // Will become superfluous once we have reference counting...
- delete route_addr;
- return 0;
- }
- }
- delete route_addr;
- // Failure return.
- ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
- routing_key->conn_id_, routing_key->logical_id_, routing_key->payload_));
- return 0;
-}
-
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Peer_Addr, Routing_Entry *, MUTEX>;
-template class ACE_Map_Iterator<Peer_Addr, Routing_Entry *, MUTEX>;
-template class ACE_Map_Entry<Peer_Addr, Routing_Entry *>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Channel_Connector.cpp b/apps/Gateway/Gateway/Channel_Connector.cpp
deleted file mode 100644
index a5394e8b013..00000000000
--- a/apps/Gateway/Gateway/Channel_Connector.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-#include "Channel_Connector.h"
-// $Id$
-
-
-Channel_Connector::Channel_Connector (void)
-{
-}
-
-// Override the connection-failure method to add timer support.
-// Note that these timers perform "expoential backoff" to
-// avoid rapidly trying to reestablish connections when a link
-// goes down.
-
-int
-Channel_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
-{
- ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR>::AST *stp = 0;
-
- // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
- if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
- sd, "find"), -1);
-
- Channel *channel = stp->svc_handler ();
-
- // Schedule a reconnection request at some point in the future
- // (note that channel uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
- channel->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- return 0;
-}
-
-// Initiate (or reinitiate) a connection to the Channel.
-
-int
-Channel_Connector::initiate_connection (Channel *channel,
- ACE_Synch_Options &synch_options)
-{
- char buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- channel->state (Channel::IDLE);
-
- if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
- || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (channel, channel->remote_addr (),
- synch_options, channel->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- channel->state (Channel::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", buf));
-
- // Reschedule ourselves to try and connect again.
- if (synch_options[ACE_Synch_Options::USE_REACTOR])
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (channel, 0, channel->timeout ()) == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- else
- // Failures on synchronous connects are reported as errors
- // so that the caller can decide how to proceed.
- return -1;
- }
- else
- {
- channel->state (Channel::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting %s to %s\n",
- synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", buf));
- }
- }
- else
- {
- channel->state (Channel::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- buf, channel->get_handle ()));
- }
- return 0;
-}
diff --git a/apps/Gateway/Gateway/Channel_Connector.h b/apps/Gateway/Gateway/Channel_Connector.h
deleted file mode 100644
index 3e27f37355a..00000000000
--- a/apps/Gateway/Gateway/Channel_Connector.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Channel_Connector.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CHANNEL_CONNECTOR)
-#define _CHANNEL_CONNECTOR
-
-#include "ace/Connector.h"
-#include "Thr_Channel.h"
-
-class Channel_Connector : public ACE_Connector<Channel, CHANNEL_PEER_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new Channel object to do the dirty work...
-{
-public:
- Channel_Connector (void);
-
- // Initiate (or reinitiate) a connection on the Channel.
- int initiate_connection (Channel *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
-
-protected:
- // Override the connection-failure method to add timer support.
- virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
-};
-
-#endif /* _CHANNEL_CONNECTOR */
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h
deleted file mode 100644
index 28e59a4b2e6..00000000000
--- a/apps/Gateway/Gateway/Concurrency_Strategies.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Concurrency_strategies.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONCURRENCY_STRATEGIES)
-#define _CONCURRENCY_STRATEGIES
-
-#include "ace/Synch.h"
-
-// The following typedefs are used in order to parameterize the
-// synchronization policies without changing the source code!
-
-// If we don't have threads then use the single-threaded synchronization.
-#if !defined (ACE_HAS_THREADS)
-#define SYNCH_STRATEGY ACE_NULL_SYNCH
-typedef ACE_Null_Mutex MAP_MUTEX;
-#else /* ACE_HAS_THREADS */
-
-// Note that we only need to make the ACE_Task thread-safe if we are
-// using the multi-threaded Thr_Consumer_Proxy...
-#if defined (USE_OUTPUT_MT)
-#define SYNCH_STRATEGY ACE_MT_SYNCH
-#else
-#define SYNCH_STRATEGY ACE_NULL_SYNCH
-#endif /* USE_OUTPUT_MT || USE_INPUT_MT */
-
-// Note that we only need to make the ACE_Map_Manager thread-safe if
-// we are using the multi-threaded Thr_Supplier_Proxy. In this
-// case, we use an RW_Mutex since we'll lookup Consumers far more
-// often than we'll update them.
-#if defined (USE_INPUT_MT)
-typedef ACE_RW_Mutex MAP_MUTEX;
-#else
-typedef ACE_Null_Mutex MAP_MUTEX;
-#endif /* USE_INPUT_MT */
-#endif /* ACE_HAS_THREADS */
-
-// = Forward decls
-class Thr_Consumer_Proxy;
-class Thr_Supplier_Proxy;
-class Consumer_Proxy;
-class Supplier_Proxy;
-
-#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
-#if defined (USE_OUTPUT_MT)
-typedef Thr_Consumer_Proxy CONSUMER_PROXY;
-#else
-typedef Consumer_Proxy CONSUMER_PROXY;
-#endif /* USE_OUTPUT_MT */
-
-#if defined (USE_INPUT_MT)
-typedef Thr_Supplier_Proxy SUPPLIER_PROXY;
-#else
-typedef Supplier_Proxy SUPPLIER_PROXY;
-#endif /* USE_INPUT_MT */
-#else
-// Instantiate a non-multi-threaded Gateway.
-typedef Supplier_Proxy SUPPLIER_PROXY;
-typedef Consumer_Proxy CONSUMER_PROXY;
-#endif /* ACE_HAS_THREADS */
-
-#endif /* _CONCURRENCY_STRATEGIES */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
deleted file mode 100644
index 5b95dc4fbf0..00000000000
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-#include "ace/OS.h"
-// $Id$
-
-#include "Config_Files.h"
-
-// This fixes a nasty bug with cfront-based compilers (like
-// Centerline).
-typedef FP::Return_Type FP_RETURN_TYPE;
-
-FP_RETURN_TYPE
-Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
- int &line_number)
-{
- FP_RETURN_TYPE read_result;
-
- // Increment the line count.
- line_number++;
-
- // Ignore comments, check for EOF and EOLINE if this succeeds, we
- // have our connection id.
- while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS)
- {
- if (read_result == FP::EOFILE)
- return FP::EOFILE;
- else if (read_result == FP::EOLINE
- || read_result == FP::COMMENT)
- line_number++;
- }
-
- // Get the logical id.
- if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS)
- return read_result;
-
- // Get the payload type.
- if ((read_result = this->getint (entry.type_)) != FP::SUCCESS)
- return read_result;
-
- // get all the consumers.
- entry.total_consumers_ = 0;
-
- while ((read_result = this->getint (entry.consumers_[entry.total_consumers_]))
- == FP::SUCCESS)
- ++entry.total_consumers_; // do nothing (should check against max...)
-
- if (read_result == FP::EOLINE || read_result == FP::EOFILE)
- return FP::SUCCESS;
- else
- return read_result;
-}
-
-FP_RETURN_TYPE
-Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
- int &line_number)
-{
- char buf[BUFSIZ];
- FP_RETURN_TYPE read_result;
- // increment the line count
- line_number++;
-
- // Ignore comments, check for EOF and EOLINE
- // if this succeeds, we have our connection id
- while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS)
- {
- if (read_result == FP::EOFILE)
- return FP::EOFILE;
- else if (read_result == FP::EOLINE
- || read_result == FP::COMMENT)
- line_number++;
- }
-
- // get the hostname
- if ((read_result = this->getword (entry.host_)) != FP::SUCCESS)
- return read_result;
-
- ACE_INT32 port;
-
- // Get the port number.
- if ((read_result = this->getint (port)) != FP::SUCCESS)
- return read_result;
- else
- entry.remote_port_ = (u_short) port;
-
- // Get the proxy role.
- if ((read_result = this->getword (buf)) != FP::SUCCESS)
- return read_result;
- else
- entry.proxy_role_ = buf[0];
-
- // Get the max retry delay.
- if ((read_result = this->getint (entry.max_retry_delay_)) != FP::SUCCESS)
- return read_result;
-
- // Get the local port number.
- if ((read_result = this->getint (port)) != FP::SUCCESS)
- return read_result;
- else
- entry.local_port_ = (u_short) port;
-
- return FP::SUCCESS;
-}
-
-#if defined (DEBUGGING)
-int main (int argc, char *argv[])
-{
- if (argc != 4) {
-// ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1);
- cerr << argv[0] << " CCfilename filename Mapfilename.\n";
- exit (1);
- }
- FP_RETURN_TYPE result;
- Connection_Config_File_Entry entry;
- Connection_Config_File_Parser CCfile;
-
- CCfile.open (argv[1]);
-
- int line_number = 0;
-
- printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n");
-
- // Read config file line at a time.
- while ((result = CCfile.read_entry (entry, line_number)) != EOF)
- {
- if (result != FP::SUCCESS)
- // ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number));
- cerr << "Error at line " << line_number << endl;
- else
- printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n",
- entry.conn_id_, entry.host_, entry.remote_port_, entry.proxy_role_,
- entry.max_retry_delay_, entry.transform_, entry.local_port_);
- }
- CCfile.close();
-
- Consumer_Config_File_Entry entry;
- Consumer_Config_File_Parser file;
-
- file.open (argv[2]);
-
- line_number = 0;
-
- printf ("\nConnID\tLogic\tPayload\tDestinations\n");
-
- // Read config file line at a time.
- while ((result = file.read_entry (entry, line_number)) != EOF)
- {
- if (result != FP::SUCCESS)
- cerr << "Error at line " << line_number << endl;
- else
- {
- printf ("%d\t%d\t%d\t%d\t",
- entry.conn_id_, entry.supplier_id_, entry.type_);
- while (--entry.total_consumers_ >= 0)
- printf ("%d,", entry.consumers_[entry.total_consumers_]);
- printf ("\n");
- }
- }
- file.close();
-
- return 0;
-}
-#endif /* DEBUGGING */
-
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class File_Parser<Connection_Config_File_Entry>;
-template class File_Parser<Consumer_Config_File_Entry>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
deleted file mode 100644
index eae0248eb8c..00000000000
--- a/apps/Gateway/Gateway/Config_Files.h
+++ /dev/null
@@ -1,90 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Config_Files.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONFIG_FILES)
-#define _CONFIG_FILES
-
-#include "ace/OS.h"
-#include "File_Parser.h"
-
-class Connection_Config_File_Entry
- // = TITLE
- // Stores connection configuration information.
-{
-public:
- ACE_INT32 conn_id_;
- // Connection id for this Proxy_Handler.
-
- char host_[BUFSIZ];
- // Host to connect with.
-
- u_short remote_port_;
- // Port to connect with.
-
- char proxy_role_;
- // 'S' (supplier) or 'C' (consumer).
-
- ACE_INT32 max_retry_delay_;
- // Maximum amount of time to wait for reconnecting.
-
- u_short local_port_;
- // Our local port number.
-};
-
-class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry>
- // = TITLE
- // Parser for the Proxy_Handler Connection file.
-{
-public:
- virtual FP::Return_Type
- read_entry (Connection_Config_File_Entry &entry, int &line_number);
-};
-
-class Consumer_Config_File_Entry
- // = TITLE
- // Stores the information in a Consumer Map entry.
-{
-public:
- enum {
- MAX_CONSUMERS = 1000 // Total number of multicast consumers.
- };
-
- ACE_INT32 conn_id_;
- // Connection id for this proxy.
-
- ACE_INT32 supplier_id_;
- // Logical supplier id for this proxy.
-
- ACE_INT32 type_;
- // Message type.
-
- ACE_INT32 consumers_[MAX_CONSUMERS];
- // Connection ids for consumers that we're routing to.
-
- int total_consumers_;
- // Total number of these consumers.
-};
-
-class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry>
- // = TITLE
- // Parser for the Consumer Map file.
-{
-public:
- virtual FP::Return_Type
- read_entry (Consumer_Config_File_Entry &entry, int &line_number);
-};
-
-#endif /* _CONFIG_FILES */
diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
deleted file mode 100644
index 71e2046b56e..00000000000
--- a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Consumer_Dispatch_Set.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_DISPATCH_SET)
-#define _DISPATCH_SET
-
-#include "ace/Set.h"
-
-// Forward reference.
-class Proxy_Handler;
-
-typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set;
-typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator;
-
-#endif /* _DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp
deleted file mode 100644
index c3dcd96ebbf..00000000000
--- a/apps/Gateway/Gateway/Consumer_Entry.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-// Defines an entry in the Consumer Map.
-// $Id$
-
-#include "Consumer_Entry.h"
-
-Consumer_Entry::Consumer_Entry (void)
-{
- ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET);
-}
-
-Consumer_Entry::~Consumer_Entry (void)
-{
- delete this->destinations_;
-}
-
-// Get the associated set of destinations.
-
-Consumer_Entry::ENTRY_SET *
-Consumer_Entry::destinations (void)
-{
- return this->destinations_;
-}
-
-// Set the associated set of destinations.
-
-void
-Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s)
-{
- this->destinations_ = s;
-}
-
diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h
deleted file mode 100644
index fe502991514..00000000000
--- a/apps/Gateway/Gateway/Consumer_Entry.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Consumer_Entry.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_ENTRY)
-#define _ROUTING_ENTRY
-
-#include "ace/Set.h"
-
-// Forward reference.
-class IO_Handler;
-
-class Consumer_Entry
-{
- // = TITLE
- // Defines an entry in the Consumer_Map.
-public:
- Consumer_Entry (void);
- ~Consumer_Entry (void);
-
- typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET;
- typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR;
-
- // = Set/get the associated set of destinations.
- ENTRY_SET *destinations (void);
- void destinations (ENTRY_SET *);
-
-protected:
- ENTRY_SET *destinations_;
- // The set of destinations;
-};
-
-#endif /* _ROUTING_ENTRY */
diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp
deleted file mode 100644
index 6d16601f949..00000000000
--- a/apps/Gateway/Gateway/Consumer_Map.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#if !defined (_CONSUMER_MAP_C)
-#define _CONSUMER_MAP_C
-
-#include "Consumer_Map.h"
-
-// Bind the Event_Addr to the INT_ID.
-
-int
-Consumer_Map::bind (Event_Addr event_addr,
- Consumer_Entry *Consumer_Entry)
-{
- return this->map_.bind (event_addr, Consumer_Entry);
-}
-
-// Find the Consumer_Entry corresponding to the Event_Addr.
-
-int
-Consumer_Map::find (Event_Addr event_addr,
- Consumer_Entry *&Consumer_Entry)
-{
- return this->map_.find (event_addr, Consumer_Entry);
-}
-
-// Unbind (remove) the Event_Addr from the map.
-
-int
-Consumer_Map::unbind (Event_Addr event_addr)
-{
- return this->map_.unbind (event_addr);
-}
-
-Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt)
- : map_iter_ (rt.map_)
-{
-}
-
-int
-Consumer_Map_Iterator::next (Consumer_Entry *&ss)
-{
- // Loop in order to skip over inactive entries if necessary.
-
- for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0;
- this->map_iter_.next (temp) != 0;
- this->advance ())
- {
- // Otherwise, return the next item.
- ss = temp->int_id_;
- return 1;
- }
- return 0;
-}
-
-int
-Consumer_Map_Iterator::advance (void)
-{
- return this->map_iter_.advance ();
-}
-#endif /* _CONSUMER_MAP_C */
diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h
deleted file mode 100644
index fd392afaf6e..00000000000
--- a/apps/Gateway/Gateway/Consumer_Map.h
+++ /dev/null
@@ -1,62 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Consumer_Map.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONSUMER_MAP_H)
-#define _CONSUMER_MAP_H
-
-#include "ace/Map_Manager.h"
-#include "Concurrency_Strategies.h"
-#include "Event.h"
-#include "Consumer_Entry.h"
-
-class Consumer_Map
-{
- // = TITLE
- // Define a generic consumer map based on the ACE Map_Manager.
- //
- // = DESCRIPTION
- // This class makes it easier to use the Map_Manager.
-public:
- int bind (Event_Addr event, Consumer_Entry *Consumer_Entry);
- // Associate Event with the Consumer_Entry.
-
- int find (Event_Addr event, Consumer_Entry *&Consumer_Entry);
- // Break any association of EXID.
-
- int unbind (Event_Addr event);
- // Locate EXID and pass out parameter via INID. If found,
- // return 0, else -1.
-
-public:
- ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_;
- // Map that associates Event Addrs (external ids) with Consumer_Entry *'s
- // <internal IDs>.
-};
-
-class Consumer_Map_Iterator
-{
- // = TITLE
- // Define an iterator for the Consumer Map.
-public:
- Consumer_Map_Iterator (Consumer_Map &mm);
- int next (Consumer_Entry *&);
- int advance (void);
-
-private:
- ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_;
- // Map we are iterating over.
-};
-#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h
deleted file mode 100644
index a867f1ca5ff..00000000000
--- a/apps/Gateway/Gateway/Dispatch_Set.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Dispatch_Set.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_DISPATCH_SET)
-#define _DISPATCH_SET
-
-#include "ace/Set.h"
-
-// Forward reference.
-class Proxy_Handler;
-
-typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set;
-typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator;
-
-#endif /* _DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
deleted file mode 100644
index 5e288edf910..00000000000
--- a/apps/Gateway/Gateway/Event.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Event.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (EVENT)
-#define EVENT
-
-#include "ace/OS.h"
-
-// This is the unique connection identifier that denotes a particular
-// Proxy_Handler in the Gateway.
-typedef ACE_INT32 ACE_INT32;
-
-class Event_Key
- // = TITLE
- // Address used to identify the source/destination of an event.
- //
- // = DESCRIPTION
- // This is really a "virtual forwarding address" thatis used to
- // decouple the filtering and forwarding logic of the Event
- // Channel from the format of the data.
-{
-public:
- Event_Key (ACE_INT32 cid = -1,
- u_char sid = 0,
- u_char type = 0)
- : conn_id_ (cid),
- supplier_id_ (sid),
- type_ (type) {}
-
- int operator== (const Event_Key &event_addr) const
- {
- return this->conn_id_ == event_addr.conn_id_
- && this->supplier_id_ == event_addr.supplier_id_
- && this->type_ == event_addr.type_;
- }
-
- ACE_INT32 conn_id_;
- // Unique connection identifier that denotes a particular
- // Proxy_Handler.
-
- ACE_INT32 supplier_id_;
- // Logical ID.
-
- ACE_INT32 type_;
- // Event type.
-};
-
-class Event_Header
- // = TITLE
- // Fixed sized header.
- //
- // = DESCRIPTION
- // This is designed to have a sizeof (16) to avoid alignment
- // problems on most platforms.
-{
-public:
- typedef ACE_INT32 SUPPLIER_ID;
- // Type used to forward events from gatewayd.
-
- enum
- {
- INVALID_ID = -1 // No peer can validly use this number.
- };
-
- void decode (void)
- {
- this->len_ = ntohl (this->len_);
- this->supplier_id_ = ntohl (this->supplier_id_);
- this->type_ = ntohl (this->type_);
- this->priority_ = ntohl (this->priority_);
- }
- // Decode from network byte order to host byte order.
-
- void encode (void)
- {
- this->len_ = htonl (this->len_);
- this->supplier_id_ = htonl (this->supplier_id_);
- this->type_ = htonl (this->type_);
- this->priority_ = htonl (this->priority_);
- }
- // Encode from host byte order to network byte order.
-
- size_t len_;
- // Length of the data_ payload, in bytes.
-
- SUPPLIER_ID supplier_id_;
- // Source ID.
-
- ACE_INT32 type_;
- // Event type.
-
- ACE_INT32 priority_;
- // Event priority.
-};
-
-class Event
- // = TITLE
- // Variable-sized event (data_ may be variable-sized between
- // 0 and MAX_PAYLOAD_SIZE).
-{
-public:
- enum { MAX_PAYLOAD_SIZE = 1024 };
- // The maximum size of an Event.
-
- Event_Header header_;
- // Event header.
-
- char data_[MAX_PAYLOAD_SIZE];
- // Event data.
-};
-
-#endif /* EVENT */
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
deleted file mode 100644
index 02f2cd465f8..00000000000
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ /dev/null
@@ -1,377 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#define ACE_BUILD_SVC_DLL
-#include "Proxy_Handler_Connector.h"
-#include "Event_Channel.h"
-
-ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
- : performance_window_ (0),
- blocking_semantics_ (ACE_NONBLOCK),
- socket_queue_size_ (0)
-{
-}
-
-ACE_Event_Channel::~ACE_Event_Channel (void)
-{
-}
-
-ACE_Event_Channel::ACE_Event_Channel (void)
-{
-}
-
-ACE_Event_Channel_Options &
-ACE_Event_Channel::options (void)
-{
- return this->options_;
-}
-
-ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
- const void *)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- // If we've got a ACE_Thread Manager then use it to suspend all the
- // threads. This will enable us to get an accurate count.
-
-#if defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)
- if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
-
- size_t total_bytes_in = 0;
- size_t total_bytes_out = 0;
-
- // Iterate through the connection map summing up the number of bytes
- // sent/received.
-
- for (CONNECTION_MAP_ENTRY *me = 0;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Proxy_Handler *proxy_handler = me->int_id_;
-
- if (proxy_handler->proxy_role () == 'C')
- total_bytes_out += proxy_handler->total_bytes ();
- else // proxy_handler->proxy_role () == 'S'
- total_bytes_in += proxy_handler->total_bytes ();
- }
-
-#if defined (ACE_NLOGGING)
- ACE_OS::fprintf (stderr, "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- performance_window_,
- total_bytes_in,
- total_bytes_out);
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)));
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- this->options ().performance_window_,
- total_bytes_in,
- total_bytes_out));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
-#endif /* ACE_NLOGGING */
-
-#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
-
- // Resume all the threads again.
-
- if (ACE_Service_Config::thr_mgr ()->resume_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
-
- return 0;
-}
-
-ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
- ACE_Time_Value *)
-{
- // We got a valid event, so determine its virtual forwarding
- // address, which is stored in the first of the two event blocks,
- // which are chained together by this->recv().
-
- Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr ();
-
- // Skip over the address portion and get the data.
- ACE_Message_Block *data = forward_addr->cont ();
-
- // <dispatch_set> points to the set of Consumers associated with
- // this forwarding address.
- Consumer_Dispatch_Set *dispatch_set = 0;
-
- if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
- // Failure.
- ACE_ERROR ((LM_DEBUG,
- "(%t) find failed on conn id = %d, logical id = %d, type = %d\n",
- forwarding_addr->conn_id_,
- forwarding_addr->supplier_id_,
- forwarding_addr->type_));
- else
- {
- // Check to see if there are any consumers.
- if (dispatch_set->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active consumers for this event currently\n"));
-
- else // There are consumers, so forward the event.
- {
- Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
-
- // At this point, we should assign a thread-safe locking
- // strategy to the Message_Block is we're running in a
- // multi-threaded configuration.
- // data->locking_strategy (MB_Locking_Strategy::instance ());
-
- for (Proxy_Handler **proxy_handler = 0;
- dsi.next (proxy_handler) != 0;
- dsi.advance ())
- {
- // Only process active proxy_handlers.
- if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
- {
- // Duplicate the event portion via reference
- // counting.
- ACE_Message_Block *dup_msg = data->duplicate ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
- (*proxy_handler)->id ()));
-
- if ((*proxy_handler)->put (dup_msg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
- "put", (*proxy_handler)->id ()));
-
- // We are responsible for releasing an
- // ACE_Message_Block if failures occur.
- dup_msg->release ();
- }
- }
- }
- }
- }
-
- // Release the memory in the message block.
- forward_addr->release ();
- return 0;
-}
-
-ACE_Event_Channel::svc (void)
-{
- return 0;
-}
-
-int
-ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler,
- ACE_Synch_Options &synch_options)
-{
- return this->connector_.initiate_connection (proxy_handler,
- synch_options);
-}
-
-int
-ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
-{
- int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
- int socket_queue_size = this->options ().socket_queue_size_;
-
- if (proxy_handler->peer ().set_option (SOL_SOCKET,
- option,
- &socket_queue_size,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
-
- proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ());
-
- // Our state is now "established."
- proxy_handler->state (Proxy_Handler::ESTABLISHED);
-
- // Restart the timeout to 1.
- proxy_handler->timeout (1);
-
- ACE_INT32 id = htonl (proxy_handler->id ());
-
- // Send the connection id to the peerd.
-
- ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "peer has closed down unexpectedly" : "send"),
- -1);
-}
-
-// Restart connection (blocking_semantics dicates whether we restart
-// synchronously or asynchronously).
-
-int
-ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
-{
- // Skip over deactivated descriptors.
- if (proxy_handler->get_handle () != ACE_INVALID_HANDLE)
- {
- // Make sure to close down peer to reclaim descriptor.
- proxy_handler->peer ().close ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Proxy_Handler %d\n",
- proxy_handler->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer
- (proxy_handler, 0, proxy_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
-}
-
-// Initiate connections with the Consumer and Supplier Peers.
-
-ACE_Event_Channel::initiate_connections (void)
-{
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- ACE_Synch_Options synch_options;
-
- if (this->options ().blocking_semantics_ == ACE_NONBLOCK)
- synch_options = ACE_Synch_Options::asynch;
- else
- synch_options = ACE_Synch_Options::synch;
-
- // Iterate through the Consumer Map connecting all the
- // Proxy_Handlers.
-
- for (CONNECTION_MAP_ENTRY *me = 0;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Proxy_Handler *proxy_handler = me->int_id_;
-
- if (this->initiate_proxy_connection
- (proxy_handler, synch_options) == -1)
- continue; // Failures are handled elsewhere...
- }
-
- return 0;
-}
-
-// This method gracefully shuts down all the Handlers in the
-// Proxy_Handler Connection Map.
-
-ACE_Event_Channel::close (u_long)
-{
-#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
- if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
-
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- // Iterate over all the handlers and shut them down.
-
- for (CONNECTION_MAP_ENTRY *me;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Proxy_Handler *proxy_handler = me->int_id_;
-
- ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n",
- proxy_handler->id ()));
-
- if (proxy_handler->state () != Proxy_Handler::IDLE)
- // Mark Proxy_Handler as DISCONNECTING so we don't try to
- // reconnect...
- proxy_handler->state (Proxy_Handler::DISCONNECTING);
-
- // Deallocate Proxy_Handler resources.
- proxy_handler->destroy (); // Will trigger a delete.
- }
-
- return 0;
-}
-
-int
-ACE_Event_Channel::find_proxy (ACE_INT32 conn_id,
- Proxy_Handler *&proxy_handler)
-{
- return this->connection_map_.find (conn_id, proxy_handler);
-}
-
-int
-ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
-{
- switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) bind failed for connection %d\n",
- proxy_handler->id ()), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) duplicate connection %d, already bound\n",
- proxy_handler->id ()), -1);
- /* NOTREACHED */
- case 0:
- // Success.
- return 0;
- }
-}
-
-int
-ACE_Event_Channel::subscribe (const Event_Key &event_addr,
- Consumer_Dispatch_Set *cds)
-{
- // Bind with consumer map, keyed by peer address.
- switch (this->efd_.bind (event_addr, cds))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- event_addr.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
- "already bound\n", event_addr.conn_id_), -1);
- /* NOTREACHED */
- case 0:
- // Success.
- return 0;
- }
-}
-
-ACE_Event_Channel::open (void *)
-{
- // Ignore SIPPIPE so each Consumer_Proxy can handle it.
- ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
-
-#if 0
- // If this->performance_window_ > 0 start a timer.
-
- if (this->options ().performance_window_ > 0)
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->options ().performance_window_) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
- else
- ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
- this->options ().performance_window_)));
- }
-#endif
-
- return 0;
-}
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
deleted file mode 100644
index 1ecf468addf..00000000000
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Event_Channel.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (ACE_EVENT_CHANNEL)
-#define ACE_EVENT_CHANNEL
-
-#include "Proxy_Handler_Connector.h"
-
-class ACE_Svc_Export ACE_Event_Channel_Options
- // = TITLE
- // Maintains the options for an <ACE_Event_Channel>.
-{
-public:
- ACE_Event_Channel_Options (void);
- // Initialization.
-
- int performance_window_;
- // Number of seconds after connection establishment to report
- // throughput.
-
- int blocking_semantics_;
- // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
-
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
-};
-
-class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<SYNCH_STRATEGY>
- // = TITLE
- // Define a generic Event_Channel.
- //
- // = DESCRIPTION
-{
-public:
- // = Initialization and termination methods.
- ACE_Event_Channel (void);
- ~ACE_Event_Channel (void);
-
- virtual int open (void * = 0);
- // Open the channel.
-
- virtual int close (u_long = 0);
- // Close down the Channel.
-
- // = Proxy management methods.
- int initiate_proxy_connection (Proxy_Handler *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
- // Initiate the connection of the <Proxy_Handler> to its peer.
-
- int complete_proxy_connection (Proxy_Handler *);
- // Complete the initialization of the <Proxy_Handler> once it's
- // connected to its Peer.
-
- int reinitiate_proxy_connection (Proxy_Handler *);
- // Reinitiate a connection asynchronously when the Peer fails.
-
- int bind_proxy (Proxy_Handler *);
- // Bind the <Proxy_Handler> to the <connection_map_>.
-
- int find_proxy (ACE_INT32 conn_id, Proxy_Handler *&);
- // Locate the <Proxy_Handler> with <conn_id>.
-
- int subscribe (const Event_Key &event_addr,
- Consumer_Dispatch_Set *cds);
- // Subscribe the <Consumer_Dispatch_Set> to receive events that
- // match <Event_Key>.
-
- // = Event forwarding method.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0);
- // Pass <mb> to the Event Channel so it can forward it to Consumers.
-
- ACE_Event_Channel_Options &options (void);
- // Points to the Event_Channel options.
-
- int initiate_connections (void);
- // Initiate connections to the peers.
-
-private:
- virtual int svc (void);
- // Run as an active object.
-
- int parse_args (int argc, char *argv[]);
- // Parse the command-line arguments.
-
- virtual int handle_timeout (const ACE_Time_Value &,
- const void *arg);
- // Perform timer-based performance profiling.
-
- Proxy_Handler_Connector connector_;
- // Used to establish the connections actively.
-
- // Proxy_Handler_Acceptor acceptor_;
- // Used to establish the connections passively.
-
- // = Make life easier by defining typedefs.
- // Note that Proxy_Handler is assumed to the base class of
- // SUPPLIER_PROXY and CONSUMER_PROXY.
- typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP;
- typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
- typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY;
-
- CONNECTION_MAP connection_map_;
- // Table that maps Connection IDs to Proxy_Handler *'s.
-
- Event_Forwarding_Discriminator efd_;
- // Map that associates an event to a set of Consumer_Proxy *'s.
-
- ACE_Event_Channel_Options options_;
- // The options for the channel.
-};
-
-#endif /* ACE_EVENT_CHANNEL */
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
deleted file mode 100644
index 4dfbb658c1f..00000000000
--- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#if !defined (_CONSUMER_MAP_C)
-#define _CONSUMER_MAP_C
-
-#include "Event_Forwarding_Discriminator.h"
-
-// Bind the Event_Key to the INT_ID.
-
-int
-Event_Forwarding_Discriminator::bind (Event_Key event_addr,
- Consumer_Dispatch_Set *cds)
-{
- return this->map_.bind (event_addr, cds);
-}
-
-// Find the Consumer_Dispatch_Set corresponding to the Event_Key.
-
-int
-Event_Forwarding_Discriminator::find (Event_Key event_addr,
- Consumer_Dispatch_Set *&cds)
-{
- return this->map_.find (event_addr, cds);
-}
-
-// Unbind (remove) the Event_Key from the map.
-
-int
-Event_Forwarding_Discriminator::unbind (Event_Key event_addr)
-{
- return this->map_.unbind (event_addr);
-}
-
-Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator
- (Event_Forwarding_Discriminator &rt)
- : map_iter_ (rt.map_)
-{
-}
-
-int
-Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds)
-{
- ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp;
- if (this->map_iter_.next (temp) == 0)
- return 0;
- else
- {
- cds = temp->int_id_;
- return 1;
- }
-}
-
-int
-Event_Forwarding_Discriminator_Iterator::advance (void)
-{
- return this->map_iter_.advance ();
-}
-#endif /* _CONSUMER_MAP_C */
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
deleted file mode 100644
index 9b7531c1f46..00000000000
--- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Event_Forwarding_Discriminator.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_CONSUMER_MAP_H)
-#define _CONSUMER_MAP_H
-
-#include "ace/Map_Manager.h"
-#include "Concurrency_Strategies.h"
-#include "Event.h"
-#include "Consumer_Dispatch_Set.h"
-
-class Event_Forwarding_Discriminator
-{
- // = TITLE
- // Map events to the set of Consumer_Proxies that have subscribed
- // to receive the event.
-public:
- int bind (Event_Key event, Consumer_Dispatch_Set *cds);
- // Associate Event with the Consumer_Dispatch_Set.
-
- int unbind (Event_Key event);
- // Locate EXID and pass out parameter via INID. If found,
- // return 0, else -1.
-
- int find (Event_Key event, Consumer_Dispatch_Set *&cds);
- // Break any association of EXID.
-
-public:
- ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_;
- // Map that associates Event Addrs (external ids) with Consumer_Dispatch_Set *'s
- // <internal IDs>.
-};
-
-class Event_Forwarding_Discriminator_Iterator
-{
- // = TITLE
- // Define an iterator for the Consumer Map.
-public:
- Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm);
- int next (Consumer_Dispatch_Set *&);
- int advance (void);
-
-private:
- ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_iter_;
- // Map we are iterating over.
-};
-#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp
deleted file mode 100644
index 07bda87180b..00000000000
--- a/apps/Gateway/Gateway/File_Parser.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-#if !defined (FILE_PARSER_C)
-// $Id$
-
-#define FILE_PARSER_C
-
-#include "ace/OS.h"
-#include "File_Parser.h"
-
-// This fixes a nasty bug with cfront-based compilers (like
-// Centerline).
-typedef FP::Return_Type FP_RETURN_TYPE;
-
-// File_Parser stuff.
-
-template <class ENTRY> int
-File_Parser<ENTRY>::open (const char filename[])
-{
- this->infile_ = ACE_OS::fopen (filename, "r");
- if (this->infile_ == 0)
- return -1;
- else
- return 0;
-}
-
-template <class ENTRY> int
-File_Parser<ENTRY>::close (void)
-{
- return ACE_OS::fclose (this->infile_);
-}
-
-template <class ENTRY> FP_RETURN_TYPE
-File_Parser<ENTRY>::getword (char buf[])
-{
- return this->readword (buf);
-}
-
-// Get the next string from the file via this->readword()
-// Check make sure the string forms a valid number.
-template <class ENTRY> FP_RETURN_TYPE
-File_Parser<ENTRY>::getint (ACE_INT32 &value)
-{
- char buf[BUFSIZ];
- FP_RETURN_TYPE read_result = this->readword(buf);
- if (read_result == FP::SUCCESS)
- {
- // ptr is used for error checking with ACE_OS::strtol
- char *ptr;
-
- // try to convert the buf to a decimal number
- value = ACE_OS::strtol (buf, &ptr, 10);
-
- // check if the buf is a decimal or not
- if (value == 0 && ptr == buf)
- return FP::ERROR;
- else
- return FP::SUCCESS;
- }
- else
- return read_result;
-}
-
-
-template <class ENTRY> FP_RETURN_TYPE
-File_Parser<ENTRY>::readword (char buf[])
-{
- int wordlength = 0;
- int c;
-
- // Skip over leading delimiters and get word.
-
- while ((c = getc (this->infile_)) != EOF && c != '\n')
- if (this->delimiter (c))
- {
- // We've reached the end of a "word".
- if (wordlength > 0)
- break;
- }
- else
- buf[wordlength++] = c;
-
- buf[wordlength] = '\0';
-
- if (c == EOF) {
- // If EOF is just a delimiter, don't return EOF so that the word
- // gets processed.
- if (wordlength > 0)
- {
- ungetc (c, this->infile_);
- return FP::SUCCESS;
- }
- else
- // else return EOF so that read loops stop
- return FP::EOFILE;
- }
- else if (c == '\n')
- {
- // if the EOLINE is just a delimiter, don't return EOLINE
- // so that the word gets processed
- if (wordlength > 0)
- ungetc (c, this->infile_);
- else
- return FP::EOLINE;
- }
-
- // Skip comments.
- if (this->comments (buf[0]))
- {
- if (this->skipline () == EOF)
- return FP::EOFILE;
- else
- return FP::COMMENT;
- }
- else
- return FP::SUCCESS;
-}
-
-template <class ENTRY> int
-File_Parser<ENTRY>::delimiter (char ch)
-{
- return ch == ' ' || ch == ',' || ch == '\t';
-}
-
-template <class ENTRY> int
-File_Parser<ENTRY>::comments (char ch)
-{
- return ch == '#';
-}
-
-template <class ENTRY> int
-File_Parser<ENTRY>::skipline (void)
-{
- // Skip the remainder of the line.
-
- int c;
-
- while ((c = getc (this->infile_)) != '\n' && c != EOF)
- continue;
-
- return c;
-}
-
-#endif /* _FILE_PARSER_C */
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
deleted file mode 100644
index f1de7429db0..00000000000
--- a/apps/Gateway/Gateway/File_Parser.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// File_Parser.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_FILE_PARSER)
-#define _FILE_PARSER
-
-#include "ace/OS.h"
-
-class FP
- // = TITLE
- // This class serves as a namespace for the Return_Type
-{
-public:
- enum Return_Type
- {
- EOLINE,
- EOFILE,
- SUCCESS,
- COMMENT,
- ERROR
- };
-};
-
-template <class ENTRY>
-class File_Parser
- // = TITLE
- // Class used to parse the configuration file for the Consumer
- // Map.
-{
-public:
- // = Open and Close the file specified
- int open (const char filename[]);
- int close (void);
-
- virtual FP::Return_Type read_entry (ENTRY &, int &line_number) = 0;
- // Implementations use protected methods to fill in the entry.
-
-protected:
- FP::Return_Type getword (char buf[]);
- // Read the next ASCII word.
-
- FP::Return_Type getint (ACE_INT32 &value);
- // Read the next integer.
-
- FP::Return_Type readword (char buf[]);
- int delimiter (char ch);
- int comments (char ch);
- int skipline (void);
-
- FILE *infile_;
-};
-
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "File_Parser.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-
-#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
-#pragma implementation ("File_Parser.cpp")
-#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
-
-#endif /* _FILE_PARSER */
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
deleted file mode 100644
index 4ff09aed1b7..00000000000
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ /dev/null
@@ -1,341 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-#include "ace/Get_Opt.h"
-#include "Config_Files.h"
-#include "ace/Service_Config.h"
-#include "Event_Channel.h"
-#include "Gateway.h"
-
-class Gateway : public ACE_Service_Object
- // = TITLE
- // Integrates the whole Gateway application.
- //
- // = DESCRIPTION
- // This implementation uses the <ACE_Event_Channel> as the basis
- // for the <Gateway> routing.
-{
-public:
- // = Service configurator hooks.
- virtual int init (int argc, char *argv[]);
- // Perform initialization.
-
- virtual int fini (void);
- // Perform termination.
-
- virtual int info (char **, size_t) const;
- // Return info about this service.
-
- int parse_connection_config_file (void);
- // Parse the connection configuration file.
-
- int parse_consumer_config_file (void);
- // Parse the consumer configuration file.
-
-protected:
- int handle_input (ACE_HANDLE);
- // Shut down the Gateway when input comes in from the controlling
- // console.
-
- int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
- // Shut down the Gateway when a signal arrives.
-
- int parse_args (int argc, char *argv[]);
- // Parse gateway configuration arguments obtained from svc.conf
- // file.
-
- char connection_config_file_[MAXPATHLEN + 1];
- // Name of the connection configuration file.
-
- char consumer_config_file_[MAXPATHLEN + 1];
- // Name of the consumer map configuration file.
-
- ACE_Event_Channel event_channel_;
- // The Event Channel routes events from Supplier(s) to Consumer(s).
-
- int active_connector_role_;
- // Enabled if we are playing the role of the active Connector.
-
- int debug_;
- // Are we debugging?
-};
-
-int
-Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
-{
- if (signum > 0)
- ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum));
-
- // Shut down the main event loop.
- ACE_Service_Config::end_reactor_event_loop ();
- return 0;
-}
-
-int
-Gateway::handle_input (ACE_HANDLE h)
-{
- if (ACE_Service_Config::reactor ()->remove_handler
- (0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "remove_handler"), -1);
-
- char buf[BUFSIZ];
- // Consume the input...
- ACE_OS::read (h, buf, sizeof (buf));
-
- // Shut us down.
- return this->handle_signal (h);
-}
-
-// Parse the "command-line" arguments and set the corresponding flags.
-
-int
-Gateway::parse_args (int argc, char *argv[])
-{
- ACE_OS::strcpy (this->connection_config_file_, "connection_config");
- ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
- this->active_connector_role_ = 1;
- this->debug_ = 0;
-
- ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
-
- for (int c; (c = get_opt ()) != -1; )
- {
- switch (c)
- {
- case 'b': // Use blocking connection establishment.
- this->event_channel_.options ().blocking_semantics_ = 0;
- break;
- case 'c':
- ACE_OS::strncpy (this->connection_config_file_,
- get_opt.optarg,
- sizeof this->connection_config_file_);
- break;
- case 'd':
- this->debug_ = 1;
- break;
- case 'p':
- // We are not playing the active Connector role.
- this->active_connector_role_ = 0;
- break;
- case 'q':
- this->event_channel_.options ().socket_queue_size_ =
- ACE_OS::atoi (get_opt.optarg);
- break;
- case 'r':
- ACE_OS::strncpy (this->consumer_config_file_,
- get_opt.optarg,
- sizeof this->consumer_config_file_);
- break;
- case 'w': // Time performance for a designated amount of time.
- this->event_channel_.options ().performance_window_ =
- ACE_OS::atoi (get_opt.optarg);
- // Use blocking connection semantics so that we get accurate
- // timings (since all connections start at once).
- this->event_channel_.options ().blocking_semantics_ = 0;
- break;
- default:
- break;
- }
- }
- return 0;
-}
-
-int
-Gateway::init (int argc, char *argv[])
-{
- // Initialize the Event_Channel.
- if (this->event_channel_.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
-
- // Parse the "command-line" arguments.
- this->parse_args (argc, argv);
-
- ACE_Sig_Set sig_set;
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
-
- // Register ourselves to receive SIGINT and SIGQUIT so we can shut
- // down gracefully via signals.
-
- if (ACE_Service_Config::reactor ()->register_handler
- (sig_set, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (ACE_Service_Config::reactor ()->register_handler
- (0, this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (this->active_connector_role_)
- {
- // Parse the connection configuration file.
- this->parse_connection_config_file ();
-
- // Parse the consumer map config file and build the consumer
- // map.
- this->parse_consumer_config_file ();
-
- // Initiate connections with the peers.
- this->event_channel_.initiate_connections ();
- }
-
- return 0;
-}
-
-// This method is automatically called when the gateway is shutdown.
-// It closes down the Event Channel.
-
-int
-Gateway::fini (void)
-{
- this->event_channel_.close ();
- return 0;
-}
-
-// Returns information on the currently active service.
-
-int
-Gateway::info (char **strp, size_t length) const
-{
- char buf[BUFSIZ];
-
- ACE_OS::sprintf (buf, "%s\t %s", "Gateway daemon",
- "# Application-level gateway\n");
-
- if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, buf, length);
- return ACE_OS::strlen (buf);
-}
-
-// Parse and build the connection table.
-
-int
-Gateway::parse_connection_config_file (void)
-{
- // File that contains the consumer map configuration information.
- Connection_Config_File_Parser connection_file;
- Connection_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (connection_file.open (this->connection_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
-
- // Read config file one line at a time.
- while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
- {
- file_empty = 0;
-
- if (this->debug_)
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
- "proxy role = %c, max retry timeout = %d, local port = %d\n",
- entry.conn_id_,
- entry.host_,
- entry.remote_port_,
- entry.proxy_role_,
- entry.max_retry_delay_,
- entry.local_port_));
-
- Proxy_Handler *proxy_handler = 0;
-
- // Initialize the entry's peer addressing info.
-
- ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_);
- ACE_INET_Addr local_addr (entry.local_port_);
-
- // The next few lines of code are dependent on whether we are
- // making an Supplier_Proxy or an Consumer_Proxy.
-
- if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- CONSUMER_PROXY (this->event_channel_, remote_addr,
- local_addr, entry.conn_id_),
- -1);
- else // proxy_role == 'S', so configure a Supplier_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- SUPPLIER_PROXY (this->event_channel_, remote_addr,
- local_addr, entry.conn_id_),
- -1);
-
- // The following code is common to both Supplier_Proxys_ and
- // Consumer_Proxys.
-
- // Initialize max timeout.
- proxy_handler->max_timeout (entry.max_retry_delay_);
-
- // Bind the new Proxy_Handler to the connection ID.
- this->event_channel_.bind_proxy (proxy_handler);
- }
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: connection proxy_handler configuration file was empty\n"));
- return 0;
-}
-
-int
-Gateway::parse_consumer_config_file (void)
-{
- // File that contains the consumer event forwarding information.
- Consumer_Config_File_Parser consumer_file;
- Consumer_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (consumer_file.open (this->consumer_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
-
- // Read config file line at a time.
- while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
- {
- file_empty = 0;
-
- if (this->debug_)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
- "number of consumers = %d\n",
- entry.conn_id_,
- entry.supplier_id_,
- entry.type_,
- entry.total_consumers_));
- for (int i = 0; i < entry.total_consumers_; i++)
- ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
- i, entry.consumers_[i]));
- }
-
- Consumer_Dispatch_Set *dispatch_set;
- ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
-
- Event_Key event_addr (entry.conn_id_,
- entry.supplier_id_,
- entry.type_);
-
- // Add the Consumers to the Dispatch_Set.
- for (int i = 0; i < entry.total_consumers_; i++)
- {
- Proxy_Handler *proxy_handler = 0;
-
- // Lookup destination and add to Consumer_Dispatch_Set set
- // if found.
- if (this->event_channel_.find_proxy (entry.consumers_[i],
- proxy_handler) != -1)
- dispatch_set->insert (proxy_handler);
- else
- ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
- i, entry.consumers_[i]));
- }
-
- this->event_channel_.subscribe (event_addr, dispatch_set);
- }
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: consumer map configuration file was empty\n"));
- return 0;
-}
-
-// The following is a "Factory" used by the ACE_Service_Config and
-// svc.conf file to dynamically initialize the state of the Gateway.
-
-ACE_SVC_FACTORY_DEFINE (Gateway)
diff --git a/apps/Gateway/Gateway/Gateway.h b/apps/Gateway/Gateway/Gateway.h
deleted file mode 100644
index 057ce981701..00000000000
--- a/apps/Gateway/Gateway/Gateway.h
+++ /dev/null
@@ -1,25 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Gateway.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (ACE_GATEWAY)
-#define ACE_GATEWAY
-
-#include "ace/OS.h"
-
-ACE_SVC_FACTORY_DECLARE (Gateway)
-
-#endif /* ACE_GATEWAY */
-
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp
deleted file mode 100644
index 712b348951d..00000000000
--- a/apps/Gateway/Gateway/IO_Handler_Connector.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-#include "IO_Handler_Connector.h"
-// $Id$
-
-
-IO_Handler_Connector::IO_Handler_Connector (void)
-{
-}
-
-// Override the connection-failure method to add timer support.
-// Note that these timers perform "expoential backoff" to
-// avoid rapidly trying to reestablish connections when a link
-// goes down.
-
-int
-IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
-{
- ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0;
-
- // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
- if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
- sd, "find"), -1);
-
- IO_Handler *channel = stp->svc_handler ();
-
- // Schedule a reconnection request at some point in the future
- // (note that channel uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
- channel->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- return 0;
-}
-
-// Initiate (or reinitiate) a connection to the IO_Handler.
-
-int
-IO_Handler_Connector::initiate_connection (IO_Handler *channel,
- ACE_Synch_Options &synch_options)
-{
- char buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- channel->state (IO_Handler::IDLE);
-
- if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
- || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (channel, channel->remote_addr (),
- synch_options, channel->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- channel->state (IO_Handler::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", buf));
-
- // Reschedule ourselves to try and connect again.
- if (synch_options[ACE_Synch_Options::USE_REACTOR])
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (channel, 0, channel->timeout ()) == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- else
- // Failures on synchronous connects are reported as errors
- // so that the caller can decide how to proceed.
- return -1;
- }
- else
- {
- channel->state (IO_Handler::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting %s to %s\n",
- synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", buf));
- }
- }
- else
- {
- channel->state (IO_Handler::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- buf, channel->get_handle ()));
- }
- return 0;
-}
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h
deleted file mode 100644
index 585428c88ee..00000000000
--- a/apps/Gateway/Gateway/IO_Handler_Connector.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// IO_Handler_Connector.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_IO_HANDLER_CONNECTOR)
-#define _IO_HANDLER_CONNECTOR
-
-#include "ace/Connector.h"
-#include "Thr_IO_Handler.h"
-
-class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new IO_Handler object to do the dirty work...
-{
-public:
- IO_Handler_Connector (void);
-
- // Initiate (or reinitiate) a connection on the IO_Handler.
- int initiate_connection (IO_Handler *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
-
-protected:
- // Override the connection-failure method to add timer support.
- virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
-};
-
-#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
deleted file mode 100644
index c3ae8dffe4d..00000000000
--- a/apps/Gateway/Gateway/Makefile
+++ /dev/null
@@ -1,454 +0,0 @@
-#----------------------------------------------------------------------------
-# @(#)Makefile 1.1 10/18/96
-#
-# Makefile for the Gateway.
-#----------------------------------------------------------------------------
-
-#----------------------------------------------------------------------------
-# Local macros
-#----------------------------------------------------------------------------
-
-BIN = gatewayd
-LIB = libGateway.a
-SHLIB = libGateway.so
-
-FILES = Config_Files \
- File_Parser \
- Gateway \
- Event_Channel \
- Event_Forwarding_Discriminator \
- Proxy_Handler \
- Proxy_Handler_Connector \
- Thr_Proxy_Handler
-
-LSRC = $(addsuffix .cpp,$(FILES))
-LOBJ = $(addsuffix .o,$(FILES))
-SHOBJ = $(addsuffix .so,$(FILES))
-
-LDLIBS = -lGateway
-LIBS = -lACE
-
-VLDLIBS = $(LDLIBS:%=%$(VAR))
-
-BUILD = $(VLIB) $(VSHLIB) $(SHLIBA) $(VBIN)
-
-#----------------------------------------------------------------------------
-# Include macros and targets
-#----------------------------------------------------------------------------
-
-include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
-
-#----------------------------------------------------------------------------
-# Local targets
-#----------------------------------------------------------------------------
-
-# Default behavior is to use single-threading. See the README
-# file for information on how to configure this with multiple
-# strategies for threading the input and output channels.
-# DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
-
-#----------------------------------------------------------------------------
-# Dependencies
-#----------------------------------------------------------------------------
-
-# DO NOT DELETE THIS LINE -- g++dep uses it.
-# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-
-.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- Config_Files.h File_Parser.h
-.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- File_Parser.h
-.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- Event_Channel.h Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h Gateway.h
-.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
- $(WRAPPER_ROOT)/ace/Get_Opt.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Config_Files.h File_Parser.h Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h Event_Channel.h
-.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \
- Event_Forwarding_Discriminator.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Concurrency_Strategies.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- Event.h Consumer_Dispatch_Set.h \
- $(WRAPPER_ROOT)/ace/Set.h
-.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Consumer_Dispatch_Set.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h
-.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h
-.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \
- Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- Event_Forwarding_Discriminator.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- Concurrency_Strategies.h Event.h Consumer_Dispatch_Set.h \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.i
-
-# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Gateway/Peer_Message.h b/apps/Gateway/Gateway/Peer_Message.h
deleted file mode 100644
index d9e65650095..00000000000
--- a/apps/Gateway/Gateway/Peer_Message.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Peer_Message.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (PEER_MESSAGE)
-#define PEER_MESSAGE
-
-// This is the unique connection identifier that denotes a particular
-// Channel in the Gateway.
-typedef short CONN_ID;
-
-class Peer_Addr
- // = TITLE
- // Peer address is used to identify the source/destination of a
- // routing message.
-{
-public:
- Peer_Addr (CONN_ID cid = -1, u_char lid = 0, u_char pay = 0)
- : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {}
-
- int operator== (const Peer_Addr &pa) const
- {
- return this->conn_id_ == pa.conn_id_
- && this->logical_id_ == pa.logical_id_
- && this->payload_ == pa.payload_;
- }
-
- CONN_ID conn_id_;
- // Unique connection identifier that denotes a particular Channel.
-
- u_char logical_id_;
- // Logical ID.
-
- u_char payload_;
- // Payload type.
-};
-
-
-class Peer_Header
- // = TITLE
- // Fixed sized header.
-{
-public:
- typedef u_short ROUTING_ID;
- // Type used to route messages from gatewayd.
-
- enum
- {
- INVALID_ID = -1 // No peer can validly use this number.
- };
-
- ROUTING_ID routing_id_;
- // Source ID.
-
- size_t len_;
- // Length of the message in bytes.
-};
-
-class Peer_Message
- // = TITLE
- // Variable-sized message (buf_ may be variable-sized between
- // 0 and MAX_PAYLOAD_SIZE).
-{
-public:
- enum { MAX_PAYLOAD_SIZE = 1024 };
- // The maximum size of an Peer message (see Peer protocol specs for
- // exact #).
-
- Peer_Header header_;
- // Message header.
-
- char buf_[MAX_PAYLOAD_SIZE];
- // Message payload.
-};
-
-#endif /* PEER_MESSAGE */
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp
deleted file mode 100644
index 2f161c171f6..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler.cpp
+++ /dev/null
@@ -1,581 +0,0 @@
-// $Id$
-
-#include "Event_Channel.h"
-
-void
-Proxy_Handler::id (ACE_INT32 id)
-{
- this->id_ = id;
-}
-
-ACE_INT32
-Proxy_Handler::id (void)
-{
- return this->id_;
-}
-
-// The total number of bytes sent/received on this Proxy.
-
-size_t
-Proxy_Handler::total_bytes (void)
-{
- return this->total_bytes_;
-}
-
-void
-Proxy_Handler::total_bytes (size_t bytes)
-{
- this->total_bytes_ += bytes;
-}
-
-Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : remote_addr_ (remote_addr),
- local_addr_ (local_addr),
- id_ (conn_id),
- total_bytes_ (0),
- state_ (Proxy_Handler::IDLE),
- timeout_ (1),
- max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
- event_channel_ (ec)
-{
-}
-
-// Set the proxy_role.
-
-void
-Proxy_Handler::proxy_role (char d)
-{
- this->proxy_role_ = d;
-}
-
-// Get the proxy_role.
-
-char
-Proxy_Handler::proxy_role (void)
-{
- return this->proxy_role_;
-}
-
-// Sets the timeout delay.
-
-void
-Proxy_Handler::timeout (int to)
-{
- if (to > this->max_timeout_)
- to = this->max_timeout_;
-
- this->timeout_ = to;
-}
-
-// Re-calculate the current retry timeout delay using exponential
-// backoff. Returns the original timeout (i.e., before the
-// re-calculation).
-
-int
-Proxy_Handler::timeout (void)
-{
- int old_timeout = this->timeout_;
- this->timeout_ *= 2;
-
- if (this->timeout_ > this->max_timeout_)
- this->timeout_ = this->max_timeout_;
-
- return old_timeout;
-}
-
-// Sets the max timeout delay.
-
-void
-Proxy_Handler::max_timeout (int mto)
-{
- this->max_timeout_ = mto;
-}
-
-// Gets the max timeout delay.
-
-int
-Proxy_Handler::max_timeout (void)
-{
- return this->max_timeout_;
-}
-
-// Restart connection asynchronously when timeout occurs.
-
-int
-Proxy_Handler::handle_timeout (const ACE_Time_Value &,
- const void *)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n",
- this->id (), this->timeout_));
-
- // Delegate the re-connection attempt to the Event Channel.
- return this->event_channel_.initiate_proxy_connection
- (this, ACE_Synch_Options::asynch);
-}
-
-// Handle shutdown of the Proxy_Handler object.
-
-int
-Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down Proxy_Handler %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- // Restart the connection, if possible.
- return this->event_channel_.reinitiate_proxy_connection (this);
-}
-
-// Set the state of the Proxy.
-
-void
-Proxy_Handler::state (Proxy_Handler::State s)
-{
- this->state_ = s;
-}
-
-// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
-// control to our Proxy_Handler.
-
-int
-Proxy_Handler::open (void *)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n",
- this->peer ().get_handle ()));
-
- // Turn on non-blocking I/O.
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_.complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
-
- // Register ourselves to receive input events.
- else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- else
- return 0;
-}
-
-// Return the current state of the Proxy.
-
-Proxy_Handler::State
-Proxy_Handler::state (void)
-{
- return this->state_;
-}
-
-ACE_INET_Addr &
-Proxy_Handler::remote_addr (void)
-{
- return this->remote_addr_;
-}
-
-ACE_INET_Addr &
-Proxy_Handler::local_addr (void)
-{
- return this->local_addr_;
-}
-
-Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Proxy_Handler (ec, remote_addr, local_addr, conn_id)
-{
- this->proxy_role_ = 'C';
- this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
-}
-
-// This method should be called only when the Consumer shuts down
-// unexpectedly. This method simply marks the Proxy_Handler as having
-// failed so that handle_close () can reconnect.
-
-int
-Consumer_Proxy::handle_input (ACE_HANDLE)
-{
- char buf[1];
-
- this->state (Proxy_Handler::FAILED);
-
- switch (this->peer ().recv (buf, sizeof buf))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- }
-}
-
-// Perform a non-blocking put() of event. If we are unable to send
-// the entire event the remainder is re-queued at the *front* of the
-// Event_List.
-
-int
-Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
-{
- // Try to send the event. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // Event_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
-
- ssize_t n = this->send (event);
-
- if (n == -1)
- {
- // Things have gone wrong, let's try to close down and set up a
- // new reconnection by calling handle_close().
- this->state (Proxy_Handler::FAILED);
- this->handle_close ();
- return -1;
- }
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- else if (ACE_Service_Config::reactor ()->schedule_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-ssize_t
-Consumer_Proxy::send (ACE_Message_Block *event)
-{
- ssize_t len = event->length ();
- ssize_t n = this->peer ().send (event->rd_ptr (), len);
-
- if (n <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < len)
- // Re-adjust pointer to skip over the part we did send.
- event->rd_ptr (n);
- else // if (n == length)
- {
- // The whole event is sent, we now decrement the reference count
- // (which deletes itself with it reaches 0.
- event->release ();
- errno = 0;
- }
- this->total_bytes (n);
- return n;
-}
-
-// Finish sending an event when flow control conditions abate.
-// This method is automatically called by the ACE_Reactor.
-
-int
-Consumer_Proxy::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *event = 0;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in handle_output on handle %d\n",
- this->get_handle ()));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (event))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // We are responsible for releasing an ACE_Message_Block if
- // failures occur.
- event->release ();
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire event (or we did not
- // fail due to EWOULDBLOCK) then check if there are more
- // events on the Message_Queue. If there aren't, tell the
- // ACE_Reactor not to notify us anymore (at least until
- // there are new events queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing deactivated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
-
- if (ACE_Service_Config::reactor ()->cancel_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
- return 0;
-}
-
-// Send an event to a Consumer (may queue if necessary).
-
-int
-Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the event *without* blocking!
- return this->nonblk_put (event);
- else
- // If we have queued up events due to flow control then just
- // enqueue and return.
- return this->msg_queue ()->enqueue_tail
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : msg_frag_ (0),
- Proxy_Handler (ec, remote_addr, local_addr, conn_id)
-{
- this->proxy_role_ = 'S';
- this->msg_queue ()->high_water_mark (0);
-}
-
-// Receive an Event from a Supplier. Handles fragmentation.
-//
-// The event returned from recv consists of two parts:
-//
-// 1. The Address part, contains the "virtual" routing id.
-//
-// 2. The Data part, which contains the actual data to be forwarded.
-//
-// The reason for having two parts is to shield the higher layers
-// of software from knowledge of the event structure.
-
-int
-Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
-{
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- Event *event = (Event *) this->msg_frag_->rd_ptr ();
- ssize_t header_received = 0;
-
- const ssize_t HEADER_SIZE = sizeof (Event_Header);
- ssize_t header_bytes_left_to_read =
- HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- header_received = this->peer ().recv
- (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
-
- if (header_received == -1 /* error */
- || header_received == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR, "%p\n",
- "Recv error during header read "));
- ACE_DEBUG ((LM_DEBUG,
- "attempted to read %d\n",
- header_bytes_left_to_read));
- this->msg_frag_ = this->msg_frag_->release ();
- return header_received;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (header_received);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG,
- "Partial header received: only %d bytes\n",
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire event.
- errno = EWOULDBLOCK;
- return -1;
- }
-
- // Convert the header into host byte order so that we can access
- // it directly without having to repeatedly muck with it...
- event->header_.decode ();
-
- if (event->header_.len_ > sizeof event->data_)
- {
- // This data_ payload is too big!
- errno = EINVAL;
- ACE_DEBUG ((LM_DEBUG,
- "Data payload is too big (%d bytes)\n",
- event->header_.len_));
- return -1;
- }
-
- }
-
- // At this point there is a complete, valid header in Event. Now we
- // need to get the event payload. Due to incomplete reads this may
- // not be the first time we've read in a fragment for this message.
- // We account for this here. Note that the first time in here
- // msg_frag_->wr_ptr() will point to event->data_. Every time we do
- // a successful fragment read, we advance wr_ptr(). Therefore, by
- // subtracting how much we've already read from the
- // event->header_.len_ we complete the data_bytes_left_to_read...
-
- ssize_t data_bytes_left_to_read =
- ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
-
- ssize_t data_received =
- this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
-
- // Try to receive the remainder of the event.
-
- switch (data_received)
- {
- case -1:
- if (errno == EWOULDBLOCK)
- // This might happen if only the header came through.
- return -1;
- else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- this->msg_frag_ = this->msg_frag_->release ();
- return 0;
-
- default:
- // Set the write pointer at 1 past the end of the event.
- this->msg_frag_->wr_ptr (data_received);
-
- if (data_received != data_bytes_left_to_read)
- {
- errno = EWOULDBLOCK;
- // Inform caller that we didn't get the whole event.
- return -1;
- }
- else
- {
- // Set the read pointer to the beginning of the event.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- // Allocate an event forwarding header and chain the data
- // portion onto its continuation field.
- forward_addr = new ACE_Message_Block (sizeof (Event_Key),
- ACE_Message_Block::MB_PROTO,
- this->msg_frag_);
- if (forward_addr == 0)
- {
- this->msg_frag_ = this->msg_frag_->release ();
- errno = ENOMEM;
- return -1;
- }
-
- Event_Key event_addr (this->id (),
- event->header_.supplier_id_,
- event->header_.type_);
- // Copy the forwarding address from the Event_Key into
- // forward_addr.
- forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
-
- // Reset the pointer to indicate we've got an entire event.
- this->msg_frag_ = 0;
- }
-
- this->total_bytes (data_received + header_received);
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s",
- event_addr.conn_id_, event->header_.supplier_id_, event->header_.len_,
- event->header_.len_, event->data_));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_, event->header_.len_, data_received + header_received));
-#endif /* VERBOSE */
-
- // Encode before returning so that we can set things out in
- // network byte order.
- event->header_.encode ();
- return data_received + header_received;
- }
-}
-
-// Receive various types of input (e.g., Peer event from the
-// gatewayd, as well as stdio).
-
-int
-Supplier_Proxy::handle_input (ACE_HANDLE)
-{
- ACE_Message_Block *forward_addr = 0;
-
- switch (this->recv (forward_addr))
- {
- case 0:
- // Note that a peer should never initiate a shutdown by closing
- // the connection. Instead, it should reconnect.
- this->state (Proxy_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else // A weird problem occurred, shut down and start again.
- {
- this->state (Proxy_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n",
- "Peer has failed unexpectedly",
- this->id ()), -1);
- }
- /* NOTREACHED */
- default:
- return this->forward (forward_addr);
- }
-}
-
-// Forward an event to its appropriate Consumer(s). This delegates to
-// the <ACE_Event_Channel> to do the actual forwarding.
-
-int
-Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
-{
- return this->event_channel_.put (forward_addr);
-}
-
-#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>;
-#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h
deleted file mode 100644
index ffce18d1c71..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler.h
+++ /dev/null
@@ -1,202 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Proxy_Handler.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_PROXY_HANDLER)
-#define _PROXY_HANDLER
-
-#include "ace/Service_Config.h"
-#include "ace/SOCK_Connector.h"
-#include "ace/Svc_Handler.h"
-#include "Event_Forwarding_Discriminator.h"
-#include "Consumer_Dispatch_Set.h"
-#include "Event.h"
-
-// Forward declaration.
-class Proxy_Handler_Connector;
-class ACE_Event_Channel;
-
-class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
- // = TITLE
- // Proxy_Handler contains info about connection state and addressing.
- //
- // = DESCRIPTION
- // The Proxy_Handler classes process events sent to the Event
- // Channel from Suppliers and forward them to Consumers.
-{
-public:
- Proxy_Handler (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
- virtual int open (void * = 0);
- // Initialize and activate a single-threaded Proxy_Handler (called by
- // ACE_Connector::handle_output()).
-
- ACE_INET_Addr &remote_addr (void);
- // Returns the peer's routing address.
-
- ACE_INET_Addr &local_addr (void);
- // Returns our local address.
-
- // = Set/get routing id.
- ACE_INT32 id (void);
- void id (ACE_INT32);
-
- // = The current state of the Proxy_Handler.
- enum State
- {
- IDLE = 1, // Prior to initialization.
- CONNECTING, // During connection establishment.
- ESTABLISHED, // Proxy_Handler is established and active.
- DISCONNECTING, // Proxy_Handler is in the process of connecting.
- FAILED // Proxy_Handler has failed.
- };
-
- // = Set/get the current state.
- void state (State);
- State state (void);
-
- // = Set/get the current retry timeout delay.
- void timeout (int);
- int timeout (void);
-
- // = Set/get the maximum retry timeout delay.
- void max_timeout (int);
- int max_timeout (void);
-
- // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer
- // (necessary for error checking).
- void proxy_role (char);
- char proxy_role (void);
-
- // = The total number of bytes sent/received on this proxy.
- size_t total_bytes (void);
- void total_bytes (size_t bytes);
- // Increment count by <bytes>.
-
- virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based Proxy_Handler reconnection.
-
-protected:
- enum
- {
- MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
- };
-
- virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
- // Perform Proxy_Handler termination.
-
- ACE_INET_Addr remote_addr_;
- // Address of peer.
-
- ACE_INET_Addr local_addr_;
- // Address of us.
-
- ACE_INT32 id_;
- // The assigned routing ID of this entry.
-
- size_t total_bytes_;
- // The total number of bytes sent/received on this proxy.
-
- State state_;
- // The current state of the proxy.
-
- int timeout_;
- // Amount of time to wait between reconnection attempts.
-
- int max_timeout_;
- // Maximum amount of time to wait between reconnection attempts.
-
- char proxy_role_;
- // Indicates which role the proxy plays ('S' == Supplier and 'C' ==
- // Consumer).
-
- ACE_Event_Channel &event_channel_;
- // Reference to the <ACE_Event_Channel> that we use to forward all
- // the events from Consumers and Suppliers.
-};
-
-class Supplier_Proxy : public Proxy_Handler
- // = TITLE
- // Handles reception of Events from Suppliers
- //
- // = DESCRIPTION
- // Performs framing and error checking.
-{
-public:
- // = Initialization method.
- Supplier_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
-protected:
- // = All the following methods are upcalls, so they can be protected.
-
- virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- // Receive and process peer events.
-
- virtual int recv (ACE_Message_Block *&);
- // Receive an event from a Supplier.
-
- int forward (ACE_Message_Block *event);
- // Forward the <event> to its appropriate Consumer. This delegates
- // to the <ACE_Event_Channel> to do the actual forwarding.
-
- ACE_Message_Block *msg_frag_;
- // Keep track of event fragment to handle non-blocking recv's from
- // Suppliers.
-};
-
-class Consumer_Proxy : public Proxy_Handler
- // = TITLE
- // Handles transmission of events to Consumers.
- //
- // = DESCRIPTION
- // Performs queueing and error checking. Uses a single-threaded
- // Reactive approach to handle flow control.
-{
-public:
- // = Initialization method.
- Consumer_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
- virtual int put (ACE_Message_Block *event,
- ACE_Time_Value * = 0);
- // Send an event to a Consumer (may be queued if necessary).
-
-protected:
- // = We'll allow up to 16 megabytes to be queued per-output proxy.
- enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
-
- virtual int handle_output (ACE_HANDLE);
- // Finish sending event when flow control conditions abate.
-
- int nonblk_put (ACE_Message_Block *mb);
- // Perform a non-blocking put().
-
- virtual ssize_t send (ACE_Message_Block *);
- // Send an event to a Consumer.
-
- virtual int handle_input (ACE_HANDLE);
- // Receive and process shutdowns from a Consumer.
-};
-
-#endif /* _PROXY_HANDLER */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
deleted file mode 100644
index dc18eca8500..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-#include "Proxy_Handler_Connector.h"
-// $Id$
-
-
-Proxy_Handler_Connector::Proxy_Handler_Connector (void)
-{
-}
-
-// Override the connection-failure method to add timer support.
-// Note that these timers perform "expoential backoff" to
-// avoid rapidly trying to reestablish connections when a link
-// goes down.
-
-int
-Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
-{
- ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0;
-
- // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
- if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate proxy %d in connector map, %p\n",
- sd, "find"), -1);
-
- Proxy_Handler *proxy_handler = stp->svc_handler ();
-
- // Schedule a reconnection request at some point in the future
- // (note that proxy_handler uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer
- (proxy_handler, 0, proxy_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- return 0;
-}
-
-// Initiate (or reinitiate) a connection to the Proxy_Handler.
-
-int
-Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler,
- ACE_Synch_Options &synch_options)
-{
- char addr_buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- proxy_handler->state (Proxy_Handler::IDLE);
-
- // We check the remote addr second so that it remains in the addr_buf.
- if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1
- || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (proxy_handler, proxy_handler->remote_addr (),
- synch_options, proxy_handler->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- proxy_handler->state (Proxy_Handler::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", addr_buf));
-
- // Reschedule ourselves to try and connect again.
- if (synch_options[ACE_Synch_Options::USE_REACTOR])
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (proxy_handler, 0, proxy_handler->timeout ()) == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- else
- // Failures on synchronous connects are reported as errors
- // so that the caller can decide how to proceed.
- return -1;
- }
- else
- {
- proxy_handler->state (Proxy_Handler::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting %s to %s\n",
- synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", addr_buf));
- }
- }
- else
- {
- proxy_handler->state (Proxy_Handler::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- addr_buf, proxy_handler->get_handle ()));
- }
- return 0;
-}
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Proxy_Handler_Connector.h
deleted file mode 100644
index 3baea75934a..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Proxy_Handler_Connector.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_IO_HANDLER_CONNECTOR)
-#define _IO_HANDLER_CONNECTOR
-
-#include "ace/Connector.h"
-#include "Thr_Proxy_Handler.h"
-
-class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new Proxy_Handler object to do the dirty work...
-{
-public:
- Proxy_Handler_Connector (void);
-
- // Initiate (or reinitiate) a connection on the Proxy_Handler.
- int initiate_connection (Proxy_Handler *,
- ACE_Synch_Options & = ACE_Synch_Options::synch);
-
-protected:
- // Override the connection-failure method to add timer support.
- virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
-};
-
-#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README
deleted file mode 100644
index e64ad26b568..00000000000
--- a/apps/Gateway/Gateway/README
+++ /dev/null
@@ -1,23 +0,0 @@
-This application illustrates an application-level Gateway which routes
-messages between Consumer and Suppliers in a distributed environment.
-
-The default configuration is single-threaded, i.e., all
-Supplier_Proxys and Consumer_Proxys are multiplexed via the ACE
-Reactor within a single thread of control. To obtain a version that
-multi-threads both Consumer_Proxys and Supplier_Proxys simply set
-the following flag in the Makefile:
-
-DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
-
-To get a version that uses single-threading for all Supplier_Proxys,
-but a separate thread per-Consumer_Proxy set the following flag in
-the Makefile:
-
-DEFFLAGS += -DUSE_OUTPUT_MT
-
-If you examine the source code, you'll see that very few changes are
-required in the source code to switch between single-threading and
-multi-threading. The ACE Task class is primarily responsible for
-enabling the flexible modification of concurrency strategies with only
-minor changes required to the source code, design, and system
-architecture.
diff --git a/apps/Gateway/Gateway/Routing_Entry.cpp b/apps/Gateway/Gateway/Routing_Entry.cpp
deleted file mode 100644
index cc270cfac3a..00000000000
--- a/apps/Gateway/Gateway/Routing_Entry.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-// Defines an entry in the Routing Table.
-// $Id$
-
-#include "Routing_Entry.h"
-
-Routing_Entry::Routing_Entry (int validity_interval)
- : validity_interval_ (validity_interval)
-{
- ACE_NEW (this->destinations_, Routing_Entry::ENTRY_SET);
-}
-
-Routing_Entry::~Routing_Entry (void)
-{
- delete this->destinations_;
-}
-
-// Get the associated set of destinations.
-
-Routing_Entry::ENTRY_SET *
-Routing_Entry::destinations (void)
-{
- return this->destinations_;
-}
-
-// Set the associated set of destinations.
-
-void
-Routing_Entry::destinations (Routing_Entry::ENTRY_SET *s)
-{
- this->destinations_ = s;
-}
-
-// Get the current validity interval for this route.
-
-int
-Routing_Entry::validity_interval (void)
-{
- return this->validity_interval_;
-}
-
-// Set the current validity interval for this route.
-
-void
-Routing_Entry::validity_interval (int vi)
-{
- this->validity_interval_ = vi;
-}
diff --git a/apps/Gateway/Gateway/Routing_Entry.h b/apps/Gateway/Gateway/Routing_Entry.h
deleted file mode 100644
index ab8e0eee53d..00000000000
--- a/apps/Gateway/Gateway/Routing_Entry.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Routing_Entry.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_ENTRY)
-#define _ROUTING_ENTRY
-
-#include "ace/Set.h"
-
-// Forward reference.
-class Channel;
-
-class Routing_Entry
-{
- // = TITLE
- // Defines an entry in the Routing_Table.
-public:
- Routing_Entry (int validity_interval = 0);
- ~Routing_Entry (void);
-
- typedef ACE_Unbounded_Set<Channel *> ENTRY_SET;
- typedef ACE_Unbounded_Set_Iterator<Channel *> ENTRY_ITERATOR;
-
- // = Set/get the associated set of destinations.
- ENTRY_SET *destinations (void);
- void destinations (ENTRY_SET *);
-
- // = Set/get current validity interval for this routing entry.
- int validity_interval (void);
- void validity_interval (int);
-
-protected:
- ENTRY_SET *destinations_;
- // The set of destinations;
-
- int validity_interval_;
- // The current validity interval of this link.
-};
-
-#endif /* _ROUTING_ENTRY */
diff --git a/apps/Gateway/Gateway/Routing_Table.cpp b/apps/Gateway/Gateway/Routing_Table.cpp
deleted file mode 100644
index 3ef2f21bc1f..00000000000
--- a/apps/Gateway/Gateway/Routing_Table.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-#if !defined (_ROUTING_TABLE_C)
-#define _ROUTING_TABLE_C
-
-
-#include "Routing_Table.h"
-
-/* Bind the EXT_ID to the INT_ID. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::bind (EXT_ID ext_id, INT_ID *int_id)
-{
- return this->map_.bind (ext_id, int_id);
-}
-
-/* Find the INT_ID corresponding to the EXT_ID. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::find (EXT_ID ext_id, INT_ID *&int_id)
-{
- return this->map_.find (ext_id, int_id);
-}
-
-/* Unbind (remove) the EXT_ID from the map. */
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Table<EXT_ID, INT_ID, LOCK>::unbind (EXT_ID ext_id)
-{
- return this->map_.unbind (ext_id);
-}
-
-template <class EXT_ID, class INT_ID, class LOCK>
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::Routing_Iterator (Routing_Table<EXT_ID,
- INT_ID, LOCK> &rt,
- int ignore_inactive)
- : map_iter_ (rt.map_),
- ignore_inactive_ (ignore_inactive)
-{
-}
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::next (INT_ID *&ss)
-{
- // Loop in order to skip over inactive entries if necessary.
-
- for (ACE_Map_Entry<EXT_ID, INT_ID *> *temp = 0;
- this->map_iter_.next (temp) != 0;
- this->advance ())
- {
- // Skip over inactive entries if necessary.
- if (temp->int_id_->active () == 0 && this->ignore_inactive_)
- continue;
-
- // Otherwise, return the next item.
- ss = temp->int_id_;
- return 1;
- }
- return 0;
-}
-
-template <class EXT_ID, class INT_ID, class LOCK> int
-Routing_Iterator<EXT_ID, INT_ID, LOCK>::advance (void)
-{
- return this->map_iter_.advance ();
-}
-#endif /* _ROUTING_TABLE_C */
diff --git a/apps/Gateway/Gateway/Routing_Table.h b/apps/Gateway/Gateway/Routing_Table.h
deleted file mode 100644
index 84194f13e49..00000000000
--- a/apps/Gateway/Gateway/Routing_Table.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Routing_Table.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_ROUTING_TABLE_H)
-#define _ROUTING_TABLE_H
-
-#include "ace/Map_Manager.h"
-
-template <class EXT_ID, class INT_ID, class LOCK>
-class Routing_Table
-{
- // = TITLE
- // Define a generic routing table based on the ACE Map_Manager.
- //
- // = DESCRIPTION
- // We need to have this table, rather than just using the Map_Manager
- // directly in order to ignore "inactive" routing entries...
-public:
- int bind (EXT_ID ext_id, INT_ID *int_id);
- // Associate EXT_ID with the INT_ID.
-
- int find (EXT_ID ext_id, INT_ID *&int_id);
- // Break any association of EXID.
-
- int unbind (EXT_ID ext_id);
- // Locate EXID and pass out parameter via INID. If found,
- // return 0, else -1.
-
-public:
- ACE_Map_Manager<EXT_ID, INT_ID *, LOCK> map_;
- // Map external IDs to internal IDs.
-};
-
-template <class EXT_ID, class INT_ID, class LOCK>
-class Routing_Iterator
-{
- // = TITLE
- // Define an iterator for the Routing Table.
-public:
- Routing_Iterator (Routing_Table<EXT_ID, INT_ID, LOCK> &mm,
- int ignore_inactive = 1);
- int next (INT_ID *&);
- int advance (void);
-
-private:
- ACE_Map_Iterator<EXT_ID, INT_ID *, LOCK> map_iter_;
- int ignore_inactive_;
-};
-
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "Routing_Table.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-#endif /* _ROUTING_TABLE_H */
diff --git a/apps/Gateway/Gateway/Thr_Channel.cpp b/apps/Gateway/Gateway/Thr_Channel.cpp
deleted file mode 100644
index 26e385e2727..00000000000
--- a/apps/Gateway/Gateway/Thr_Channel.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-#include "Thr_Channel.h"
-// $Id$
-
-#include "Channel_Connector.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Output_Channel::Thr_Output_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Output_Channel (rt, cc, thr_mgr, socket_queue_size)
-{
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the Channel as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
-
-int
-Thr_Output_Channel::handle_input (ACE_HANDLE h)
-{
- this->Output_Channel::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Output_Channel object and spawn a new
-// thread.
-
-int
-Thr_Output_Channel::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// ACE_Queue up a message for transmission (must not block since all
-// Input_Channels are single-threaded).
-
-int
-Thr_Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit messages to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Output_Channel::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Output_Channel's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send_peer (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Output_Channel %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Input_Channel::Thr_Input_Channel (ROUTING_TABLE *rt,
- Channel_Connector *cc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Input_Channel (rt, cc, thr_mgr, socket_queue_size)
-{
-}
-
-int
-Thr_Input_Channel::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive messages from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Input_Channel::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Input_Channel's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Input_Channel %d on handle %d\n",
- this->id (),
- this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_Channel.h b/apps/Gateway/Gateway/Thr_Channel.h
deleted file mode 100644
index a1dc91b1619..00000000000
--- a/apps/Gateway/Gateway/Thr_Channel.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Thr_Channel.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_THR_CHANNEL)
-#define _THR_CHANNEL
-
-#include "Channel.h"
-
-#if defined (ACE_HAS_THREADS)
-class Thr_Output_Channel : public Output_Channel
- // = TITLE
- // Runs each Output Channel in a separate thread.
-{
-public:
- Thr_Output_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the threaded Output_Channel object and spawn a new
- // thread.
-
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a peer.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-
-class Thr_Input_Channel : public Input_Channel
- // = TITLE
- // Runs each Input Channel in a separate thread.
-{
-public:
- Thr_Input_Channel (ROUTING_TABLE *,
- Channel_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the object and spawn a new thread.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-#endif /* ACE_HAS_THREADS */
-#endif /* _THR_CHANNEL */
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp
deleted file mode 100644
index 109cfad9c3f..00000000000
--- a/apps/Gateway/Gateway/Thr_IO_Handler.cpp
+++ /dev/null
@@ -1,204 +0,0 @@
-#include "Thr_IO_Handler.h"
-// $Id$
-
-#include "IO_Handler_Connector.h"
-
-#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
-}
-
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the IO_Handler as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
-
-int
-Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
-{
- this->Consumer_Handler::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
- return 0;
-}
-
-// Initialize the threaded Consumer_Handler object and spawn a new
-// thread.
-
-int
-Thr_Consumer_Handler::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- // Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// ACE_Queue up a message for transmission (must not block since all
-// Supplier_Handlers are single-threaded).
-
-int
-Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- // Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Transmit messages to the peer (note simplification resulting from
-// threads...)
-
-int
-Thr_Consumer_Handler::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread it is OK to block on
- // output.
-
- for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
-
- return 0;
-}
-
-Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map,
- IO_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
-{
-}
-
-int
-Thr_Supplier_Handler::open (void *)
-{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
- // Turn off non-blocking I/O.
- if (this->peer ().disable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
-
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
- // Reactivate message queue. If it was active then this is the
- // first time in and we need to spawn a thread, otherwise the queue
- // was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
- // Become an active object by spawning a new thread to transmit
- // messages to peers.
- return this->activate (THR_NEW_LWP | THR_DETACHED);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
- return 0;
- }
-}
-
-// Receive messages from a Peer in a separate thread (note reuse of
-// existing code!).
-
-int
-Thr_Supplier_Handler::svc (void)
-{
- for (;;)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
-
- while (this->handle_input () != -1)
- continue;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
- this->id (),
- this->get_handle ()));
-
- this->peer ().close ();
-
- // Deactivate the queue while we try to get reconnected.
- this->msg_queue ()->deactivate ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
- ACE_OS::sleep (tv);
- }
- }
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h
deleted file mode 100644
index ee056b35361..00000000000
--- a/apps/Gateway/Gateway/Thr_IO_Handler.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// apps
-//
-// = FILENAME
-// Thr_IO_Handler.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_THR_IO_HANDLER)
-#define _THR_IO_HANDLER
-
-#include "IO_Handler.h"
-
-#if defined (ACE_HAS_THREADS)
-class Thr_Consumer_Handler : public Consumer_Handler
- // = TITLE
- // Runs each Output IO_Handler in a separate thread.
-{
-public:
- Thr_Consumer_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the threaded Consumer_Handler object and spawn a new
- // thread.
-
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- // Send a message to a peer.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-
-class Thr_Supplier_Handler : public Supplier_Handler
- // = TITLE
- // Runs each Input IO_Handler in a separate thread.
-{
-public:
- Thr_Supplier_Handler (Consumer_Map *,
- IO_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
-
- virtual int open (void *);
- // Initialize the object and spawn a new thread.
-
- virtual int svc (void);
- // Transmit peer messages.
-};
-#endif /* ACE_HAS_THREADS */
-#endif /* _THR_IO_HANDLER */
diff --git a/apps/Gateway/Gateway/cc_config b/apps/Gateway/Gateway/cc_config
deleted file mode 100644
index 96f9ebdedd7..00000000000
--- a/apps/Gateway/Gateway/cc_config
+++ /dev/null
@@ -1,10 +0,0 @@
-# Conn ID Hostname Remote Port Direction Max Retry Delay Local Port
-# ------- -------- ---- --------- --------------- ----------
- 1 tango.cs 10004 I 32 20000
-# 2 tango.cs 10004 O 32
- 3 merengue.cs 10004 O 32 20001
-# 4 mambo.cs 10004 O 32 20000
-# 5 lambada.cs 10004 O 32 20000
-# 6 tango.cs 10004 O 32 20000
-# 7 tango.cs 5001 I 32
-# 8 tango.cs 5002 O 32
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
deleted file mode 100644
index 58884340e61..00000000000
--- a/apps/Gateway/Gateway/consumer_config
+++ /dev/null
@@ -1,8 +0,0 @@
-# Consumer configuration file
-# Conn ID Supplier ID Type Consumers
-# ------- ----------- ------- ------------
-# 1 1 0 3,4,5
- 1 1 0 3
- 3 1 0 3
-# 4 1 0 4
-# 5 1 0 5
diff --git a/apps/Gateway/Gateway/gatewayd.cpp b/apps/Gateway/Gateway/gatewayd.cpp
deleted file mode 100644
index b0af5f7cace..00000000000
--- a/apps/Gateway/Gateway/gatewayd.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-// Main driver program for the Gateway. This file is completely
-// $Id$
-
-// generic code due to the ACE Service Configurator framework!
-
-#include "ace/Service_Config.h"
-#include "Gateway.h"
-
-int
-main (int argc, char *argv[])
-{
- ACE_Service_Config daemon;
-
- if (daemon.open (argc, argv) == -1)
- {
- if (errno != ENOENT)
- ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
- else // Use static binding.
- {
- ACE_Service_Object *so = ACE_SVC_INVOKE (Gateway);
-
- if (so->init (argc - 1, argv + 1) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
- }
- }
-
- // Run forever, performing the configured services until we are shut
- // down by a SIGINT/SIGQUIT signal.
-
- daemon.run_reactor_event_loop ();
-
- return 0;
-}
diff --git a/apps/Gateway/Gateway/rt_config b/apps/Gateway/Gateway/rt_config
deleted file mode 100644
index e951a0f09be..00000000000
--- a/apps/Gateway/Gateway/rt_config
+++ /dev/null
@@ -1,7 +0,0 @@
-# Conn ID Logical ID Payload Destinations
-# ------- ---------- ------- ------------
-# 1 1 0 3,4,5
- 1 1 0 3
- 3 1 0 3
-# 4 1 0 4
-# 5 1 0 5
diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf
deleted file mode 100644
index db7a9d04ad5..00000000000
--- a/apps/Gateway/Gateway/svc.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-#static Svc_Manager "-d -p 2913"
-dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c connection_config -f consumer_config"
-
diff --git a/apps/Gateway/Makefile b/apps/Gateway/Makefile
deleted file mode 100644
index b7abb07bf51..00000000000
--- a/apps/Gateway/Makefile
+++ /dev/null
@@ -1,26 +0,0 @@
-#----------------------------------------------------------------------------
-# @(#)Makefile 1.1 10/18/96
-#
-# Makefile for the Gateway application
-#----------------------------------------------------------------------------
-
-#----------------------------------------------------------------------------
-# Local macros
-#----------------------------------------------------------------------------
-
-INFO = README
-
-DIRS = Gateway \
- Peer
-
-
-#----------------------------------------------------------------------------
-# Include macros and targets
-#----------------------------------------------------------------------------
-
-include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.nested.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.nolocal.GNU
-
diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp
deleted file mode 100644
index cfc9a7dad6f..00000000000
--- a/apps/Gateway/Peer/Gateway_Handler.cpp
+++ /dev/null
@@ -1,652 +0,0 @@
-#include "ace/Get_Opt.h"
-// $Id$
-
-
-#include "Gateway_Handler.h"
-
-Gateway_Handler::Gateway_Handler (ACE_Thread_Manager *)
- : routing_id_ (0),
- msg_frag_ (0),
- total_bytes_ (0)
-{
- this->msg_queue ()->high_water_mark (Gateway_Handler::QUEUE_SIZE);
-}
-
-int
-Gateway_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum));
-
- // Shut down the main event loop.
- ACE_Service_Config::end_reactor_event_loop ();
- return 0;
-}
-
-// Cache a binding to the HANDLER_MAP.
-
-void
-Gateway_Handler::map (HANDLER_MAP *m)
-{
- this->map_ = m;
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that turns control
-// over to our application-specific Gateway handler.
-
-int
-Gateway_Handler::open (void *a)
-{
- ACE_DEBUG ((LM_DEBUG, "Gateway handler's fd = %d\n",
- this->peer ().get_handle ()));
-
- // Call down to the base class to activate and register this
- // handler.
- if (this->inherited::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
-
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), -1);
-
- Gateway_Handler *this_ = this;
-
- // Add ourselves to the map so we can be removed later on.
- if (this->map_->bind (this->get_handle (), this_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind"), -1);
-
- char *to = ACE_OS::getenv ("TIMEOUT");
- int timeout = to == 0 ? 100000 : ACE_OS::atoi (to);
-
- // Schedule the time between disconnects. This should really be a
- // "tunable" parameter.
- if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, timeout) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "schedule_timer"));
-
- // If there are messages left in the queue, make sure we
- // enable the ACE_Reactor appropriately to get them sent out.
- if (this->msg_queue ()->is_empty () == 0
- && ACE_Service_Config::reactor ()->schedule_wakeup (this,
- ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1);
-
- // First action is to wait to be notified of our routing id.
- this->do_action_ = &Gateway_Handler::await_route_id;
- return 0;
-}
-
-// Read messages from stdin and send them to the gatewayd.
-
-int
-Gateway_Handler::xmit_stdin (void)
-{
- if (this->routing_id_ != -1)
- {
- ssize_t n;
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- Event *peer_msg = (Event *) mb->rd_ptr ();
- peer_msg->header_.routing_id_ = this->routing_id_;
-
- n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_);
-
- switch (n)
- {
- case 0:
- ACE_DEBUG ((LM_DEBUG, "stdin closing down\n"));
-
- // Take stdin out of the ACE_Reactor so we stop trying to
- // send messages.
- if (ACE_Service_Config::reactor ()->remove_handler
- (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "remove_handler"));
- delete mb;
- break;
- case -1:
- delete mb;
- ACE_ERROR ((LM_ERROR, "%p\n", "read"));
- break;
- default:
- peer_msg->header_.len_ = htonl (n);
- mb->wr_ptr (sizeof (Peer_Header) + n);
-
- if (this->put (mb) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "%p\n",
- "gateway is flow controlled, so we're dropping messages"));
- else
- ACE_ERROR ((LM_ERROR, "%p\n", "transmission failure in xmit_stdin"));
-
- // Caller is responsible for freeing a ACE_Message_Block
- // if failures occur.
- delete mb;
- }
- }
- }
- return 0;
-}
-
-// Perform a non-blocking put() of message MB. If we are unable to
-// send the entire message the remainder is re-Taskd at the *front* of
-// the Message_List.
-
-int
-Gateway_Handler::nonblk_put (ACE_Message_Block *mb)
-{
- // Try to send the message. If we don't send it all (e.g., due to
- // flow control), then re-ACE_Task the remainder at the head of the
- // Message_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
-
- ssize_t n;
-
- if ((n = this->send_peer (mb)) == -1)
- return -1;
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG,
- "queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->routing_id_));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- if (ACE_Service_Config::reactor ()->schedule_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-// Finish sending a message when flow control conditions abate. This
-// method is automatically called by the ACE_Reactor.
-
-int
-Gateway_Handler::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *mb = 0;
-
- ACE_DEBUG ((LM_DEBUG, "in handle_output\n"));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (mb))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // Caller is responsible for freeing a ACE_Message_Block if
- // failures occur.
- delete mb;
- ACE_ERROR ((LM_ERROR, "%p\n",
- "transmission failure in handle_output"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire message (or we did
- // not fail due to EWOULDBLOCK) then check if there are more
- // messages on the Message_List. If there aren't, tell the
- // ACE_Reactor not to notify us anymore (at least until
- // there are new messages queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG,
- "queue now empty on handle %d to routing id %d\n",
- this->get_handle (),
- this->routing_id_));
-
- if (ACE_Service_Config::reactor ()->cancel_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "%p\n", "dequeue_head"));
- return 0;
-}
-
-// Send a message to a peer (may ACE_Task if necessary).
-
-int
-Gateway_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the message *without* blocking!
- return this->nonblk_put (mb);
- else
- // If we have queued up messages due to flow control then just
- // enqueue and return.
- return this->msg_queue ()->enqueue_tail
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Send an Peer message to gatewayd.
-
-int
-Gateway_Handler::send_peer (ACE_Message_Block *mb)
-{
- ssize_t n;
- size_t len = mb->length ();
-
- if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < (ssize_t) len)
- {
- // Re-adjust pointer to skip over the part we did send.
- mb->rd_ptr (n);
- this->total_bytes_ += n;
- }
- else /* if (n == length) */
- {
- // The whole message is sent, we can now safely deallocate the
- // buffer. Note that this should decrement a reference count...
- this->total_bytes_ += n;
- delete mb;
- errno = 0;
- }
- ACE_DEBUG ((LM_DEBUG, "sent %d bytes, total bytes sent = %d\n",
- n, this->total_bytes_));
- return n;
-}
-
-// Receive an Peer message from gatewayd. Handles fragmentation.
-
-int
-Gateway_Handler::recv_peer (ACE_Message_Block *&mb)
-{
- Event *peer_msg;
- size_t len;
- ssize_t n;
- size_t offset = 0;
-
- if (this->msg_frag_ == 0)
- {
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- // No existing fragment...
- if (this->msg_frag_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1);
-
- peer_msg = (Event *) this->msg_frag_->rd_ptr ();
-
- switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header)))
- {
- case sizeof (Peer_Header):
- len = ntohl (peer_msg->header_.len_);
- if (len <= sizeof peer_msg->buf_)
- {
- this->msg_frag_->wr_ptr (sizeof (Peer_Header));
- break; // The message is within the maximum size range.
- }
- else
- ACE_ERROR ((LM_ERROR, "message too long = %d\n", len));
- /* FALLTHROUGH */
- default:
- ACE_ERROR ((LM_ERROR, "invalid length = %d\n", n));
- n = -1;
- /* FALLTHROUGH */
- case -1:
- /* FALLTHROUGH */
- case 0:
- // Make sure to free up memory on error returns.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return n;
- }
- }
- else
- {
- offset = this->msg_frag_->length () - sizeof (Peer_Header);
- len = peer_msg->header_.len_ - offset;
- }
-
- switch (n = this->peer ().recv (peer_msg->buf_ + offset, len))
- {
- case -1:
- if (errno == EWOULDBLOCK)
- {
- // This shouldn't happen since the ACE_Reactor
- // just triggered us to handle pending I/O!
- ACE_DEBUG ((LM_DEBUG, "unexpected recv failure\n"));
- // Since ACE_DEBUG might change errno, we need to reset it
- // here.
- errno = EWOULDBLOCK;
- return -1;
- }
- else
- /* FALLTHROUGH */;
-
- case 0: // EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
- return n;
-
- default:
- if (n != (ssize_t) len)
- // Re-adjust pointer to skip over the part we've read.
- {
- this->msg_frag_->wr_ptr (n);
- errno = EWOULDBLOCK;
- // Inform caller that we didn't get the whole message.
- return -1;
- }
- else
- {
- // Set the write pointer at 1 past the end of the message.
- this->msg_frag_->wr_ptr (n);
-
- // Set the read pointer to the beginning of the message.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- mb = this->msg_frag_;
-
- // Reset the pointer to indicate we've got an entire
- // message.
- this->msg_frag_ = 0;
- }
- return n;
- }
-}
-
-// Receive various types of input (e.g., Peer message from the
-// gatewayd, as well as stdio).
-
-int
-Gateway_Handler::handle_input (ACE_HANDLE sd)
-{
- ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd));
- if (sd == ACE_STDIN) // Handle message from stdin.
- return this->xmit_stdin ();
- else
- // Perform the appropriate action depending on the state we are
- // in.
- return (this->*do_action_) ();
-}
-
-// Action that receives the route id.
-
-int
-Gateway_Handler::await_route_id (void)
-{
- ssize_t n = this->peer ().recv (&this->routing_id_,
- sizeof this->routing_id_);
-
- if (n != sizeof this->routing_id_)
- {
- if (n == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "gatewayd has closed down unexpectedly\n"), -1);
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p, bytes received on handle %d = %d\n",
- "recv", this->get_handle (), n), -1);
- }
- else
- ACE_DEBUG ((LM_DEBUG, "assigned routing id %d\n",
- this->routing_id_));
-
- // Transition to the action that waits for Peer messages.
- this->do_action_ = &Gateway_Handler::await_messages;
-
- // Reset standard input.
- ACE_OS::rewind (stdin);
-
- // Register this handler to receive test messages on stdin.
- if (ACE_Service_Config::reactor ()->register_handler
- (ACE_STDIN, this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
- return 0;
-}
-
-// Action that receives messages.
-
-int
-Gateway_Handler::await_messages (void)
-{
- ACE_Message_Block *mb = 0;
- ssize_t n = this->recv_peer (mb);
-
- switch (n)
- {
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR, "gatewayd has closed down\n"), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv_peer"), -1);
- /* NOTREACHED */
- default:
- {
- // We got a valid message, so let's process it now! At the
- // moment, we just print out the message contents...
-
- Event *peer_msg = (Event *) mb->rd_ptr ();
- this->total_bytes_ += mb->length ();
-
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG,
- "route id = %d, len = %d, payload = %*s",
- peer_msg->header_.routing_id_, peer_msg->header_.len_,
- peer_msg->header_.len_, peer_msg->buf_));
-#else
- ACE_DEBUG ((LM_DEBUG,
- "route id = %d, cur len = %d, total len = %d\n",
- peer_msg->header_.routing_id_,
- peer_msg->header_.len_,
- this->total_bytes_));
-#endif
- delete mb;
- return 0;
- }
- }
-}
-
-// Periodically send messages via ACE_Reactor timer mechanism.
-
-int
-Gateway_Handler::handle_timeout (const ACE_Time_Value &, const void *)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != -1)
- {
- // Unbind ourselves from the map.
- if (this->map_->unbind (this->get_handle ()) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "unbind"));
-
- // Shut down the handler.
- this->handle_close ();
- }
- return 0;
-}
-
-// Handle shutdown of the Gateway_Handler object.
-
-int
-Gateway_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
-{
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- ACE_DEBUG ((LM_DEBUG, "shutting down Gateway_Handler on handle %d\n",
- this->get_handle ()));
-
- // Explicitly remove ourselves for handle 0 (the ACE_Reactor
- // removes this->handle (), note that
- // ACE_Event_Handler::DONT_CALL instructs the ACE_Reactor *not*
- // to call this->handle_close(), which would otherwise lead to
- // recursion!).
- if (ACE_Service_Config::reactor ()->remove_handler
- (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "handle = %d: %p\n",
- 0, "remove_handler"));
-
- // Deregister this handler with the ACE_Reactor.
- if (ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::RWE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "handle = %d: %p\n",
- this->get_handle (), "remove_handler"), -1);
-
- // Close down the peer.
- this->peer ().close ();
- }
- return 0;
-}
-
-Gateway_Acceptor::Gateway_Acceptor (Gateway_Handler *handler)
- : gateway_handler_ (handler)
-{
- this->gateway_handler_->map (&this->map_);
-}
-
-// Note how this method just passes back the pre-allocated
-// Gateway_Handler instead of having the ACE_Acceptor allocate a new
-// one each time!
-
-Gateway_Handler *
-Gateway_Acceptor::make_svc_handler (void)
-{
- return this->gateway_handler_;
-}
-
-int
-Gateway_Acceptor::handle_signal (int signum, siginfo_t *, ucontext_t *)
-{
- ACE_DEBUG ((LM_DEBUG, "signal %S occurred\n", signum));
- return 0;
-}
-
-/* Returns information on the currently active service. */
-
-int
-Gateway_Acceptor::info (char **strp, size_t length) const
-{
- char buf[BUFSIZ];
- char addr_str[BUFSIZ];
-
- ACE_INET_Addr addr;
-
- if (this->acceptor ().get_local_addr (addr) == -1)
- return -1;
- else if (addr.addr_to_string (addr_str, sizeof addr) == -1)
- return -1;
-
- ACE_OS::sprintf (buf, "%s\t %s/%s %s",
- "Gateway peer daemon", addr_str, "tcp",
- "# IRIDIUM SRP traffic generator and data sink\n");
-
- if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, buf, length);
- return ACE_OS::strlen (buf);
-}
-
-// Hook called by the explicit dynamic linking facility to terminate
-// the peer.
-
-int
-Gateway_Acceptor::fini (void)
-{
- HANDLER_ITERATOR mi (this->map_);
-
- for (MAP_ENTRY *me = 0;
- mi.next (me) != 0;
- mi.advance ())
- {
- if (me->int_id_->get_handle () != -1)
- {
- ACE_DEBUG ((LM_DEBUG, "closing down handle %d\n",
- me->int_id_->get_handle ()));
- me->int_id_->handle_close ();
- }
- else
- ACE_DEBUG ((LM_DEBUG, "already closed %d\n"));
- me->int_id_->destroy (); // Will trigger a delete.
- }
-
- this->gateway_handler_->destroy (); // Will trigger a delete.
- return inherited::fini ();
-}
-
-// Hook called by the explicit dynamic linking facility to initialize
-// the peer.
-
-int
-Gateway_Acceptor::init (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "dp:", 0);
- ACE_INET_Addr addr;
-
- for (int c; (c = get_opt ()) != -1; )
- {
- switch (c)
- {
- case 'p':
- addr.set (ACE_OS::atoi (get_opt.optarg));
- break;
- case 'd':
- break;
- default:
- break;
- }
- }
-
- if (ACE_Service_Config::reactor ()->register_handler (SIGPIPE, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
-
- if (this->open (addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "registering service with ACE_Reactor\n"), -1);
-
- ACE_Sig_Set sig_set;
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
-
- // Register ourselves to receive SIGINT and SIGQUIT so we can shut
- // down gracefully via signals.
-
- if (ACE_Service_Config::reactor ()->register_handler (sig_set,
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- return 0;
-}
-
-// Dynamically linked factory function that dynamically allocates a
-// new Gateway_Acceptor object.
-
-ACE_Service_Object *
-_alloc_peerd (void)
-{
- // This function illustrates how we can associate a ACE_Svc_Handler
- // with the ACE_Acceptor at initialization time.
- Gateway_Handler *handler;
-
- ACE_NEW_RETURN (handler, Gateway_Handler, 0);
- ACE_Service_Object *temp;
-
- ACE_NEW_RETURN (temp, Gateway_Acceptor (handler), 0);
- return temp;
-}
diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h
deleted file mode 100644
index 6dc4539e6b7..00000000000
--- a/apps/Gateway/Peer/Gateway_Handler.h
+++ /dev/null
@@ -1,154 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-
-/* These Gateway handler classes process Peer messages sent from the
- communication gateway daemon (gatewayd) to its various peers, e.g.,
- CF and ETS, (represented collectively in this prototype as peerd).
- . These classes works as follows:
-
- 1. Gateway_Acceptor creates a listener endpoint and waits passively
- for gatewayd to connect with it.
-
- 2. When gatewayd connects, Gateway_Acceptor creates an
- Gateway_Handler object that sends/receives messages from
- gatewayd.
-
- 3. Gateway_Handler waits for gatewayd to inform it of its routing
- ID, which is prepended to all outgoing messages send from peerd.
-
- 4. Once the routing ID is set, peerd periodically sends messages to
- gatewayd. Peerd also receives and "processes" messages
- forwarded to it from gatewayd. In this program, peerd
- "processes" messages by writing them to stdout. */
-
-#if !defined (GATEWAY_HANDLER)
-#define GATEWAY_HANDLER
-
-#include "ace/Service_Config.h"
-#include "ace/Svc_Handler.h"
-#include "ace/Acceptor.h"
-#include "ace/SOCK_Stream.h"
-#include "ace/SOCK_Acceptor.h"
-#include "ace/INET_Addr.h"
-#include "ace/Map_Manager.h"
-#include "Peer_Message.h"
-
-// Forward declaration.
-class Gateway_Handler;
-
-// Maps a ACE_HANDLE onto a Gateway_Handler *.
-typedef ACE_Map_Manager <ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_MAP;
-typedef ACE_Map_Iterator<ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_ITERATOR;
-typedef ACE_Map_Entry <ACE_HANDLE, Gateway_Handler *> MAP_ENTRY;
-
-// Handle Peer messages arriving as events.
-
-class Gateway_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
-{
-public:
- Gateway_Handler (ACE_Thread_Manager * = 0);
-
- virtual int open (void * = 0);
- // Initialize the handler (called by ACE_Acceptor::handle_input())
-
- virtual int handle_input (ACE_HANDLE);
- // Receive and process peer messages.
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Send a message to a gateway (may be queued if necessary).
-
- virtual int handle_output (ACE_HANDLE);
- // Finish sending a message when flow control conditions abate.
-
- virtual int handle_timeout (const ACE_Time_Value &,
- const void *arg);
- // Periodically send messages via ACE_Reactor timer mechanism.
-
- virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
- // Perform object termination.
-
- void map (HANDLER_MAP *);
- // Cache a binding to the HANDLER_MAP.
-
-protected:
- typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited;
-
- // We'll allow up to 16 megabytes to be queued per-output
- // channel!!!! This is clearly a policy in search of refinement...
- enum { QUEUE_SIZE = 1024 * 1024 * 16 };
-
- int handle_signal (int signum, siginfo_t *, ucontext_t *);
-
- Peer_Header::ROUTING_ID routing_id_;
- // Routing ID of the peer (obtained from gatewayd).
-
- virtual int nonblk_put (ACE_Message_Block *mb);
- // Perform a non-blocking put().
-
- virtual int recv_peer (ACE_Message_Block *&);
- // Receive an Peer message from gatewayd.
-
- virtual int send_peer (ACE_Message_Block *);
- // Send an Peer message to gatewayd.
-
- int xmit_stdin (void);
- // Receive a message from stdin and send it to the gateway.
-
- int (Gateway_Handler::*do_action_) (void);
- // Pointer-to-member-function for the current action to run in this state.
-
- int await_route_id (void);
- // Action that receives the route id.
-
- int await_messages (void);
- // Action that receives messages.
-
- ACE_Message_Block *msg_frag_;
- // Keep track of message fragment to handle non-blocking recv's from gateway.
-
- size_t total_bytes_;
- // The total number of bytes sent/received to the gateway.
-
- HANDLER_MAP *map_;
- // Maps the ACE_HANDLE onto the Gateway_Handler *.
-};
-
-// A factory class that accept connections from gatewayd and
-// dynamically creates a new Gateway_Handler object to do the dirty work.
-
-class Gateway_Acceptor : public ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR>
-{
-public:
- // = Initialization methods, called when dynamically linked.
- Gateway_Acceptor (Gateway_Handler *handler);
- virtual int init (int argc, char *argv[]);
- // Initialize the acceptor.
-
- virtual int info (char **, size_t) const;
- // Return info about this service.
-
- virtual int fini (void);
- // Perform termination.
-
- virtual Gateway_Handler *make_svc_handler (void);
- // Factory method that creates the Gateway_Handler once.
-
- virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
- // Handle various signals (e.g., SIGPIPE)
-
- HANDLER_MAP map_;
- // Maps the ACE_HANDLE onto the Gateway_Handler *.
-
- Gateway_Handler *gateway_handler_;
- // Pointer to memory allocated exactly once.
-
- typedef ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> inherited;
-};
-
-// Factory function that allocates a new Peer daemon.
-extern "C" ACE_Service_Object *_alloc_peerd (void);
-
-#endif /* GATEWAY_HANDLER */
-
diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile
deleted file mode 100644
index 3176a62b455..00000000000
--- a/apps/Gateway/Peer/Makefile
+++ /dev/null
@@ -1,116 +0,0 @@
-#----------------------------------------------------------------------------
-# @(#)Makefile 1.1 10/18/96
-#
-# Makefile for the Peer portion of the Gateway application
-#----------------------------------------------------------------------------
-
-#----------------------------------------------------------------------------
-# Local macros
-#----------------------------------------------------------------------------
-
-BIN = peerd
-
-FILES = Peer
-
-LSRC = $(addsuffix .cpp,$(FILES))
-LOBJ = $(addsuffix .o,$(FILES))
-SHOBJ = $(addsuffix .so,$(FILES))
-
-LDLIBS = $(addprefix .shobj/,$(SHOBJ))
-
-VLDLIBS = $(LDLIBS:%=%$(VAR))
-
-BUILD = $(VBIN)
-
-#----------------------------------------------------------------------------
-# Include macros and targets
-#----------------------------------------------------------------------------
-
-include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU
-include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
-
-#----------------------------------------------------------------------------
-# Local targets
-#----------------------------------------------------------------------------
-
-#----------------------------------------------------------------------------
-# Dependencies
-#----------------------------------------------------------------------------
-
-# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
-# DO NOT DELETE THIS LINE -- g++dep uses it.
-# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-
-.obj/Peer.o .shobj/Peer.so: Peer.cpp \
- $(WRAPPER_ROOT)/ace/Get_Opt.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Peer.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Acceptor.i \
- $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- Event.h
-
-# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Peer/Peer_Message.h b/apps/Gateway/Peer/Peer_Message.h
deleted file mode 100644
index 67f57f148cb..00000000000
--- a/apps/Gateway/Peer/Peer_Message.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// Define the Peer message schema (this may change).
-
-#if !defined (PEER_MESSAGE)
-#define PEER_MESSAGE
-
-// Fixed sized header.
-
-class Peer_Header
-{
-public:
-// Type used to route messages from gatewayd.
- typedef short ROUTING_ID;
-
- enum
- {
- INVALID_ID = -1 // No peer may use this number.
- };
-
- // Source ID.
- ROUTING_ID routing_id_;
-
- // Length of the message in bytes.
- size_t len_;
-};
-
-// Variable-sized message (buf_ may be variable-sized between
-// 0 and MAX_PAYLOAD_SIZE).
-
-class Peer_Message
-{
-public:
- // The maximum size of an Peer message (see Peer protocol specs for exact #).
- enum { MAX_PAYLOAD_SIZE = 1024 };
-
- Peer_Header header_;
-
- // Message payload
- char buf_[MAX_PAYLOAD_SIZE];
-};
-
-#endif /* PEER_MESSAGE */
diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp
deleted file mode 100644
index ab59567fc08..00000000000
--- a/apps/Gateway/Peer/peerd.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// $Id$
-
-// Driver for the peer daemon (peerd). Note that this is completely
-// generic code due to the Service Configurator framework!
-
-#include "ace/Service_Config.h"
-#include "Peer.h"
-
-int
-main (int argc, char *argv[])
-{
- ACE_Service_Config daemon;
-
- if (daemon.open (argc, argv) == -1)
- {
- if (errno != ENOENT)
- ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
- else // Use static binding.
- {
- ACE_Service_Object *so = ACE_SVC_INVOKE (Peer_Acceptor);
-
- if (so->init (argc - 1, argv + 1) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
- }
- }
-
- // Create an adapter to end the event loop.
- ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
-
- ACE_Sig_Set sig_set;
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
-
- // Register ourselves to receive SIGINT and SIGQUIT so we can shut
- // down gracefully via signals.
- ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
-
- // Run forever, performing the configured services until we are shut
- // down by a SIGINT/SIGQUIT signal.
-
- daemon.run_reactor_event_loop ();
-
- return 0;
-}
diff --git a/apps/Gateway/Peer/svc.conf b/apps/Gateway/Peer/svc.conf
deleted file mode 100644
index 6c9bc3bc3d7..00000000000
--- a/apps/Gateway/Peer/svc.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-#static Svc_Manager "-d -p 291"
-dynamic Peer1 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10004"
-#dynamic Peer2 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10003"
diff --git a/apps/Gateway/README b/apps/Gateway/README
deleted file mode 100644
index ffd7e52bdf4..00000000000
--- a/apps/Gateway/README
+++ /dev/null
@@ -1,92 +0,0 @@
-OVERVIEW
-
-This directory contains source code for a prototype application-level
-gateway implemented with ACE. This prototype was developed in my
-cs422 OS class at Washington University. It illustrates the use of
-Event Channels to forward events from Suppliers to Consumers in a
-distributed system.
-
-You can get a paper that explains the patterns used in this
-implementation at the following WWW URL:
-
-http://www.cs.wustl.edu/~schmidt/TAPOS-95.ps.gz
-
-DIRECTORY STRUCTURE
-
-There are 2 directories:
-
-Gateway
-
- -- The application Gateway, which must be started *after* all
- the Peers described below). This process reads the
- connection_config and consumer_config files:
-
- 1. The connection_config file is used to establish the "physical
- configuration." It tells the Gateway what connections
- to establish with particular hosts using particular
- ports.
-
- 2. The consumer_config file is used to establish the "logical
- configuration." It tells the Gateway how to forward
- data coming from "sources" to the appropriate
- "destinations."
-Peer
-
- -- The test driver programs that must be started *before* the
- Gateway. To do anything interesting you'll need at least
- two Peers: one to supply events and one to consume events.
- In the configuration files, these two types of Peers are
- designated as follows:
-
- 1. Supplier Peers (designated by an 'S' in the Gateway's
- connection_config configuration file). These Peers are
- "suppliers" of events to the Gateway.
-
- 2. Consumer Peers (designated by an 'C' in the Gateway's
- connection_config file). These Peers are "consumers" of
- events forwarded by the Gateway (forwarding is based on
- the settings in the consumer_config configuration file).
-
-RUNNING THE TESTS
-
-To run the tests do the following:
-
-1. Compile everything (i.e., first compile the ACE libraries, then
- compile the the Gateway directories).
-
-2. Edit the consumer_config and connection_config files as discussed
- above to indicate the desired physical and logical mappings.
-
-3. Start up the Peers (peerd). You can start up as many as you
- like, as per the connection_config file, but you'll need at least
- two (one to supply and one to consume). I typically start up each
- Peer in a different window on a different machine. The Peers
- should print out some diagnostic info and then block awaiting
- connections from the Gateway.
-
-4. Start up the Gateway (gatewayd). This will print out a bunch of
- events as it reads the config files and connects to all the Peers.
- Assuming everything works, then all the Peers will be connected.
- If some of the Peers aren't set up correctly then the Gateway will
- use an exponential backoff algorithm to attempt to reestablish
- those connections.
-
-5. Once the Gateway has connected with all the Peers you can send
- events from Supplier Peers by typing commands in the Peer window.
- This Supplier will be sent to the Gateway, which will forward the
- event to all Consumer Peers that have "subscribed" to receive these
- events.
-
- Note that if you type ^C in a Peer window the Peer will shutdown
- its handlers and exit. The Gateway will detect this and will start
- trying to reestablish the connection using the same exponential
- backoff algorithm it used for the initial connection establishment.
-
-7. When you want to terminate a Gateway, just type ^C and the process
- will shut down gracefully.
-
-Please let me know if there are any questions.
-
- Doug
-
- schmidt@cs.wustl.edu