summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Channel.cpp713
1 files changed, 713 insertions, 0 deletions
diff --git a/apps/Gateway/Gateway/Channel.cpp b/apps/Gateway/Gateway/Channel.cpp
new file mode 100644
index 00000000000..f07b5a81978
--- /dev/null
+++ b/apps/Gateway/Gateway/Channel.cpp
@@ -0,0 +1,713 @@
+#include "ace/Log_Msg.h"
+// @(#)Channel.cpp 1.1 10/18/96
+
+#include "Routing_Entry.h"
+#include "Channel_Connector.h"
+
+// Convenient short-hands.
+#define CO CONDITION
+#define MU MUTEX
+
+// = The total number of bytes sent/received on this channel.
+size_t
+Channel::total_bytes (void)
+{
+ return this->total_bytes_;
+}
+
+void
+Channel::total_bytes (size_t bytes)
+{
+ this->total_bytes_ += bytes;
+}
+
+Channel::Channel (ROUTING_TABLE *rt,
+ Channel_Connector *cc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : id_ (-1),
+ total_bytes_ (0),
+ state_ (Channel::IDLE),
+ routing_table_ (rt),
+ connector_ (cc),
+ timeout_ (1),
+ max_timeout_ (Channel::MAX_RETRY_TIMEOUT),
+ socket_queue_size_ (socket_queue_size),
+ ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH> (thr_mgr)
+{
+}
+
+// Set the associated channel.
+
+void
+Channel::active (int a)
+{
+ this->state (a == 0 ? Channel::IDLE : Channel::ESTABLISHED);
+}
+
+// Get the associated channel.
+
+int
+Channel::active (void)
+{
+ return this->state () == Channel::ESTABLISHED;
+}
+
+// Set the direction.
+
+void
+Channel::direction (char d)
+{
+ this->direction_ = d;
+}
+
+// Get the direction.
+
+char
+Channel::direction (void)
+{
+ return this->direction_;
+}
+
+// Sets the timeout delay.
+
+void
+Channel::timeout (int to)
+{
+ if (to > this->max_timeout_)
+ to = this->max_timeout_;
+
+ this->timeout_ = to;
+}
+
+// Recalculate the current retry timeout delay using exponential
+// backoff. Returns the original timeout (i.e., before the
+// recalculation).
+
+int
+Channel::timeout (void)
+{
+ int old_timeout = this->timeout_;
+ this->timeout_ *= 2;
+
+ if (this->timeout_ > this->max_timeout_)
+ this->timeout_ = this->max_timeout_;
+
+ return old_timeout;
+}
+
+// Sets the max timeout delay.
+
+void
+Channel::max_timeout (int mto)
+{
+ this->max_timeout_ = mto;
+}
+
+// Gets the max timeout delay.
+
+int
+Channel::max_timeout (void)
+{
+ return this->max_timeout_;
+}
+
+// Restart connection asynchronously when timeout occurs.
+
+int
+Channel::handle_timeout (const ACE_Time_Value &, const void *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) attempting to reconnect Channel %d with timeout = %d\n",
+ this->id (), this->timeout_));
+ return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
+}
+
+// Restart connection (blocking_semantics dicates whether we
+// restart synchronously or asynchronously).
+
+int
+Channel::reinitiate_connection (void)
+{
+ // Skip over deactivated descriptors.
+ if (this->get_handle () != -1)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ this->peer ().close ();
+
+#if 0
+// if (this->state () == FAILED)
+// {
+ // Reinitiate timeout to improve reconnection time.
+// this->timeout (1);
+#endif
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Channel %d\n",
+ this->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer (this, 0,
+ this->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ return 0;
+}
+
+// Handle shutdown of the Channel object.
+
+int
+Channel::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down Channel %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ return this->reinitiate_connection ();
+}
+
+// Set the state of the channel.
+
+void
+Channel::state (Channel::State s)
+{
+ this->state_ = s;
+}
+
+// Perform the first-time initiation of a connection to the peer.
+
+int
+Channel::initialize_connection (void)
+{
+ this->state_ = Channel::ESTABLISHED;
+
+ // Restart the timeout to 1.
+ this->timeout (1);
+
+#if defined (ASSIGN_ROUTING_ID)
+ // Action that sends the route id to the peerd.
+
+ CONN_ID id = htons (this->id ());
+
+ ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ n == 0 ? "gatewayd has closed down unexpectedly" : "send"), -1);
+#endif /* ASSIGN_ROUTING_ID */
+ return 0;
+}
+
+// Set the size of the socket queue.
+
+void
+Channel::socket_queue_size (void)
+{
+ if (this->socket_queue_size_ > 0)
+ {
+ int option = this->direction_ == 'I' ? SO_RCVBUF : SO_SNDBUF;
+
+ if (this->peer ().set_option (SOL_SOCKET, option,
+ &this->socket_queue_size_, sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+ }
+}
+
+// Upcall from the ACE_Acceptor::handle_input() that
+// delegates control to our application-specific Channel.
+
+int
+Channel::open (void *a)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Channel's fd = %d\n", this->peer ().get_handle ()));
+
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn on non-blocking I/O.
+ if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Call down to the base class to activate and register this handler.
+ if (this->ACE_Svc_Handler<CHANNEL_PEER_STREAM, SYNCH>::open (a) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
+
+ return this->initialize_connection ();
+}
+
+// Return the current state of the channel.
+
+Channel::State
+Channel::state (void)
+{
+ return this->state_;
+}
+
+void
+Channel::id (CONN_ID id)
+{
+ this->id_ = id;
+}
+
+CONN_ID
+Channel::id (void)
+{
+ return this->id_;
+}
+
+// Set the peer's address information.
+int
+Channel::bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ CONN_ID id)
+{
+ this->remote_addr_ = remote_addr;
+ this->local_addr_ = local_addr;
+ this->id_ = id;
+ return 0;
+}
+
+ACE_INET_Addr &
+Channel::remote_addr (void)
+{
+ return this->remote_addr_;
+}
+
+ACE_INET_Addr &
+Channel::local_addr (void)
+{
+ return this->local_addr_;
+}
+
+// Constructor sets the routing table pointer.
+
+Output_Channel::Output_Channel (ROUTING_TABLE *rt,
+ Channel_Connector *cc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Channel (rt, cc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'O';
+ this->msg_queue ()->high_water_mark (Output_Channel::QUEUE_SIZE);
+}
+
+// This method should be called only when the peer shuts down
+// unexpectedly. This method simply marks the Channel as
+// having failed so that handle_close () can reconnect.
+
+int
+Output_Channel::handle_input (ACE_HANDLE)
+{
+ char buf[1];
+
+ this->state (Channel::FAILED);
+
+ switch (this->peer ().recv (buf, sizeof buf))
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has failed unexpectedly for Output Channel %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has shutdown unexpectedly for Output Channel %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer is sending input on Output Channel %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ }
+}
+
+int
+Output_Channel::svc (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Output_Channel!\n"), -1);
+}
+
+// Perform a non-blocking put() of message MB. If we are unable to
+// send the entire message the remainder is re-queued at the *front* of
+// the Message_List.
+
+int
+Output_Channel::nonblk_put (ACE_Message_Block *mb)
+{
+ // Try to send the message. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // Message_List and ask the ACE_Reactor to inform us (via
+ // handle_output()) when it is possible to try again.
+
+ ssize_t n;
+
+ if ((n = this->send_peer (mb)) == -1)
+ {
+ // Things have gone wrong, let's try to close down and set up a new reconnection.
+ this->state (Channel::FAILED);
+ this->handle_close ();
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+ // ACE_Queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
+
+ // Tell ACE_Reactor to call us back when we can send again.
+ else if (ACE_Service_Config::reactor ()->
+ schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+int
+Output_Channel::send_peer (ACE_Message_Block *mb)
+{
+ ssize_t n;
+ size_t len = mb->length ();
+
+ if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < len)
+ // Re-adjust pointer to skip over the part we did send.
+ mb->rd_ptr (n);
+ else /* if (n == length) */
+ {
+ // The whole message is sent, we can now safely deallocate the buffer.
+ // Note that this should decrement a reference count...
+ delete mb;
+ errno = 0;
+ }
+ this->total_bytes (n);
+ return n;
+}
+
+// Finish sending a message when flow control conditions abate.
+// This method is automatically called by the ACE_Reactor.
+
+int
+Output_Channel::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *mb = 0;
+ int status = 0;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) in handle_output on handle %d\n", this->get_handle ()));
+ // The list had better not be empty, otherwise there's a bug!
+
+ if (this->msg_queue ()->dequeue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (mb))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ // Didn't write everything this time, come back later...
+ break;
+
+ case -1:
+ // Caller is responsible for freeing a ACE_Message_Block if failures occur.
+ delete mb;
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
+
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+
+ // If we succeed in writing the entire message (or we did not fail
+ // due to EWOULDBLOCK) then check if there are more messages on the Message_List.
+ // If there aren't, tell the ACE_Reactor not to notify us anymore (at least
+ // until there are new messages queued up).
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) queueing deactivated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+
+ if (ACE_Service_Config::reactor ()->
+ cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
+ }
+ }
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
+ return 0;
+}
+
+// Send a message to a peer (may queue if necessary).
+
+int
+Output_Channel::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the message *without* blocking!
+ return this->nonblk_put (mb);
+ else
+ // If we have queued up messages due to flow control
+ // then just enqueue and return.
+ return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Constructor sets the routing table pointer and the connector pointer.
+
+Input_Channel::Input_Channel (ROUTING_TABLE *rt,
+ Channel_Connector *cc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : msg_frag_ (0),
+ Channel (rt, cc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'I';
+ this->msg_queue ()->high_water_mark (0);
+}
+
+int
+Input_Channel::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) put should not be called on Input_Channel!\n"), -1);
+}
+
+int
+Input_Channel::svc (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) svc should not be called on Input_Channel!\n"), -1);
+}
+
+// Receive a Peer message from peerd. Handles fragmentation.
+//
+// The routing message returned from recv_peer consists of two parts:
+// 1. The Address part, contains the virtual routing id.
+// 2. The Data part, which contains the actual data to be routed.
+//
+// The reason for having two parts is to shield the higher layers
+// of software from knowledge of the message structure.
+
+int
+Input_Channel::recv_peer (ACE_Message_Block *&route_addr)
+{
+ Peer_Message *peer_msg;
+ size_t len;
+ ssize_t n = 0;
+ ssize_t m = 0;
+ size_t offset = 0;
+
+ if (this->msg_frag_ == 0)
+ {
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Peer_Message)),
+ -1);
+
+ if (this->msg_frag_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) out of memory\n"), -1);
+
+ peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr ();
+
+ switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header)))
+ {
+ case sizeof (Peer_Header):
+ len = ntohl (peer_msg->header_.len_);
+ if (len <= sizeof peer_msg->buf_)
+ {
+ this->msg_frag_->wr_ptr (sizeof (Peer_Header));
+ break; // The message is within the maximum size range.
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) message too long = %d\n", len));
+ /* FALLTHROUGH */
+ default:
+ ACE_ERROR ((LM_ERROR, "(%t) invalid length = %d\n", n));
+ n = -1;
+ /* FALLTHROUGH */
+ case -1:
+ /* FALLTHROUGH */
+ case 0: // Premature EOF.
+ // Make sure to free up memory on error returns.
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return n;
+ }
+ }
+ else
+ {
+ // Figure out where we left off.
+ peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr ();
+ offset = this->msg_frag_->length () - sizeof (Peer_Header);
+ len = peer_msg->header_.len_ - offset;
+ }
+
+ // Try to receive the remainder of the message.
+
+ switch (m = this->peer ().recv (peer_msg->buf_ + offset, len))
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ {
+ // This shouldn't happen since the ACE_Reactor
+ // just triggered us to handle pending I/O!
+ ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n"));
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return 0;
+
+ default:
+ if (m != len)
+ // Re-adjust pointer to skip over the part we've read.
+ {
+ this->msg_frag_->wr_ptr (m);
+ errno = EWOULDBLOCK;
+ return -1; // Inform caller that we didn't get the whole message.
+ }
+ else
+ {
+ // Set the write pointer at 1 past the end of the message.
+ this->msg_frag_->wr_ptr (m);
+
+ // Set the read pointer to the beginning of the message.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ // Allocate a routing message header and chain the data portion
+ // onto its continuation field.
+ ACE_NEW_RETURN (route_addr,
+ ACE_Message_Block (sizeof (Peer_Addr),
+ ACE_Message_Block::MB_PROTO,
+ this->msg_frag_),
+ -1);
+
+ Peer_Addr peer_addr (this->id (), peer_msg->header_.routing_id_, 0);
+ // Copy the routing address from the Peer_Message into routing_addr.
+ route_addr->copy ((char *) &peer_addr, sizeof (Peer_Addr));
+
+ // Reset the pointer to indicate we've got an entire message.
+ this->msg_frag_ = 0;
+ }
+ this->total_bytes (m + n);
+#if defined (VERBOSE)
+ ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s",
+ peer_addr.conn_id_, peer_msg->header_.routing_id_, peer_msg->header_.len_,
+ peer_msg->header_.len_, peer_msg->buf_));
+#else
+ ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n",
+ peer_msg->header_.routing_id_, peer_msg->header_.len_, this->total_bytes ()));
+#endif
+ return m + n;
+ }
+}
+
+// Receive various types of input (e.g., Peer message from the
+// gatewayd, as well as stdio).
+
+int
+Input_Channel::handle_input (ACE_HANDLE)
+{
+ ACE_Message_Block *route_addr = 0;
+
+ switch (this->recv_peer (route_addr))
+ {
+ case 0:
+ // Note that a peer should never initiate a shutdown.
+ this->state (Channel::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has closed down unexpectedly for Input Channel %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else // A weird problem occurred, shut down and start again.
+ {
+ this->state (Channel::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Channel %d\n",
+ "Peer has failed unexpectedly",
+ this->id ()), -1);
+ }
+ /* NOTREACHED */
+ default:
+ return this->route_message (route_addr);
+ }
+}
+
+// Route a message to its appropriate destination.
+
+int
+Input_Channel::route_message (ACE_Message_Block *route_addr)
+{
+ // We got a valid message, so determine its virtual routing id,
+ // which is stored in the first of the two message blocks chained together.
+
+ Peer_Addr *routing_key = (Peer_Addr *) route_addr->rd_ptr ();
+
+ // Skip over the address portion.
+ const ACE_Message_Block *const data = route_addr->cont ();
+
+ // RE points to the routing entry located for this routing id.
+ Routing_Entry *re = 0;
+
+ if (this->routing_table_->find (*routing_key, re) != -1)
+ {
+ // Check to see if there are any destinations.
+ if (re->destinations ()->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active destinations for this message currently\n"));
+
+ else // There are destinations, so forward the message.
+ {
+ Routing_Entry::ENTRY_SET *esp = re->destinations ();
+ Routing_Entry::ENTRY_ITERATOR si (*esp);
+
+ for (Channel **channel = 0; si.next (channel) != 0; si.advance ())
+ {
+ // Only process active channels.
+ if ((*channel)->active ())
+ {
+ // Clone the message portion (should be doing reference counting here...)
+ ACE_Message_Block *newmsg = data->clone ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ()));
+
+ if ((*channel)->put (newmsg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping messages"));
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n",
+ "put", (*channel)->id ()));
+
+ // Caller is responsible for freeing a ACE_Message_Block if failures occur.
+ delete newmsg;
+ }
+ }
+ }
+ // Will become superfluous once we have reference counting...
+ delete route_addr;
+ return 0;
+ }
+ }
+ delete route_addr;
+ // Failure return.
+ ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
+ routing_key->conn_id_, routing_key->logical_id_, routing_key->payload_));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Map_Manager<Peer_Addr, Routing_Entry *, MUTEX>;
+template class ACE_Map_Iterator<Peer_Addr, Routing_Entry *, MUTEX>;
+template class ACE_Map_Entry<Peer_Addr, Routing_Entry *>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */