summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Proxy_Handler.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
commitea0d28240863caf437a18071bfd03e7b146c5ade (patch)
tree91b695852b885a5f44f9be8c3a22bbf7f5a96b8d /apps/Gateway/Gateway/Proxy_Handler.cpp
parenta6e2ced2f5279e011b712995095a1712a29e22f0 (diff)
downloadATCD-ea0d28240863caf437a18071bfd03e7b146c5ade.tar.gz
foo
Diffstat (limited to 'apps/Gateway/Gateway/Proxy_Handler.cpp')
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.cpp345
1 files changed, 114 insertions, 231 deletions
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp
index 86e0fff8e41..2f161c171f6 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Proxy_Handler.cpp
@@ -1,11 +1,18 @@
// $Id$
-#include "Dispatch_Set.h"
-#include "Proxy_Handler_Connector.h"
+#include "Event_Channel.h"
-// Convenient short-hands.
-#define CO CONDITION
-#define MU MAP_MUTEX
+void
+Proxy_Handler::id (ACE_INT32 id)
+{
+ this->id_ = id;
+}
+
+ACE_INT32
+Proxy_Handler::id (void)
+{
+ return this->id_;
+}
// The total number of bytes sent/received on this Proxy.
@@ -21,36 +28,35 @@ Proxy_Handler::total_bytes (size_t bytes)
this->total_bytes_ += bytes;
}
-Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr),
- efd_ (efd),
- id_ (-1),
+Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : remote_addr_ (remote_addr),
+ local_addr_ (local_addr),
+ id_ (conn_id),
total_bytes_ (0),
state_ (Proxy_Handler::IDLE),
- connector_ (ioc),
timeout_ (1),
- max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
- socket_queue_size_ (socket_queue_size)
+ max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
+ event_channel_ (ec)
{
}
-// Set the direction.
+// Set the proxy_role.
void
-Proxy_Handler::direction (char d)
+Proxy_Handler::proxy_role (char d)
{
- this->direction_ = d;
+ this->proxy_role_ = d;
}
-// Get the direction.
+// Get the proxy_role.
char
-Proxy_Handler::direction (void)
+Proxy_Handler::proxy_role (void)
{
- return this->direction_;
+ return this->proxy_role_;
}
// Sets the timeout delay.
@@ -64,9 +70,9 @@ Proxy_Handler::timeout (int to)
this->timeout_ = to;
}
-// Recalculate the current retry timeout delay using exponential
+// Re-calculate the current retry timeout delay using exponential
// backoff. Returns the original timeout (i.e., before the
-// recalculation).
+// re-calculation).
int
Proxy_Handler::timeout (void)
@@ -99,37 +105,16 @@ Proxy_Handler::max_timeout (void)
// Restart connection asynchronously when timeout occurs.
int
-Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *)
+Proxy_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n",
this->id (), this->timeout_));
- return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
-}
-
-// Restart connection (blocking_semantics dicates whether we
-// restart synchronously or asynchronously).
-int
-Proxy_Handler::reinitiate_connection (void)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Make sure to close down peer to reclaim descriptor.
- this->peer ().close ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Proxy_Handler %d\n",
- this->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
+ // Delegate the re-connection attempt to the Event Channel.
+ return this->event_channel_.initiate_proxy_connection
+ (this, ACE_Synch_Options::asynch);
}
// Handle shutdown of the Proxy_Handler object.
@@ -141,7 +126,8 @@ Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
"(%t) shutting down Proxy_Handler %d on handle %d\n",
this->id (), this->get_handle ()));
- return this->reinitiate_connection ();
+ // Restart the connection, if possible.
+ return this->event_channel_.reinitiate_proxy_connection (this);
}
// Set the state of the Proxy.
@@ -152,66 +138,29 @@ Proxy_Handler::state (Proxy_Handler::State s)
this->state_ = s;
}
-// Perform the first-time initiation of a connection to the peer.
-
-int
-Proxy_Handler::initialize_connection (void)
-{
- this->state_ = Proxy_Handler::ESTABLISHED;
-
- // Restart the timeout to 1.
- this->timeout (1);
-
- // Action that sends the connection id to the peerd.
-
- ACE_INT32 id = htonl (this->id ());
-
- ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "peer has closed down unexpectedly" : "send"),
- -1);
- return 0;
-}
-
-// Set the size of the socket queue.
-
-void
-Proxy_Handler::socket_queue_size (void)
-{
- if (this->socket_queue_size_ > 0)
- {
- int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF;
-
- if (this->peer ().set_option (SOL_SOCKET, option,
- &this->socket_queue_size_,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- }
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that
-// delegates control to our application-specific Proxy_Handler.
+// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
+// control to our Proxy_Handler.
int
-Proxy_Handler::open (void *a)
+Proxy_Handler::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n",
this->peer ().get_handle ()));
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn on non-blocking I/O.
if (this->peer ().enable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
- // Call down to the base class to activate and register this handler.
- if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
- return this->initialize_connection ();
+ // Register ourselves to receive input events.
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+ else
+ return 0;
}
// Return the current state of the Proxy.
@@ -222,30 +171,6 @@ Proxy_Handler::state (void)
return this->state_;
}
-void
-Proxy_Handler::id (ACE_INT32 id)
-{
- this->id_ = id;
-}
-
-ACE_INT32
-Proxy_Handler::id (void)
-{
- return this->id_;
-}
-
-// Set the peer's address information.
-int
-Proxy_Handler::bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 id)
-{
- this->remote_addr_ = remote_addr;
- this->local_addr_ = local_addr;
- this->id_ = id;
- return 0;
-}
-
ACE_INET_Addr &
Proxy_Handler::remote_addr (void)
{
@@ -258,15 +183,13 @@ Proxy_Handler::local_addr (void)
return this->local_addr_;
}
-// Constructor sets the consumer map pointer.
-
-Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Proxy_Handler (ec, remote_addr, local_addr, conn_id)
{
- this->direction_ = 'C';
+ this->proxy_role_ = 'C';
this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
}
@@ -311,7 +234,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
// Try to send the event. If we don't send it all (e.g., due to
// flow control), then re-queue the remainder at the head of the
// Event_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
+ // handle_output()) when it is possible to try again.
ssize_t n = this->send (event);
@@ -325,7 +248,8 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
}
else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
{
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing activated on handle %d to routing id %d\n",
this->get_handle (), this->id ()));
// ACE_Queue in *front* of the list to preserve order.
@@ -354,11 +278,11 @@ Consumer_Proxy::send (ACE_Message_Block *event)
else if (n < len)
// Re-adjust pointer to skip over the part we did send.
event->rd_ptr (n);
- else /* if (n == length) */
+ else // if (n == length)
{
- // The whole event is sent, we can now safely deallocate the
- // buffer. Note that this should decrement a reference count...
- delete event;
+ // The whole event is sent, we now decrement the reference count
+ // (which deletes itself with it reaches 0.
+ event->release ();
errno = 0;
}
this->total_bytes (n);
@@ -389,9 +313,9 @@ Consumer_Proxy::handle_output (ACE_HANDLE)
break;
case -1:
- // We are responsible for freeing an ACE_Message_Block if
+ // We are responsible for releasing an ACE_Message_Block if
// failures occur.
- delete event;
+ event->release ();
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
/* FALLTHROUGH */
@@ -436,17 +360,14 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
(event, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
-// Constructor sets the consumer map pointer and the connector
-// pointer.
-
-Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
+Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
: msg_frag_ (0),
- Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+ Proxy_Handler (ec, remote_addr, local_addr, conn_id)
{
- this->direction_ = 'S';
+ this->proxy_role_ = 'S';
this->msg_queue ()->high_water_mark (0);
}
@@ -490,8 +411,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
ACE_DEBUG ((LM_DEBUG,
"attempted to read %d\n",
header_bytes_left_to_read));
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
return header_received;
}
@@ -508,11 +428,34 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
errno = EWOULDBLOCK;
return -1;
}
+
+ // Convert the header into host byte order so that we can access
+ // it directly without having to repeatedly muck with it...
+ event->header_.decode ();
+
+ if (event->header_.len_ > sizeof event->data_)
+ {
+ // This data_ payload is too big!
+ errno = EINVAL;
+ ACE_DEBUG ((LM_DEBUG,
+ "Data payload is too big (%d bytes)\n",
+ event->header_.len_));
+ return -1;
+ }
+
}
- // At this point there is a complete, valid header in msg_frag_
+ // At this point there is a complete, valid header in Event. Now we
+ // need to get the event payload. Due to incomplete reads this may
+ // not be the first time we've read in a fragment for this message.
+ // We account for this here. Note that the first time in here
+ // msg_frag_->wr_ptr() will point to event->data_. Every time we do
+ // a successful fragment read, we advance wr_ptr(). Therefore, by
+ // subtracting how much we've already read from the
+ // event->header_.len_ we complete the data_bytes_left_to_read...
+
ssize_t data_bytes_left_to_read =
- sizeof (Event) - this->msg_frag_->length ();
+ ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
ssize_t data_received =
this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
@@ -529,8 +472,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
/* FALLTHROUGH */;
case 0: // Premature EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
return 0;
default:
@@ -550,23 +492,22 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
// Allocate an event forwarding header and chain the data
// portion onto its continuation field.
- forward_addr = new ACE_Message_Block (sizeof (Event_Addr),
+ forward_addr = new ACE_Message_Block (sizeof (Event_Key),
ACE_Message_Block::MB_PROTO,
this->msg_frag_);
if (forward_addr == 0)
{
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
errno = ENOMEM;
return -1;
}
- Event_Addr event_addr (this->id (),
+ Event_Key event_addr (this->id (),
event->header_.supplier_id_,
event->header_.type_);
- // Copy the forwarding address from the Event_Addr into
+ // Copy the forwarding address from the Event_Key into
// forward_addr.
- forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr));
+ forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
// Reset the pointer to indicate we've got an entire event.
this->msg_frag_ = 0;
@@ -579,8 +520,12 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
event->header_.len_, event->data_));
#else
ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_, event->header_.len_, this->total_bytes ()));
-#endif
+ event->header_.supplier_id_, event->header_.len_, data_received + header_received));
+#endif /* VERBOSE */
+
+ // Encode before returning so that we can set things out in
+ // network byte order.
+ event->header_.encode ();
return data_received + header_received;
}
}
@@ -620,79 +565,17 @@ Supplier_Proxy::handle_input (ACE_HANDLE)
}
}
-// Forward an event to its appropriate Consumer(s).
+// Forward an event to its appropriate Consumer(s). This delegates to
+// the <ACE_Event_Channel> to do the actual forwarding.
int
Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
{
- // We got a valid event, so determine its virtual forwarding
- // address, which is stored in the first of the two event blocks,
- // which are chained together by this->recv().
-
- Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr ();
-
- // Skip over the address portion and get the data.
- const ACE_Message_Block *const data = forward_addr->cont ();
-
- // <dispatch_set> points to the set of Consumers associated with
- // this forwarding address.
- Dispatch_Set *dispatch_set = 0;
-
- if (this->efd_->find (*forwarding_addr, dispatch_set) != -1)
- {
- // Check to see if there are any destinations.
- if (dispatch_set->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active destinations for this event currently\n"));
-
- else // There are destinations, so forward the event.
- {
- Dispatch_Set_Iterator dsi (*dispatch_set);
-
- for (Proxy_Handler **proxy_handler = 0;
- dsi.next (proxy_handler) != 0;
- dsi.advance ())
- {
- // Only process active proxy_handlers.
- if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
- {
- // Clone the event portion (should be doing
- // reference counting here...)
- ACE_Message_Block *newmsg = data->clone ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
- (*proxy_handler)->id ()));
-
- if ((*proxy_handler)->put (newmsg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
- "put", (*proxy_handler)->id ()));
-
- // We are responsible for freeing a
- // ACE_Message_Block if failures occur.
- delete newmsg;
- }
- }
- }
- // Will become superfluous once we have reference
- // counting...
- delete forward_addr;
- return 0;
- }
- }
- delete forward_addr;
- // Failure return.
- ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
- forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_));
- return 0;
+ return this->event_channel_.put (forward_addr);
}
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>;
+template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */