diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-01-02 01:04:38 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-01-02 01:04:38 +0000 |
commit | 895521b8424ba7eac3dcd246338d9211bfbf1332 (patch) | |
tree | b41e7a860bd9ad0f95ec4d7a330044b5c345c785 /apps/Gateway/Gateway/Event_Channel.cpp | |
parent | f708a7b22cb6e8c8733ff3cc517b25e626f3dd39 (diff) | |
download | ATCD-895521b8424ba7eac3dcd246338d9211bfbf1332.tar.gz |
*** empty log message ***
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 138 |
1 files changed, 74 insertions, 64 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 55e1f769af2..0e05f5ef67c 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -5,39 +5,16 @@ #include "Proxy_Handler_Connector.h" #include "Event_Channel.h" -ACE_Event_Channel_Options::ACE_Event_Channel_Options (void) - : locking_strategy_ (0), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - socket_queue_size_ (0), - threading_strategy_ (REACTIVE), - acceptor_port_ (ACE_DEFAULT_GATEWAY_SERVER_PORT), - connector_role_ (0), - acceptor_role_ (0), - verbose_ (0) -{ -} - -ACE_Event_Channel_Options::~ACE_Event_Channel_Options (void) -{ - delete this->locking_strategy_; -} - ACE_Event_Channel::~ACE_Event_Channel (void) { } ACE_Event_Channel::ACE_Event_Channel (void) - : acceptor_ (*this) + : supplier_acceptor_ (*this), + consumer_acceptor_ (*this) { } -ACE_Event_Channel_Options & -ACE_Event_Channel::options (void) -{ - return this->options_; -} - int ACE_Event_Channel::compute_performance_statistics (void) { @@ -47,7 +24,8 @@ ACE_Event_Channel::compute_performance_statistics (void) // If we've got a ACE_Thread Manager then use it to suspend all the // threads. This will enable us to get an accurate count. - if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE) + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); @@ -86,18 +64,23 @@ ACE_Event_Channel::compute_performance_statistics (void) (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); #else ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->options ().performance_window_, + Options::instance ()->performance_window (), total_bytes_in, total_bytes_out)); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); + ACE_DEBUG ((LM_DEBUG, + "(%t) %f Mbits/sec received.\n", + (float) (total_bytes_in * 8 / + (float) (1024 * 1024 * Options::instance ()->performance_window ())))); + ACE_DEBUG ((LM_DEBUG, + "(%t) %f Mbits/sec sent.\n", + (float) (total_bytes_out * 8 / + (float) (1024 * 1024 * Options::instance ()->performance_window ())))); #endif /* ACE_NLOGGING */ // Resume all the threads again. - if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE) + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->resume_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1); @@ -205,7 +188,7 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler) { ACE_Synch_Options synch_options; - if (this->options ().blocking_semantics_ == ACE_NONBLOCK) + if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; @@ -218,7 +201,7 @@ int ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) { int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; - int socket_queue_size = this->options ().socket_queue_size_; + int socket_queue_size = Options::instance ()->socket_queue_size (); if (socket_queue_size > 0) if (proxy_handler->peer ().set_option (SOL_SOCKET, @@ -281,19 +264,22 @@ ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) void ACE_Event_Channel::initiate_connector (void) { - PROXY_MAP_ITERATOR cmi (this->proxy_map_); + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) + { + PROXY_MAP_ITERATOR cmi (this->proxy_map_); - // Iterate through the Consumer Map connecting all the - // Proxy_Handlers. + // Iterate through the Consumer Map connecting all the + // Proxy_Handlers. - for (PROXY_MAP_ENTRY *me = 0; - cmi.next (me) != 0; - cmi.advance ()) - { - Proxy_Handler *proxy_handler = me->int_id_; + for (PROXY_MAP_ENTRY *me = 0; + cmi.next (me) != 0; + cmi.advance ()) + { + Proxy_Handler *proxy_handler = me->int_id_; - if (this->initiate_proxy_connection (proxy_handler) == -1) - continue; // Failures are handled elsewhere... + if (this->initiate_proxy_connection (proxy_handler) == -1) + continue; // Failures are handled elsewhere... + } } } @@ -301,11 +287,21 @@ ACE_Event_Channel::initiate_connector (void) // to accept. void -ACE_Event_Channel::initiate_acceptor (void) +ACE_Event_Channel::initiate_acceptors (void) { - if (this->acceptor_.open (this->options ().acceptor_port_, - ACE_Reactor::instance (), - this->options ().blocking_semantics_) == -1) + if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) + && this->consumer_acceptor_.open + (Options::instance ()->consumer_acceptor_port (), + ACE_Reactor::instance (), + Options::instance ()->blocking_semantics ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", + "cannot register acceptor")); + + if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR) + && this->supplier_acceptor_.open + (Options::instance ()->supplier_acceptor_port (), + ACE_Reactor::instance (), + Options::instance ()->blocking_semantics ()) == -1) ACE_ERROR ((LM_ERROR, "%p\n", "cannot register acceptor")); } @@ -316,14 +312,15 @@ ACE_Event_Channel::initiate_acceptor (void) int ACE_Event_Channel::close (u_long) { - if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE) + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); } - // Tell everyone that the spaceship is here. + // First tell everyone that the spaceship is here... { PROXY_MAP_ITERATOR cmi (this->proxy_map_); @@ -345,12 +342,15 @@ ACE_Event_Channel::close (u_long) } // Close down the connector - connector_.close (); + this->connector_.close (); - // Close down the acceptor - acceptor_.close (); + // Close down the supplier acceptor. + this->supplier_acceptor_.close (); - // Tell everyone that it is now time to commit suicide. + // Close down the consumer acceptor. + this->consumer_acceptor_.close (); + + // Now tell everyone that it is now time to commit suicide. { PROXY_MAP_ITERATOR cmi (this->proxy_map_); @@ -395,11 +395,14 @@ ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) case 0: // Success. return 0; + /* NOTREACHED */ default: ACE_ERROR_RETURN ((LM_DEBUG, "(%t) invalid result %d\n", result), -1); /* NOTREACHED */ } + + return 0; } int @@ -429,6 +432,8 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr, "(%t) invalid result %d\n", result), -1); /* NOTREACHED */ } + + return 0; } int @@ -438,22 +443,27 @@ ACE_Event_Channel::open (void *) ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); ACE_UNUSED_ARG (sig); - if (this->options ().connector_role_) - // Actively initiate Peer connections. - this->initiate_connector (); + // Actively initiate Peer connections. + this->initiate_connector (); - if (this->options ().acceptor_role_) - // Passively initiate Peer acceptor. - this->initiate_acceptor (); + // Passively initiate Peer acceptor. + this->initiate_acceptors (); // If we're not running reactively, then we need to make sure that // <ACE_Message_Block> reference counting operations are // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions. - if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE) - ACE_NEW_RETURN (this->options ().locking_strategy_, - ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, - -1); + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) + { + ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la; + + ACE_NEW_RETURN (la, + ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, + -1); + + Options::instance ()->locking_strategy (la); + } return 0; } |