summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp99
1 files changed, 73 insertions, 26 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 7c236c862a9..350a72a4c16 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -30,8 +30,12 @@ Event_Channel::compute_performance_statistics (void)
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "suspend_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) suspending all threads..."));
}
size_t total_bytes_in = 0;
@@ -74,10 +78,15 @@ Event_Channel::compute_performance_statistics (void)
!= Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->resume_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "resume_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) resuming all threads..."));
}
+
return 0;
}
@@ -191,13 +200,15 @@ Event_Channel::routing_event (Event_Key *forwarding_address,
// counting.
ACE_Message_Block *dup_msg = data->duplicate ();
- ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) forwarding to Consumer %d\n",
(*connection_handler)->connection_id ()));
if ((*connection_handler)->put (dup_msg) == -1)
{
if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
"gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR,
@@ -216,11 +227,16 @@ Event_Channel::routing_event (Event_Key *forwarding_address,
}
int
-Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler)
+Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler,
+ int sync_directly)
{
ACE_Synch_Options synch_options;
- if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
+ if (sync_directly)
+ // In separated connection handler thread, connection can be
+ // initiated by block mode (synch mode) directly.
+ synch_options = ACE_Synch_Options::synch;
+ else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
@@ -232,15 +248,20 @@ Event_Channel::initiate_connection_connection (Connection_Handler *connection_ha
int
Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
{
- int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
- int socket_queue_size = Options::instance ()->socket_queue_size ();
+ int option = connection_handler->connection_role () == 'S'
+ ? SO_RCVBUF
+ : SO_SNDBUF;
+ int socket_queue_size =
+ Options::instance ()->socket_queue_size ();
if (socket_queue_size > 0)
if (connection_handler->peer ().set_option (SOL_SOCKET,
- option,
- &socket_queue_size,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+ option,
+ &socket_queue_size,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "set_option"));
connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
@@ -254,10 +275,12 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha
// Send the connection id to the peerd.
- ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id);
+ ssize_t n = connection_handler->peer ().send ((const void *) &id,
+ sizeof id);
if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
n == 0 ? "peer has closed down unexpectedly" : "send"),
-1);
return 0;
@@ -269,10 +292,9 @@ Event_Channel::complete_connection_connection (Connection_Handler *connection_ha
int
Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
{
- // Skip over proxies with deactivated handles.
- if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
- // Make sure to close down peer to reclaim descriptor.
- connection_handler->peer ().close ();
+ // Cancel asynchronous connecting before re-initializing. It will
+ // close the peer and cancel the asynchronous connecting.
+ this->cancel_connection_connection(connection_handler);
if (connection_handler->state () != Connection_Handler::DISCONNECTING)
{
@@ -282,9 +304,30 @@ Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_
// Reschedule ourselves to try and connect again.
if (ACE_Reactor::instance ()->schedule_timer
- (connection_handler, 0, connection_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
+ (connection_handler,
+ 0,
+ connection_handler->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "schedule_timer"),
+ -1);
+ }
+ return 0;
+}
+
+// It is useful to provide a separate method to cancel the
+// asynchronous connecting.
+
+int
+Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler)
+{
+ // Skip over proxies with deactivated handles.
+ if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ connection_handler->peer ().close ();
+ // Cancel asynchronous connecting before re-initializing.
+ return this->connector_.cancel(connection_handler);
}
return 0;
}
@@ -338,7 +381,7 @@ Event_Channel::initiate_acceptors (void)
}
if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))
{
- if (this->supplier_acceptor_.open
+ if(this->supplier_acceptor_.open
(Options::instance ()->supplier_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
@@ -378,7 +421,7 @@ Event_Channel::close (u_long)
// Iterate over all the handlers and shut them down.
- for (CONNECTION_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
cmi.next (me) != 0;
cmi.advance ())
{
@@ -388,6 +431,10 @@ Event_Channel::close (u_long)
"(%t) closing down connection %d\n",
connection_handler->connection_id ()));
+ // If have no this statement, the gatewayd will abort when exiting
+ // with some Consumer/Supplier not connected.
+ if (connection_handler->state()==Connection_Handler::CONNECTING)
+ this->cancel_connection_connection(connection_handler);
// Mark Connection_Handler as DISCONNECTING so we don't try to
// reconnect...
connection_handler->state (Connection_Handler::DISCONNECTING);
@@ -407,7 +454,7 @@ Event_Channel::close (u_long)
{
CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
- for (CONNECTION_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
cmi.next (me) != 0;
cmi.advance ())
{