diff options
Diffstat (limited to 'apps/Gateway/Gateway/Proxy_Handler.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Proxy_Handler.cpp | 345 |
1 files changed, 114 insertions, 231 deletions
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp index 86e0fff8e41..2f161c171f6 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Proxy_Handler.cpp @@ -1,11 +1,18 @@ // $Id$ -#include "Dispatch_Set.h" -#include "Proxy_Handler_Connector.h" +#include "Event_Channel.h" -// Convenient short-hands. -#define CO CONDITION -#define MU MAP_MUTEX +void +Proxy_Handler::id (ACE_INT32 id) +{ + this->id_ = id; +} + +ACE_INT32 +Proxy_Handler::id (void) +{ + return this->id_; +} // The total number of bytes sent/received on this Proxy. @@ -21,36 +28,35 @@ Proxy_Handler::total_bytes (size_t bytes) this->total_bytes_ += bytes; } -Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), - efd_ (efd), - id_ (-1), +Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : remote_addr_ (remote_addr), + local_addr_ (local_addr), + id_ (conn_id), total_bytes_ (0), state_ (Proxy_Handler::IDLE), - connector_ (ioc), timeout_ (1), - max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), - socket_queue_size_ (socket_queue_size) + max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), + event_channel_ (ec) { } -// Set the direction. +// Set the proxy_role. void -Proxy_Handler::direction (char d) +Proxy_Handler::proxy_role (char d) { - this->direction_ = d; + this->proxy_role_ = d; } -// Get the direction. +// Get the proxy_role. char -Proxy_Handler::direction (void) +Proxy_Handler::proxy_role (void) { - return this->direction_; + return this->proxy_role_; } // Sets the timeout delay. @@ -64,9 +70,9 @@ Proxy_Handler::timeout (int to) this->timeout_ = to; } -// Recalculate the current retry timeout delay using exponential +// Re-calculate the current retry timeout delay using exponential // backoff. Returns the original timeout (i.e., before the -// recalculation). +// re-calculation). int Proxy_Handler::timeout (void) @@ -99,37 +105,16 @@ Proxy_Handler::max_timeout (void) // Restart connection asynchronously when timeout occurs. int -Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *) +Proxy_Handler::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", this->id (), this->timeout_)); - return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); -} - -// Restart connection (blocking_semantics dicates whether we -// restart synchronously or asynchronously). -int -Proxy_Handler::reinitiate_connection (void) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Make sure to close down peer to reclaim descriptor. - this->peer ().close (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Proxy_Handler %d\n", - this->id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; + // Delegate the re-connection attempt to the Event Channel. + return this->event_channel_.initiate_proxy_connection + (this, ACE_Synch_Options::asynch); } // Handle shutdown of the Proxy_Handler object. @@ -141,7 +126,8 @@ Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) "(%t) shutting down Proxy_Handler %d on handle %d\n", this->id (), this->get_handle ())); - return this->reinitiate_connection (); + // Restart the connection, if possible. + return this->event_channel_.reinitiate_proxy_connection (this); } // Set the state of the Proxy. @@ -152,66 +138,29 @@ Proxy_Handler::state (Proxy_Handler::State s) this->state_ = s; } -// Perform the first-time initiation of a connection to the peer. - -int -Proxy_Handler::initialize_connection (void) -{ - this->state_ = Proxy_Handler::ESTABLISHED; - - // Restart the timeout to 1. - this->timeout (1); - - // Action that sends the connection id to the peerd. - - ACE_INT32 id = htonl (this->id ()); - - ssize_t n = this->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "peer has closed down unexpectedly" : "send"), - -1); - return 0; -} - -// Set the size of the socket queue. - -void -Proxy_Handler::socket_queue_size (void) -{ - if (this->socket_queue_size_ > 0) - { - int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; - - if (this->peer ().set_option (SOL_SOCKET, option, - &this->socket_queue_size_, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - } -} - -// Upcall from the ACE_Acceptor::handle_input() that -// delegates control to our application-specific Proxy_Handler. +// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates +// control to our Proxy_Handler. int -Proxy_Handler::open (void *a) +Proxy_Handler::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n", this->peer ().get_handle ())); - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn on non-blocking I/O. if (this->peer ().enable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - // Call down to the base class to activate and register this handler. - if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); - return this->initialize_connection (); + // Register ourselves to receive input events. + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + else + return 0; } // Return the current state of the Proxy. @@ -222,30 +171,6 @@ Proxy_Handler::state (void) return this->state_; } -void -Proxy_Handler::id (ACE_INT32 id) -{ - this->id_ = id; -} - -ACE_INT32 -Proxy_Handler::id (void) -{ - return this->id_; -} - -// Set the peer's address information. -int -Proxy_Handler::bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 id) -{ - this->remote_addr_ = remote_addr; - this->local_addr_ = local_addr; - this->id_ = id; - return 0; -} - ACE_INET_Addr & Proxy_Handler::remote_addr (void) { @@ -258,15 +183,13 @@ Proxy_Handler::local_addr (void) return this->local_addr_; } -// Constructor sets the consumer map pointer. - -Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'C'; + this->proxy_role_ = 'C'; this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); } @@ -311,7 +234,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the // Event_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. + // handle_output()) when it is possible to try again. ssize_t n = this->send (event); @@ -325,7 +248,8 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) } else if (errno == EWOULDBLOCK) // Didn't manage to send everything. { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing activated on handle %d to routing id %d\n", this->get_handle (), this->id ())); // ACE_Queue in *front* of the list to preserve order. @@ -354,11 +278,11 @@ Consumer_Proxy::send (ACE_Message_Block *event) else if (n < len) // Re-adjust pointer to skip over the part we did send. event->rd_ptr (n); - else /* if (n == length) */ + else // if (n == length) { - // The whole event is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - delete event; + // The whole event is sent, we now decrement the reference count + // (which deletes itself with it reaches 0. + event->release (); errno = 0; } this->total_bytes (n); @@ -389,9 +313,9 @@ Consumer_Proxy::handle_output (ACE_HANDLE) break; case -1: - // We are responsible for freeing an ACE_Message_Block if + // We are responsible for releasing an ACE_Message_Block if // failures occur. - delete event; + event->release (); ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); /* FALLTHROUGH */ @@ -436,17 +360,14 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) (event, (ACE_Time_Value *) &ACE_Time_Value::zero); } -// Constructor sets the consumer map pointer and the connector -// pointer. - -Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) +Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) : msg_frag_ (0), - Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) + Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'S'; + this->proxy_role_ = 'S'; this->msg_queue ()->high_water_mark (0); } @@ -490,8 +411,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read)); - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return header_received; } @@ -508,11 +428,34 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) errno = EWOULDBLOCK; return -1; } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > sizeof event->data_) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + "Data payload is too big (%d bytes)\n", + event->header_.len_)); + return -1; + } + } - // At this point there is a complete, valid header in msg_frag_ + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // msg_frag_->wr_ptr() will point to event->data_. Every time we do + // a successful fragment read, we advance wr_ptr(). Therefore, by + // subtracting how much we've already read from the + // event->header_.len_ we complete the data_bytes_left_to_read... + ssize_t data_bytes_left_to_read = - sizeof (Event) - this->msg_frag_->length (); + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); ssize_t data_received = this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); @@ -529,8 +472,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) /* FALLTHROUGH */; case 0: // Premature EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return 0; default: @@ -550,23 +492,22 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) // Allocate an event forwarding header and chain the data // portion onto its continuation field. - forward_addr = new ACE_Message_Block (sizeof (Event_Addr), + forward_addr = new ACE_Message_Block (sizeof (Event_Key), ACE_Message_Block::MB_PROTO, this->msg_frag_); if (forward_addr == 0) { - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); errno = ENOMEM; return -1; } - Event_Addr event_addr (this->id (), + Event_Key event_addr (this->id (), event->header_.supplier_id_, event->header_.type_); - // Copy the forwarding address from the Event_Addr into + // Copy the forwarding address from the Event_Key into // forward_addr. - forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr)); + forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); // Reset the pointer to indicate we've got an entire event. this->msg_frag_ = 0; @@ -579,8 +520,12 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) event->header_.len_, event->data_)); #else ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", - event->header_.supplier_id_, event->header_.len_, this->total_bytes ())); -#endif + event->header_.supplier_id_, event->header_.len_, data_received + header_received)); +#endif /* VERBOSE */ + + // Encode before returning so that we can set things out in + // network byte order. + event->header_.encode (); return data_received + header_received; } } @@ -620,79 +565,17 @@ Supplier_Proxy::handle_input (ACE_HANDLE) } } -// Forward an event to its appropriate Consumer(s). +// Forward an event to its appropriate Consumer(s). This delegates to +// the <ACE_Event_Channel> to do the actual forwarding. int Supplier_Proxy::forward (ACE_Message_Block *forward_addr) { - // We got a valid event, so determine its virtual forwarding - // address, which is stored in the first of the two event blocks, - // which are chained together by this->recv(). - - Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr (); - - // Skip over the address portion and get the data. - const ACE_Message_Block *const data = forward_addr->cont (); - - // <dispatch_set> points to the set of Consumers associated with - // this forwarding address. - Dispatch_Set *dispatch_set = 0; - - if (this->efd_->find (*forwarding_addr, dispatch_set) != -1) - { - // Check to see if there are any destinations. - if (dispatch_set->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active destinations for this event currently\n")); - - else // There are destinations, so forward the event. - { - Dispatch_Set_Iterator dsi (*dispatch_set); - - for (Proxy_Handler **proxy_handler = 0; - dsi.next (proxy_handler) != 0; - dsi.advance ()) - { - // Only process active proxy_handlers. - if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) - { - // Clone the event portion (should be doing - // reference counting here...) - ACE_Message_Block *newmsg = data->clone (); - - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", - (*proxy_handler)->id ())); - - if ((*proxy_handler)->put (newmsg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", - "put", (*proxy_handler)->id ())); - - // We are responsible for freeing a - // ACE_Message_Block if failures occur. - delete newmsg; - } - } - } - // Will become superfluous once we have reference - // counting... - delete forward_addr; - return 0; - } - } - delete forward_addr; - // Failure return. - ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", - forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_)); - return 0; + return this->event_channel_.put (forward_addr); } #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>; +template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ |