diff options
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 106 |
1 files changed, 52 insertions, 54 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 815755216c7..d146ddfb362 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -1,9 +1,10 @@ /* -*- C++ -*- */ // $Id$ +#define ACE_BUILD_SVC_DLL #include "ace/Get_Opt.h" #include "Config_Files.h" -#include "IO_Handler_Connector.h" +#include "Proxy_Handler_Connector.h" #include "Event_Channel.h" #if !defined (ACE_EVENT_CHANNEL_C) @@ -46,18 +47,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the IO_Handlers. + // Iterate through the consumer map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; - if (io_handler->direction () == 'C') - total_bytes_out += io_handler->total_bytes (); - else // io_handler->direction () == 'S' - total_bytes_in += io_handler->total_bytes (); + if (proxy_handler->direction () == 'C') + total_bytes_out += proxy_handler->total_bytes (); + else // proxy_handler->direction () == 'S' + total_bytes_in += proxy_handler->total_bytes (); } #if defined (ACE_NLOGGING) @@ -108,16 +109,16 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the IO_Handlers. + // Iterate through the Consumer Map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; if (this->connector_->initiate_connection - (io_handler, synch_options) == -1) + (proxy_handler, synch_options) == -1) continue; } @@ -125,7 +126,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) } // This method gracefully shuts down all the Handlers in the -// IO_Handler Connection Map. +// Proxy_Handler Connection Map. template <class SH, class CH> int ACE_Event_Channel<SH, CH>::close (void) @@ -136,26 +137,26 @@ ACE_Event_Channel<SH, CH>::close (void) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); #endif /* USE_INPUT_MT || USE_OUTPUT_MT */ - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate over all the handlers and shut them down. for (CONNECTION_MAP_ENTRY *me; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", - io_handler->id ())); + proxy_handler->id ())); - if (io_handler->state () != IO_Handler::IDLE) - // Mark IO_Handler as DISCONNECTING so we don't try to + if (proxy_handler->state () != Proxy_Handler::IDLE) + // Mark Proxy_Handler as DISCONNECTING so we don't try to // reconnect... - io_handler->state (IO_Handler::DISCONNECTING); + proxy_handler->state (Proxy_Handler::DISCONNECTING); - // Deallocate IO_Handler resources. - io_handler->destroy (); // Will trigger a delete. + // Deallocate Proxy_Handler resources. + proxy_handler->destroy (); // Will trigger a delete. } // Free up the resources allocated dynamically by the ACE_Connector. @@ -168,7 +169,10 @@ ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) { this->parse_args (argc, argv); - ACE_NEW_RETURN (this->connector_, IO_Handler_Connector (), -1); + ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); + + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); if (this->active_connector_role_) { @@ -226,38 +230,38 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) entry.max_retry_delay_, entry.local_poconsumer_)); - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; // The next few lines of code are dependent on whether we are - // making an Supplier_Handler or an Consumer_Handler. + // making an Supplier_Proxy or an Consumer_Proxy. - if (entry.direction_ == 'C') // Configure a Consumer_Handler. - ACE_NEW_RETURN (io_handler, - CONSUMER_HANDLER (&this->consumer_map_, + if (entry.direction_ == 'C') // Configure a Consumer_Proxy. + ACE_NEW_RETURN (proxy_handler, + CONSUMER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - else /* direction == 'S' */ // Configure a Supplier_Handler. - ACE_NEW_RETURN (io_handler, - SUPPLIER_HANDLER (&this->consumer_map_, + else /* direction == 'S' */ // Configure a Supplier_Proxy. + ACE_NEW_RETURN (proxy_handler, + SUPPLIER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - // The following code is common to both Supplier_Handlers_ and - // Consumer_Handlers. + // The following code is common to both Supplier_Proxys_ and + // Consumer_Proxys. // Initialize the routing entry's peer addressing info. - io_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), + proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); // Initialize max timeout. - io_handler->max_timeout (entry.max_retry_delay_); + proxy_handler->max_timeout (entry.max_retry_delay_); - // Try to bind the new IO_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, io_handler)) + // Try to bind the new Proxy_Handler to the connection ID. + switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, @@ -277,7 +281,7 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) if (file_empty) ACE_ERROR ((LM_WARNING, - "warning: connection io_handler configuration file was empty\n")); + "warning: connection proxy_handler configuration file was empty\n")); return 0; } @@ -303,43 +307,37 @@ ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " "number of destinations = %d\n", entry.conn_id_, - entry.logical_id_, - entry.payload_type_, + entry.supplier_id_, + entry.type_, entry.total_destinations_)); for (int i = 0; i < entry.total_destinations_; i++) ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", i, entry.destinations_[i])); } - Consumer_Entry *re; - ACE_NEW_RETURN (re, Consumer_Entry, -1); - - Consumer_Entry::ENTRY_SET *io_handler_set; - ACE_NEW_RETURN (io_handler_set, Consumer_Entry::ENTRY_SET, -1); + Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); Event_Addr event_addr (entry.conn_id_, - entry.logical_id_, - entry.payload_type_); + entry.supplier_id_, + entry.type_); // Add the destinations to the Routing Entry. for (int i = 0; i < entry.total_destinations_; i++) { - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; - // Lookup destination and add to Consumer_Entry set if found. + // Lookup destination and add to Dispatch_Set set if found. if (this->connection_map_.find (entry.destinations_[i], - io_handler) != -1) - io_handler_set->insert (io_handler); + proxy_handler) != -1) + dispatch_set->insert (proxy_handler); else ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", i, entry.destinations_[i])); } - // Attach set of destination io_handlers to routing entry. - re->destinations (io_handler_set); - // Bind with consumer map, keyed by peer address. - switch (this->consumer_map_.bind (event_addr, re)) + switch (this->efd_.bind (event_addr, dispatch_set)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", |