diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
commit | ea0d28240863caf437a18071bfd03e7b146c5ade (patch) | |
tree | 91b695852b885a5f44f9be8c3a22bbf7f5a96b8d /apps/Gateway/Gateway/Event_Channel.cpp | |
parent | a6e2ced2f5279e011b712995095a1712a29e22f0 (diff) | |
download | ATCD-ea0d28240863caf437a18071bfd03e7b146c5ade.tar.gz |
foo
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 498 |
1 files changed, 235 insertions, 263 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index d146ddfb362..02f2cd465f8 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -2,38 +2,35 @@ // $Id$ #define ACE_BUILD_SVC_DLL -#include "ace/Get_Opt.h" -#include "Config_Files.h" #include "Proxy_Handler_Connector.h" #include "Event_Channel.h" -#if !defined (ACE_EVENT_CHANNEL_C) -#define ACE_EVENT_CHANNEL_C +ACE_Event_Channel_Options::ACE_Event_Channel_Options (void) + : performance_window_ (0), + blocking_semantics_ (ACE_NONBLOCK), + socket_queue_size_ (0) +{ +} -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::~ACE_Event_Channel (void) +ACE_Event_Channel::~ACE_Event_Channel (void) { } -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::ACE_Event_Channel (void) - : connection_config_file_ ("connection_config"), - consumer_config_file_ ("consumer_config"), - active_connector_role_ (1), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - debug_ (0), - connector_ (0), - socket_queue_size_ (0) +ACE_Event_Channel::ACE_Event_Channel (void) +{ +} + +ACE_Event_Channel_Options & +ACE_Event_Channel::options (void) { + return this->options_; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, - const void *) +ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // If we've got a ACE_Thread Manager then use it to suspend all the // threads. This will enable us to get an accurate count. @@ -47,17 +44,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the Proxy_Handlers. + // Iterate through the connection map summing up the number of bytes + // sent/received. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (proxy_handler->direction () == 'C') + if (proxy_handler->proxy_role () == 'C') total_bytes_out += proxy_handler->total_bytes (); - else // proxy_handler->direction () == 'S' + else // proxy_handler->proxy_role () == 'S' total_bytes_in += proxy_handler->total_bytes (); } @@ -74,13 +72,13 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); #else ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->performance_window_, - total_bytes_in, + this->options ().performance_window_, + total_bytes_in, total_bytes_out)); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); #endif /* ACE_NLOGGING */ #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) @@ -95,31 +93,177 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, return 0; } +ACE_Event_Channel::put (ACE_Message_Block *forward_addr, + ACE_Time_Value *) +{ + // We got a valid event, so determine its virtual forwarding + // address, which is stored in the first of the two event blocks, + // which are chained together by this->recv(). + + Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr (); + + // Skip over the address portion and get the data. + ACE_Message_Block *data = forward_addr->cont (); + + // <dispatch_set> points to the set of Consumers associated with + // this forwarding address. + Consumer_Dispatch_Set *dispatch_set = 0; + + if (this->efd_.find (*forwarding_addr, dispatch_set) == -1) + // Failure. + ACE_ERROR ((LM_DEBUG, + "(%t) find failed on conn id = %d, logical id = %d, type = %d\n", + forwarding_addr->conn_id_, + forwarding_addr->supplier_id_, + forwarding_addr->type_)); + else + { + // Check to see if there are any consumers. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active consumers for this event currently\n")); + + else // There are consumers, so forward the event. + { + Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); + + // At this point, we should assign a thread-safe locking + // strategy to the Message_Block is we're running in a + // multi-threaded configuration. + // data->locking_strategy (MB_Locking_Strategy::instance ()); + + for (Proxy_Handler **proxy_handler = 0; + dsi.next (proxy_handler) != 0; + dsi.advance ()) + { + // Only process active proxy_handlers. + if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + { + // Duplicate the event portion via reference + // counting. + ACE_Message_Block *dup_msg = data->duplicate (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", + (*proxy_handler)->id ())); + + if ((*proxy_handler)->put (dup_msg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", + "put", (*proxy_handler)->id ())); + + // We are responsible for releasing an + // ACE_Message_Block if failures occur. + dup_msg->release (); + } + } + } + } + } + + // Release the memory in the message block. + forward_addr->release (); + return 0; +} + +ACE_Event_Channel::svc (void) +{ + return 0; +} + +int +ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler, + ACE_Synch_Options &synch_options) +{ + return this->connector_.initiate_connection (proxy_handler, + synch_options); +} + +int +ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) +{ + int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; + int socket_queue_size = this->options ().socket_queue_size_; + + if (proxy_handler->peer ().set_option (SOL_SOCKET, + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + + proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ()); + + // Our state is now "established." + proxy_handler->state (Proxy_Handler::ESTABLISHED); + + // Restart the timeout to 1. + proxy_handler->timeout (1); + + ACE_INT32 id = htonl (proxy_handler->id ()); + + // Send the connection id to the peerd. + + ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); +} + +// Restart connection (blocking_semantics dicates whether we restart +// synchronously or asynchronously). + +int +ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) +{ + // Skip over deactivated descriptors. + if (proxy_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + proxy_handler->peer ().close (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Proxy_Handler %d\n", + proxy_handler->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (proxy_handler, 0, proxy_handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + // Initiate connections with the Consumer and Supplier Peers. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::initiate_connections (void) +ACE_Event_Channel::initiate_connections (void) { - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); ACE_Synch_Options synch_options; - if (this->blocking_semantics_ == ACE_NONBLOCK) + if (this->options ().blocking_semantics_ == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the Proxy_Handlers. + // Iterate through the Consumer Map connecting all the + // Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (this->connector_->initiate_connection + if (this->initiate_proxy_connection (proxy_handler, synch_options) == -1) - continue; + continue; // Failures are handled elsewhere... } return 0; @@ -128,8 +272,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) // This method gracefully shuts down all the Handlers in the // Proxy_Handler Connection Map. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::close (void) +ACE_Event_Channel::close (u_long) { #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); @@ -147,7 +290,7 @@ ACE_Event_Channel<SH, CH>::close (void) { Proxy_Handler *proxy_handler = me->int_id_; - ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n", proxy_handler->id ())); if (proxy_handler->state () != Proxy_Handler::IDLE) @@ -159,247 +302,76 @@ ACE_Event_Channel<SH, CH>::close (void) proxy_handler->destroy (); // Will trigger a delete. } - // Free up the resources allocated dynamically by the ACE_Connector. - delete this->connector_; return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) +int +ACE_Event_Channel::find_proxy (ACE_INT32 conn_id, + Proxy_Handler *&proxy_handler) { - this->parse_args (argc, argv); - - ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); - - // Ignore SIPPIPE so each Consumer_Proxy can handle it. - ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - - if (this->active_connector_role_) - { - // Parse the connection configuration file. - this->parse_connection_config_file (); - - // Parse the consumer map config file and build the consumer map. - this->parse_consumer_config_file (); - - // Initiate connections with the peers. - this->initiate_connections (); - } - - // If this->performance_window_ > 0 start a timer. - - if (this->performance_window_ > 0) - { - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->performance_window_) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); - else - ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", - this->performance_window_)); - } - - return 0; + return this->connection_map_.find (conn_id, proxy_handler); } -// Parse and build the connection table. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) +int +ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) { - // File that contains the consumer map configuration information. - Connection_Config_File_Parser connection_file; - Connection_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (connection_file.open (this->connection_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1); - - // Read config file one line at a time. - while (connection_file.read_entry (entry, line_number) != FP::EOFILE) + switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler)) { - file_empty = 0; - - if (this->debug_) - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " - "direction = %c, max retry timeout = %d, local port = %d\n", - entry.conn_id_, - entry.host_, - entry.remote_poconsumer_, - entry.direction_, - entry.max_retry_delay_, - entry.local_poconsumer_)); - - Proxy_Handler *proxy_handler = 0; - - // The next few lines of code are dependent on whether we are - // making an Supplier_Proxy or an Consumer_Proxy. - - if (entry.direction_ == 'C') // Configure a Consumer_Proxy. - ACE_NEW_RETURN (proxy_handler, - CONSUMER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - else /* direction == 'S' */ // Configure a Supplier_Proxy. - ACE_NEW_RETURN (proxy_handler, - SUPPLIER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - - // The following code is common to both Supplier_Proxys_ and - // Consumer_Proxys. - - // Initialize the routing entry's peer addressing info. - proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), - ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); - - // Initialize max timeout. - proxy_handler->max_timeout (entry.max_retry_delay_); - - // Try to bind the new Proxy_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, - "(%t) duplicate connection %d, already bound\n", - entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) bind failed for connection %d\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) duplicate connection %d, already bound\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: connection proxy_handler configuration file was empty\n")); - return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) +int +ACE_Event_Channel::subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds) { - // File that contains the consumer map configuration information. - Consumer_Config_File_Parser consumer_file; - Consumer_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (consumer_file.open (this->consumer_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1); - - // Read config file line at a time. - while (consumer_file.read_entry (entry, line_number) != FP::EOFILE) + // Bind with consumer map, keyed by peer address. + switch (this->efd_.bind (event_addr, cds)) { - file_empty = 0; - - if (this->debug_) - { - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " - "number of destinations = %d\n", - entry.conn_id_, - entry.supplier_id_, - entry.type_, - entry.total_destinations_)); - for (int i = 0; i < entry.total_destinations_; i++) - ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - Dispatch_Set *dispatch_set; - ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); - - Event_Addr event_addr (entry.conn_id_, - entry.supplier_id_, - entry.type_); - - // Add the destinations to the Routing Entry. - for (int i = 0; i < entry.total_destinations_; i++) - { - Proxy_Handler *proxy_handler = 0; - - // Lookup destination and add to Dispatch_Set set if found. - if (this->connection_map_.find (entry.destinations_[i], - proxy_handler) != -1) - dispatch_set->insert (proxy_handler); - else - ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - // Bind with consumer map, keyed by peer address. - switch (this->efd_.bind (event_addr, dispatch_set)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " - "already bound\n", entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", + event_addr.conn_id_), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " + "already bound\n", event_addr.conn_id_), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: consumer map configuration file was empty\n")); - return 0; } -// Parse the "command-line" arguments and set the corresponding flags. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_args (int argc, char *argv[]) +ACE_Event_Channel::open (void *) { - ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0); + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - for (int c; (c = get_opt ()) != -1; ) +#if 0 + // If this->performance_window_ > 0 start a timer. + + if (this->options ().performance_window_ > 0) { - switch (c) - { - case 'b': // Use blocking connection establishment. - this->blocking_semantics_ = 0; - break; - case 'c': - this->connection_config_file_ = get_opt.optarg; - break; - case 'd': - this->debug_ = 1; - break; - case 'p': - // We are not playing the active Connector role. - this->active_connector_role_ = 0; - break; - case 'q': - this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); - break; - case 'r': - this->consumer_config_file_ = get_opt.optarg; - break; - case 'w': // Time performance for a designated amount of time. - this->performance_window_ = ACE_OS::atoi (get_opt.optarg); - // Use blocking connection semantics so that we get accurate - // timings (since all connections start at once). - this->blocking_semantics_ = 0; - break; - default: - break; - } + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->options ().performance_window_) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); + else + ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", + this->options ().performance_window_))); } +#endif + return 0; } - -#endif /* ACE_EVENT_CHANNEL_C */ |