diff options
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 548 |
1 files changed, 0 insertions, 548 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp deleted file mode 100644 index 3204b371ee7..00000000000 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ /dev/null @@ -1,548 +0,0 @@ -// $Id$ - -#define ACE_BUILD_SVC_DLL - -#include "Connection_Handler_Connector.h" -#include "Event_Channel.h" - -ACE_RCSID(Gateway, Event_Channel, "$Id$") - -Event_Channel::~Event_Channel (void) -{ -} - -Event_Channel::Event_Channel (void) - : supplier_acceptor_ (*this, 'S'), - consumer_acceptor_ (*this, 'C') -{ -} - -int -Event_Channel::compute_performance_statistics (void) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - - // If we've got a <ACE_Thread_Manager> then use it to suspend all - // the threads. This will enable us to get an accurate count. - - if (Options::instance ()->threading_strategy () - != Options::REACTIVE) - { - if (ACE_Thread_Manager::instance ()->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); - } - - size_t total_bytes_in = 0; - size_t total_bytes_out = 0; - - // Iterate through the connection map summing up the number of bytes - // sent/received. - - for (CONNECTION_MAP_ENTRY *me = 0; - cmi.next (me) != 0; - cmi.advance ()) - { - Connection_Handler *connection_handler = me->int_id_; - - if (connection_handler->connection_role () == 'C') - total_bytes_out += connection_handler->total_bytes (); - else // connection_handler->connection_role () == 'S' - total_bytes_in += connection_handler->total_bytes (); - } - - ACE_DEBUG ((LM_DEBUG, - "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - Options::instance ()->performance_window (), - total_bytes_in, - total_bytes_out)); - - ACE_DEBUG ((LM_DEBUG, - "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / - (float) (1024 * 1024 * Options::instance ()->performance_window ())))); - - ACE_DEBUG ((LM_DEBUG, - "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / - (float) (1024 * 1024 * Options::instance ()->performance_window ())))); - - // Resume all the threads again. - - if (Options::instance ()->threading_strategy () - != Options::REACTIVE) - { - if (ACE_Thread_Manager::instance ()->resume_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); - } - - return 0; -} - -int -Event_Channel::handle_timeout (const ACE_Time_Value &, - const void *) -{ - // This is called periodically to compute performance statistics. - return this->compute_performance_statistics (); -} - -int -Event_Channel::put (ACE_Message_Block *event, - ACE_Time_Value *) -{ - // We got a valid event, so determine its type, which is stored in - // the first of the two <ACE_Message_Block>s, which are chained - // together by <ACE::recv>. - Event_Key *event_key = (Event_Key *) event->rd_ptr (); - - // Skip over the address portion and get the data, which is in the - // second <ACE_Message_Block>. - ACE_Message_Block *data = event->cont (); - - switch (event_key->type_) - { - case ROUTING_EVENT: - this->routing_event (event_key, - data); - break; - case SUBSCRIPTION_EVENT: - this->subscription_event (data); - break; - } - - // Release the memory in the message block. - event->release (); - return 0; -} - -void -Event_Channel::subscription_event (ACE_Message_Block *data) -{ - Event *event = (Event *) data->rd_ptr (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) received a subscription with %d bytes from connection id %d\n", - event->header_.len_, - event->header_.connection_id_)); - Subscription *subscription = (Subscription *) event->data_; - // Convert the subscription into host byte order so that we can - // access it directly without having to repeatedly muck with it... - subscription->decode (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) connection_id_ = %d, total_consumers_ = %d\n", - subscription->connection_id_, - subscription->total_consumers_)); - - for (ACE_INT32 i = 0; - i < subscription->total_consumers_; - i++) - ACE_DEBUG ((LM_DEBUG, - "(%t) consumers_[%d] = %d\n", - i, - subscription->consumers_[i])); - -} - -void -Event_Channel::routing_event (Event_Key *forwarding_address, - ACE_Message_Block *data) -{ - Consumer_Dispatch_Set *dispatch_set = 0; - - // Initialize the <dispatch_set> to points to the set of Consumers - // associated with this forwarding address. - - if (this->efd_.find (*forwarding_address, - dispatch_set) == -1) - // Failure. - ACE_ERROR ((LM_DEBUG, - "(%t) find failed on connection id = %d, type = %d\n", - forwarding_address->connection_id_, - forwarding_address->type_)); - else - { - // Check to see if there are any consumers. - if (dispatch_set->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active consumers for this event currently\n")); - - else // There are consumers, so forward the event. - { - // Initialize the interator. - Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); - - // At this point, we should assign a thread-safe locking - // strategy to the <ACE_Message_Block> is we're running in a - // multi-threaded configuration. - data->locking_strategy (Options::instance ()->locking_strategy ()); - - for (Connection_Handler **connection_handler = 0; - dsi.next (connection_handler) != 0; - dsi.advance ()) - { - // Only process active connection_handlers. - if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED) - { - // Duplicate the event portion via reference - // counting. - ACE_Message_Block *dup_msg = data->duplicate (); - - ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n", - (*connection_handler)->connection_id ())); - - if ((*connection_handler)->put (dup_msg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, - "(%t) %p transmission error to peer %d\n", - "put", - (*connection_handler)->connection_id ())); - - // We are responsible for releasing an - // ACE_Message_Block if failures occur. - dup_msg->release (); - } - } - } - } - } -} - -int -Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler) -{ - ACE_Synch_Options synch_options; - - if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) - synch_options = ACE_Synch_Options::asynch; - else - synch_options = ACE_Synch_Options::synch; - - return this->connector_.initiate_connection (connection_handler, - synch_options); -} - -int -Event_Channel::complete_connection_connection (Connection_Handler *connection_handler) -{ - int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; - int socket_queue_size = Options::instance ()->socket_queue_size (); - - if (socket_queue_size > 0) - if (connection_handler->peer ().set_option (SOL_SOCKET, - option, - &socket_queue_size, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - - connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); - - // Our state is now "established." - connection_handler->state (Connection_Handler::ESTABLISHED); - - // Restart the timeout to 1. - connection_handler->timeout (1); - - ACE_INT32 id = htonl (connection_handler->connection_id ()); - - // Send the connection id to the peerd. - - ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "peer has closed down unexpectedly" : "send"), - -1); - return 0; -} - -// Restart connection (blocking_semantics dicates whether we restart -// synchronously or asynchronously). - -int -Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler) -{ - // Skip over proxies with deactivated handles. - if (connection_handler->get_handle () != ACE_INVALID_HANDLE) - // Make sure to close down peer to reclaim descriptor. - connection_handler->peer ().close (); - - if (connection_handler->state () != Connection_Handler::DISCONNECTING) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Connection_Handler %d\n", - connection_handler->connection_id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Reactor::instance ()->schedule_timer - (connection_handler, 0, connection_handler->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; -} - -// Initiate active connections with the Consumer and Supplier Peers. - -void -Event_Channel::initiate_connector (void) -{ - if (Options::instance ()->enabled - (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) - { - CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - - // Iterate through the Consumer Map connecting all the - // Connection_Handlers. - - for (CONNECTION_MAP_ENTRY *me = 0; - cmi.next (me) != 0; - cmi.advance ()) - { - Connection_Handler *connection_handler = me->int_id_; - - if (this->initiate_connection_connection (connection_handler) == -1) - continue; // Failures are handled elsewhere... - } - } -} - -// Initiate passive acceptor to wait for Consumer and Supplier Peers -// to accept. - -int -Event_Channel::initiate_acceptors (void) -{ - if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) - && this->consumer_acceptor_.open - (Options::instance ()->consumer_acceptor_port (), - ACE_Reactor::instance (), - Options::instance ()->blocking_semantics ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "cannot register acceptor"), - -1); - else - ACE_DEBUG ((LM_DEBUG, - "accepting Consumers at %d\n", - Options::instance ()->consumer_acceptor_port ())); - - if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR) - && this->supplier_acceptor_.open - (Options::instance ()->supplier_acceptor_port (), - ACE_Reactor::instance (), - Options::instance ()->blocking_semantics ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "cannot register acceptor"), - -1); - else - ACE_DEBUG ((LM_DEBUG, - "accepting Suppliers at %d\n", - Options::instance ()->supplier_acceptor_port ())); - - return 0; -} - -// This method gracefully shuts down all the Handlers in the -// Connection_Handler Connection Map. - -int -Event_Channel::close (u_long) -{ - if (Options::instance ()->threading_strategy () != Options::REACTIVE) - { - if (ACE_Thread_Manager::instance ()->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) %p\n", - "suspend_all"), - -1); - ACE_DEBUG ((LM_DEBUG, - "(%t) suspending all threads\n")); - } - - // First tell everyone that the spaceship is here... - { - CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - - // Iterate over all the handlers and shut them down. - - for (CONNECTION_MAP_ENTRY *me; - cmi.next (me) != 0; - cmi.advance ()) - { - Connection_Handler *connection_handler = me->int_id_; - - ACE_DEBUG ((LM_DEBUG, - "(%t) closing down connection %d\n", - connection_handler->connection_id ())); - - // Mark Connection_Handler as DISCONNECTING so we don't try to - // reconnect... - connection_handler->state (Connection_Handler::DISCONNECTING); - } - } - - // Close down the connector - this->connector_.close (); - - // Close down the supplier acceptor. - this->supplier_acceptor_.close (); - - // Close down the consumer acceptor. - this->consumer_acceptor_.close (); - - // Now tell everyone that it is now time to commit suicide. - { - CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - - for (CONNECTION_MAP_ENTRY *me; - cmi.next (me) != 0; - cmi.advance ()) - { - Connection_Handler *connection_handler = me->int_id_; - - // Deallocate Connection_Handler resources. - connection_handler->destroy (); // Will trigger a delete. - } - } - - return 0; -} - -int -Event_Channel::find_proxy (ACE_INT32 connection_id, - Connection_Handler *&connection_handler) -{ - return this->connection_map_.find (connection_id, - connection_handler); -} - -int -Event_Channel::bind_proxy (Connection_Handler *connection_handler) -{ - int result = this->connection_map_.bind (connection_handler->connection_id (), - connection_handler); - - switch (result) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) bind failed for connection %d\n", - connection_handler->connection_id ()), - -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) duplicate connection %d, already bound\n", - connection_handler->connection_id ()), - -1); - /* NOTREACHED */ - case 0: - // Success. - return 0; - /* NOTREACHED */ - default: - ACE_ERROR_RETURN ((LM_DEBUG, - "(%t) invalid result %d\n", - result), - -1); - /* NOTREACHED */ - } - - ACE_NOTREACHED (return 0); -} - -int -Event_Channel::subscribe (const Event_Key &event_addr, - Consumer_Dispatch_Set *cds) -{ - int result = this->efd_.bind (event_addr, cds); - - // Bind with consumer map, keyed by peer address. - switch (result) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) bind failed for connection %d\n", - event_addr.connection_id_), - -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_ERROR_RETURN ((LM_DEBUG, - "(%t) duplicate consumer map entry %d, " - "already bound\n", - event_addr.connection_id_), - -1); - /* NOTREACHED */ - case 0: - // Success. - return 0; - default: - ACE_ERROR_RETURN ((LM_DEBUG, - "(%t) invalid result %d\n", - result), - -1); - /* NOTREACHED */ - } - - ACE_NOTREACHED (return 0); -} - -int -Event_Channel::open (void *) -{ - // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it. - ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE); - ACE_UNUSED_ARG (sig); - - // Actively initiate Peer connections. - this->initiate_connector (); - - // Passively initiate Peer acceptor. - if (this->initiate_acceptors () == -1) - return -1; - - // If we're not running reactively, then we need to make sure that - // <ACE_Message_Block> reference counting operations are - // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is - // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions. - if (Options::instance ()->threading_strategy () - != Options::REACTIVE) - { - ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la; - - ACE_NEW_RETURN (la, - ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, - -1); - - Options::instance ()->locking_strategy (la); - } - - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>; -template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>; -template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>; -template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>; -template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>; -template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>; -template class ACE_Unbounded_Set_Iterator<Connection_Handler *>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX> -#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *> -#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |