diff options
Diffstat (limited to 'netsvcs/lib/TS_Clerk_Handler.cpp')
-rw-r--r-- | netsvcs/lib/TS_Clerk_Handler.cpp | 238 |
1 files changed, 114 insertions, 124 deletions
diff --git a/netsvcs/lib/TS_Clerk_Handler.cpp b/netsvcs/lib/TS_Clerk_Handler.cpp index 094676c99e2..f3b4ac020c6 100644 --- a/netsvcs/lib/TS_Clerk_Handler.cpp +++ b/netsvcs/lib/TS_Clerk_Handler.cpp @@ -8,7 +8,7 @@ ACE_RCSID(lib, TS_Clerk_Handler, "$Id$") ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor, - ACE_INET_Addr &addr) + ACE_INET_Addr &addr) : state_ (ACE_TS_Clerk_Handler::IDLE), timeout_ (ACE_DEFAULT_TIMEOUT), max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT), @@ -101,14 +101,14 @@ ACE_TS_Clerk_Handler::open (void *) #if !defined (ACE_WIN32) if (ACE_Reactor::instance ()->register_handler (SIGPIPE, this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler (SIGPIPE)"), -1); + "register_handler (SIGPIPE)"), -1); #endif /* ACE_WIN32 */ // Register ourselves with the reactor to receive input if (ACE_Reactor::instance ()->register_handler (this->get_handle (), - this, - ACE_Event_Handler::READ_MASK | - ACE_Event_Handler::EXCEPT_MASK) == -1) + this, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::EXCEPT_MASK) == -1) ACE_ERROR ((LM_ERROR, "%n: %p\n", "register_handler (this)")); // Figure out what remote port we're really bound to. @@ -116,9 +116,9 @@ ACE_TS_Clerk_Handler::open (void *) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_remote_addr"), -1); ACE_DEBUG ((LM_DEBUG, - "TS Clerk Daemon connected to port %d on handle %d\n", - server_addr.get_port_number (), - this->peer ().get_handle ())); + "TS Clerk Daemon connected to port %d on handle %d\n", + server_addr.get_port_number (), + this->peer ().get_handle ())); return 0; } @@ -132,7 +132,7 @@ ACE_TS_Clerk_Handler::get_handle (void) const int ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE, - ACE_Reactor_Mask mask) + ACE_Reactor_Mask mask) { ACE_TRACE ("ACE_TS_Clerk_Handler::handle_close"); ACE_UNUSED_ARG (mask); @@ -157,8 +157,8 @@ ACE_TS_Clerk_Handler::reinitiate_connection (void) // Reschedule ourselves to try and connect again. if (ACE_Reactor::instance ()->schedule_timer (this, 0, - this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); + this->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); } return 0; } @@ -195,12 +195,12 @@ ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE) // Restart connection asynchronously when timeout occurs. int ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value &, - const void *) + const void *) { ACE_TRACE ("ACE_TS_Clerk_Handler::handle_timeout"); ACE_DEBUG ((LM_DEBUG, - "(%t) attempting to reconnect to server with timeout = %d\n", - this->timeout_)); + "(%t) attempting to reconnect to server with timeout = %d\n", + this->timeout_)); // Close down peer to reclaim descriptor if need be. Note this is // necessary to reconnect. @@ -236,19 +236,19 @@ ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request &reply) if (n != bytes_expected) { switch (n) - { - case -1: - // FALLTHROUGH - ACE_DEBUG ((LM_DEBUG, "****************** recv_reply returned -1\n")); - default: - ACE_ERROR ((LM_ERROR, "%p got %d bytes, expected %d bytes\n", - "recv failed", n, bytes_expected)); - // FALLTHROUGH - case 0: - // We've shutdown unexpectedly - return -1; - // NOTREACHED - } + { + case -1: + // FALLTHROUGH + ACE_DEBUG ((LM_DEBUG, "****************** recv_reply returned -1\n")); + default: + ACE_ERROR ((LM_ERROR, "%p got %d bytes, expected %d bytes\n", + "recv failed", n, bytes_expected)); + // FALLTHROUGH + case 0: + // We've shutdown unexpectedly + return -1; + // NOTREACHED + } } else if (reply.decode () == -1) // Decode the request into host byte order. ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "decode failed"), -1); @@ -297,10 +297,10 @@ ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor () ACE_OS::strcpy (this->poolname_, ACE_DEFAULT_BACKING_STORE); #else /* ACE_DEFAULT_BACKING_STORE */ - if (ACE::get_temp_dir (this->poolname_, + if (ACE::get_temp_dir (this->poolname_, MAXPATHLEN - 17) == -1) // -17 for ace-malloc-XXXXXX { - ACE_ERROR ((LM_ERROR, + ACE_ERROR ((LM_ERROR, "Temporary path too long, " "defaulting to current directory\n")); this->poolname_[0] = 0; @@ -341,7 +341,7 @@ ACE_TS_Clerk_Processor::alloc (void) // Query the servers for the latest time int ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value &, - const void *) + const void *) { ACE_TRACE ("ACE_TS_Clerk_Processor::handle_timeout"); return this->update_time (); @@ -368,20 +368,20 @@ ACE_TS_Clerk_Processor::update_time () set_iterator.advance ()) { if ((*handler)->state () == ACE_TS_Clerk_Handler::ESTABLISHED) - { - if ((*handler)->send_request (this->cur_sequence_num_, time_info) == -1) - return -1; - // Check if sequence numbers match; otherwise discard - else if (expected_sequence_num != 0 && - time_info.sequence_num_ == expected_sequence_num) - { - count++; - ACE_DEBUG ((LM_DEBUG, "[%d] Delta time: %d\n", count, time_info.delta_time_)); - - // #### Can check here if delta value falls within a threshold #### - total_delta += time_info.delta_time_; - } - } + { + if ((*handler)->send_request (this->cur_sequence_num_, time_info) == -1) + return -1; + // Check if sequence numbers match; otherwise discard + else if (expected_sequence_num != 0 && + time_info.sequence_num_ == expected_sequence_num) + { + count++; + ACE_DEBUG ((LM_DEBUG, "[%d] Delta time: %d\n", count, time_info.delta_time_)); + + // #### Can check here if delta value falls within a threshold #### + total_delta += time_info.delta_time_; + } + } } // Update system_time_ using average of times obtained from all the servers. // Note that we are keeping two things in shared memory: the delta @@ -424,8 +424,8 @@ ACE_TS_Clerk_Processor::fini (void) set_iterator.advance ()) { if ((*handler)->state () != ACE_TS_Clerk_Handler::IDLE) - // Mark state as DISCONNECTING so we don't try to reconnect... - (*handler)->state (ACE_TS_Clerk_Handler::DISCONNECTING); + // Mark state as DISCONNECTING so we don't try to reconnect... + (*handler)->state (ACE_TS_Clerk_Handler::DISCONNECTING); // Deallocate resources. (*handler)->destroy (); // Will trigger a delete @@ -477,15 +477,15 @@ ACE_TS_Clerk_Processor::init (int argc, char *argv[]) // Now set up timer to receive updates from server // set the timer to go off after timeout value this->timer_id_ = ACE_Reactor::instance ()->schedule_timer (this, - NULL, - ACE_Time_Value (this->timeout_), - ACE_Time_Value (this->timeout_)); + NULL, + ACE_Time_Value (this->timeout_), + ACE_Time_Value (this->timeout_)); return 0; } int ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler *handler, - ACE_Synch_Options &synch_options) + ACE_Synch_Options &synch_options) { ACE_TRACE ("ACE_TS_Clerk_Processor::initiate_connection"); char buf[MAXHOSTNAMELEN + 1]; @@ -496,45 +496,45 @@ ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler *handler, if (handler->remote_addr ().addr_to_string (buf, sizeof buf) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "can't obtain peer's address"), -1); + "can't obtain peer's address"), -1); // Establish connection with the server. if (this->connect (handler, - handler->remote_addr (), - synch_options) == -1) + handler->remote_addr (), + synch_options) == -1) { if (errno != EWOULDBLOCK) - { - handler->state (ACE_TS_Clerk_Handler::FAILED); - ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", "connect", buf)); - - // Reschedule ourselves to try and connect again. - if (synch_options[ACE_Synch_Options::USE_REACTOR]) - { - if (ACE_Reactor::instance ()->schedule_timer (handler, - 0, - handler->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); - } - else - // Failures on synchronous connects are reported as errors - // so that the caller can decide how to proceed. - return -1; - } + { + handler->state (ACE_TS_Clerk_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", "connect", buf)); + + // Reschedule ourselves to try and connect again. + if (synch_options[ACE_Synch_Options::USE_REACTOR]) + { + if (ACE_Reactor::instance ()->schedule_timer (handler, + 0, + handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); + } + else + // Failures on synchronous connects are reported as errors + // so that the caller can decide how to proceed. + return -1; + } else - { - handler->state (ACE_TS_Clerk_Handler::CONNECTING); - ACE_DEBUG ((LM_DEBUG, - "(%t) in the process of connecting %s to %s\n", - synch_options[ACE_Synch_Options::USE_REACTOR] - ? "asynchronously" : "synchronously", buf)); - } + { + handler->state (ACE_TS_Clerk_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting %s to %s\n", + synch_options[ACE_Synch_Options::USE_REACTOR] + ? "asynchronously" : "synchronously", buf)); + } } else { handler->state (ACE_TS_Clerk_Handler::ESTABLISHED); ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - buf, handler->get_handle ())); + buf, handler->get_handle ())); } return 0; } @@ -549,46 +549,46 @@ ACE_TS_Clerk_Processor::parse_args (int argc, char *argv[]) // Create a default entry ACE_OS::sprintf (server_host, "%s:%d", - ACE_DEFAULT_SERVER_HOST, - ACE_DEFAULT_LOGGING_SERVER_PORT); + ACE_DEFAULT_SERVER_HOST, + ACE_DEFAULT_LOGGING_SERVER_PORT); ACE_Get_Opt get_opt (argc, argv, "h:t:p:b", 0); for (int c; (c = get_opt ()) != -1; ) { switch (c) - { - case 'h': - // Get the hostname:port and create an ADDR - server_addr.set (get_opt.opt_arg ()); - - // Create a new handler - ACE_NEW_RETURN (handler, - ACE_TS_Clerk_Handler (this, server_addr), - -1); - - // Cache the handler - this->handler_set_.insert (handler); - break; - case 't': - // Get the timeout value - this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'p': - // Get the poolname - ACE_OS::strncpy (this->poolname_, - ACE_TEXT_CHAR_TO_TCHAR (get_opt.opt_arg ()), - sizeof this->poolname_ / sizeof (ACE_TCHAR)); - break; - case 'b': - // Blocking semantics - this->blocking_semantics_ = 1; - break; - default: - ACE_ERROR_RETURN ((LM_ERROR, - "%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n%a", 1), - -1); - } + { + case 'h': + // Get the hostname:port and create an ADDR + server_addr.set (get_opt.opt_arg ()); + + // Create a new handler + ACE_NEW_RETURN (handler, + ACE_TS_Clerk_Handler (this, server_addr), + -1); + + // Cache the handler + this->handler_set_.insert (handler); + break; + case 't': + // Get the timeout value + this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'p': + // Get the poolname + ACE_OS::strncpy (this->poolname_, + ACE_TEXT_CHAR_TO_TCHAR (get_opt.opt_arg ()), + sizeof this->poolname_ / sizeof (ACE_TCHAR)); + break; + case 'b': + // Blocking semantics + this->blocking_semantics_ = 1; + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n%a", 1), + -1); + } } return 0; } @@ -615,23 +615,13 @@ ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Connector<ACE_TS_Clerk_Handler, ACE_SOCK_CONNECTOR>; template class ACE_Node<ACE_TS_Clerk_Handler *>; -template class ACE_Svc_Tuple<ACE_TS_Clerk_Handler>; +template class ACE_NonBlocking_Connect_Handler<ACE_TS_Clerk_Handler>; template class ACE_Unbounded_Set<ACE_TS_Clerk_Handler *>; template class ACE_Unbounded_Set_Iterator<ACE_TS_Clerk_Handler *>; -template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *>; -template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Connector<ACE_TS_Clerk_Handler, ACE_SOCK_CONNECTOR> #pragma instantiate ACE_Node<ACE_TS_Clerk_Handler *> -#pragma instantiate ACE_Svc_Tuple<ACE_TS_Clerk_Handler> +#pragma instantiate ACE_NonBlocking_Connect_Handler<ACE_TS_Clerk_Handler> #pragma instantiate ACE_Unbounded_Set<ACE_TS_Clerk_Handler *> #pragma instantiate ACE_Unbounded_Set_Iterator<ACE_TS_Clerk_Handler *> -#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *> -#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<ACE_TS_Clerk_Handler> *, ACE_SYNCH_RW_MUTEX> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |