diff options
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 99 |
1 files changed, 73 insertions, 26 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 7c236c862a9..350a72a4c16 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -30,8 +30,12 @@ Event_Channel::compute_performance_statistics (void) != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "suspend_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) suspending all threads...")); } size_t total_bytes_in = 0; @@ -74,10 +78,15 @@ Event_Channel::compute_performance_statistics (void) != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->resume_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "resume_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) resuming all threads...")); } + return 0; } @@ -191,13 +200,15 @@ Event_Channel::routing_event (Event_Key *forwarding_address, // counting. ACE_Message_Block *dup_msg = data->duplicate (); - ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) forwarding to Consumer %d\n", (*connection_handler)->connection_id ())); if ((*connection_handler)->put (dup_msg) == -1) { if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", "gateway is flow controlled, so we're dropping events")); else ACE_ERROR ((LM_ERROR, @@ -216,11 +227,16 @@ Event_Channel::routing_event (Event_Key *forwarding_address, } int -Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler) +Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler, + int sync_directly) { ACE_Synch_Options synch_options; - if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) + if (sync_directly) + // In separated connection handler thread, connection can be + // initiated by block mode (synch mode) directly. + synch_options = ACE_Synch_Options::synch; + else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; @@ -232,15 +248,20 @@ Event_Channel::initiate_connection_connection (Connection_Handler *connection_ha int Event_Channel::complete_connection_connection (Connection_Handler *connection_handler) { - int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; - int socket_queue_size = Options::instance ()->socket_queue_size (); + int option = connection_handler->connection_role () == 'S' + ? SO_RCVBUF + : SO_SNDBUF; + int socket_queue_size = + Options::instance ()->socket_queue_size (); if (socket_queue_size > 0) if (connection_handler->peer ().set_option (SOL_SOCKET, - option, - &socket_queue_size, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "set_option")); connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); @@ -254,10 +275,12 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha // Send the connection id to the peerd. - ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id); + ssize_t n = connection_handler->peer ().send ((const void *) &id, + sizeof id); if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", n == 0 ? "peer has closed down unexpectedly" : "send"), -1); return 0; @@ -269,10 +292,9 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha int Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler) { - // Skip over proxies with deactivated handles. - if (connection_handler->get_handle () != ACE_INVALID_HANDLE) - // Make sure to close down peer to reclaim descriptor. - connection_handler->peer ().close (); + // Cancel asynchronous connecting before re-initializing. It will + // close the peer and cancel the asynchronous connecting. + this->cancel_connection_connection(connection_handler); if (connection_handler->state () != Connection_Handler::DISCONNECTING) { @@ -282,9 +304,30 @@ Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_ // Reschedule ourselves to try and connect again. if (ACE_Reactor::instance ()->schedule_timer - (connection_handler, 0, connection_handler->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); + (connection_handler, + 0, + connection_handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_timer"), + -1); + } + return 0; +} + +// It is useful to provide a separate method to cancel the +// asynchronous connecting. + +int +Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler) +{ + // Skip over proxies with deactivated handles. + if (connection_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + connection_handler->peer ().close (); + // Cancel asynchronous connecting before re-initializing. + return this->connector_.cancel(connection_handler); } return 0; } @@ -338,7 +381,7 @@ Event_Channel::initiate_acceptors (void) } if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)) { - if (this->supplier_acceptor_.open + if(this->supplier_acceptor_.open (Options::instance ()->supplier_acceptor_port (), ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) @@ -378,7 +421,7 @@ Event_Channel::close (u_long) // Iterate over all the handlers and shut them down. - for (CONNECTION_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { @@ -388,6 +431,10 @@ Event_Channel::close (u_long) "(%t) closing down connection %d\n", connection_handler->connection_id ())); + // If have no this statement, the gatewayd will abort when exiting + // with some Consumer/Supplier not connected. + if (connection_handler->state()==Connection_Handler::CONNECTING) + this->cancel_connection_connection(connection_handler); // Mark Connection_Handler as DISCONNECTING so we don't try to // reconnect... connection_handler->state (Connection_Handler::DISCONNECTING); @@ -407,7 +454,7 @@ Event_Channel::close (u_long) { CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - for (CONNECTION_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. cmi.next (me) != 0; cmi.advance ()) { |