summaryrefslogtreecommitdiff
path: root/ACE/netsvcs/lib/TS_Clerk_Handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/netsvcs/lib/TS_Clerk_Handler.cpp')
-rw-r--r--ACE/netsvcs/lib/TS_Clerk_Handler.cpp631
1 files changed, 631 insertions, 0 deletions
diff --git a/ACE/netsvcs/lib/TS_Clerk_Handler.cpp b/ACE/netsvcs/lib/TS_Clerk_Handler.cpp
new file mode 100644
index 00000000000..2077c1162ae
--- /dev/null
+++ b/ACE/netsvcs/lib/TS_Clerk_Handler.cpp
@@ -0,0 +1,631 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/Get_Opt.h"
+#include "TS_Clerk_Handler.h"
+#include "ace/Signal.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_time.h"
+#include "ace/os_include/os_netdb.h"
+
+ACE_RCSID(lib, TS_Clerk_Handler, "$Id$")
+
+ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor,
+ ACE_INET_Addr &addr)
+: state_ (ACE_TS_Clerk_Handler::IDLE),
+ timeout_ (ACE_DEFAULT_TIMEOUT),
+ max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT),
+ remote_addr_ (addr),
+ processor_ (processor)
+{
+ ACE_TRACE ("ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler");
+ this->time_info_.delta_time_ = 0;
+ this->time_info_.sequence_num_ = 0;
+}
+
+// Set the connection state
+void
+ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::state"));
+ this->state_ = state;
+}
+
+// Get the connection state
+ACE_TS_Clerk_Handler::State
+ACE_TS_Clerk_Handler::state (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::state"));
+ return this->state_;
+}
+
+// Sets the timeout delay.
+void
+ACE_TS_Clerk_Handler::timeout (long to)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::timeout"));
+ if (to > this->max_timeout_)
+ to = this->max_timeout_;
+
+ this->timeout_ = to;
+}
+
+// Recalculate the current retry timeout delay using exponential
+// backoff. Returns the original timeout (i.e., before the
+// recalculation).
+long
+ACE_TS_Clerk_Handler::timeout (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::timeout"));
+ long old_timeout = this->timeout_;
+ this->timeout_ *= 2;
+
+ if (this->timeout_ > this->max_timeout_)
+ this->timeout_ = this->max_timeout_;
+
+ return old_timeout;
+}
+
+// This is called when a <send> to the logging server fails...
+
+int
+ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
+{
+ return -1;
+}
+
+// Set the max timeout delay.
+void
+ACE_TS_Clerk_Handler::max_timeout (long mto)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::max_timeout"));
+ this->max_timeout_ = mto;
+}
+
+// Gets the max timeout delay.
+long
+ACE_TS_Clerk_Handler::max_timeout (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::max_timeout"));
+ return this->max_timeout_;
+}
+
+int
+ACE_TS_Clerk_Handler::open (void *)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::open"));
+ ACE_INET_Addr server_addr;
+
+ // Set connection state as established
+ this->state (ACE_TS_Clerk_Handler::ESTABLISHED);
+
+ // Register ourselves to receive SIGPIPE so we can attempt
+ // reconnections.
+#if !defined (ACE_WIN32)
+ if (ACE_Reactor::instance ()->register_handler (SIGPIPE, this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
+ ACE_TEXT ("register_handler (SIGPIPE)")), -1);
+#endif /* ACE_WIN32 */
+
+ // Register ourselves with the reactor to receive input
+ if (ACE_Reactor::instance ()->register_handler (this->get_handle (),
+ this,
+ ACE_Event_Handler::READ_MASK |
+ ACE_Event_Handler::EXCEPT_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
+ ACE_TEXT ("register_handler (this)")));
+
+ // Figure out what remote port we're really bound to.
+ else if (this->peer ().get_remote_addr (server_addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("get_remote_addr")),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TS Clerk Daemon connected to port %d on handle %d\n"),
+ server_addr.get_port_number (),
+ this->peer ().get_handle ()));
+
+ return 0;
+}
+
+ACE_HANDLE
+ACE_TS_Clerk_Handler::get_handle (void) const
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::get_handle"));
+ return this->peer().get_handle ();
+}
+
+int
+ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask mask)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::handle_close"));
+ ACE_UNUSED_ARG (mask);
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down on handle %d\n"),
+ this->get_handle ()));
+
+ return this->reinitiate_connection ();
+}
+
+int
+ACE_TS_Clerk_Handler::reinitiate_connection (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::reinitiate_connection"));
+ // Skip over deactivated descriptors.
+
+ // Set state to connecting so that we don't try to send anything
+ // using this handler
+ this->state (ACE_TS_Clerk_Handler::CONNECTING);
+ if (this->get_handle () != ACE_INVALID_HANDLE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Scheduling reinitiation of connection\n")));
+
+ // Reschedule ourselves to try and connect again.
+ ACE_Time_Value const timeout (this->timeout ());
+ if (ACE_Reactor::instance ()->schedule_timer (this, 0,
+ timeout) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("schedule_timer")), -1);
+ }
+ return 0;
+}
+
+// Receive a time update from a server
+int
+ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::handle_input"));
+ // We're getting a time update message from a server
+ ACE_Time_Request reply;
+ if (this->recv_reply (reply) != 0)
+ return -1;
+ else
+ {
+ // Get current local time
+ ACE_UINT32 local_time = ACE_OS::time (0);
+
+ // Compure delta time (difference between current local time and
+ // system time obtained from the server)
+ long t = reply.time () - local_time;
+
+ // Compute round trip delay and adjust time accordingly
+ ACE_UINT32 one_way_time = (local_time - this->start_time_)/2;
+ t += one_way_time;
+
+ // Now update time info (to be retrieved by Clerk_Processor)
+ this->time_info_.delta_time_ = t;
+ this->time_info_.sequence_num_ = this->cur_sequence_num_;
+ }
+ return 0;
+}
+
+// Restart connection asynchronously when timeout occurs.
+int
+ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::handle_timeout"));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) attempting to reconnect to server with timeout = %d\n"),
+ this->timeout_));
+
+ // Close down peer to reclaim descriptor if need be. Note this is
+ // necessary to reconnect.
+ this->peer ().close ();
+
+ return this->processor_->initiate_connection (this, ACE_Synch_Options::asynch);
+}
+
+void
+ACE_TS_Clerk_Handler::remote_addr (ACE_INET_Addr &addr)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::remote_addr"));
+ this->remote_addr_ = addr;
+}
+
+ACE_INET_Addr &
+ACE_TS_Clerk_Handler::remote_addr (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::remote_addr"));
+ return this->remote_addr_;
+}
+
+int
+ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request &reply)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::recv_reply"));
+ const int bytes_expected = reply.size ();
+
+ // Since Time_Request messages are fixed size, read the entire
+ // message in one go.
+ ssize_t n = this->peer ().recv ((void *) &reply, bytes_expected);
+
+ if (n != bytes_expected)
+ {
+ switch (n)
+ {
+ case -1:
+ // FALLTHROUGH
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("****************** recv_reply returned -1\n")));
+ default:
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p got %d bytes, expected %d bytes\n"),
+ ACE_TEXT ("recv failed"), n, bytes_expected));
+ // FALLTHROUGH
+ case 0:
+ // We've shutdown unexpectedly
+ return -1;
+ // NOTREACHED
+ }
+ }
+ else if (reply.decode () == -1) // Decode the request into host byte order.
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("decode failed")), -1);
+ return 0;
+}
+
+
+int
+ACE_TS_Clerk_Handler::send_request (ACE_UINT32 sequence_num, ACE_Time_Info &time_info)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Handler::send_request"));
+ void *buffer;
+ ssize_t length;
+
+ // Update current sequence number
+ this->cur_sequence_num_ = sequence_num;
+
+ // First update the current time info.
+ time_info.delta_time_ = this->time_info_.delta_time_;
+ time_info.sequence_num_ = this->time_info_.sequence_num_;
+
+ // Now prepare a new time update request
+ ACE_Time_Request request (ACE_Time_Request::TIME_UPDATE, 0, 0);
+
+ if ((length = request.encode (buffer)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("encode failed")), -1);
+
+ // Compute start time of sending request (needed to compute
+ // roundtrip delay)
+ this->start_time_ = ACE_OS::time (0);
+
+ // Send the request
+ if (this->peer ().send_n (buffer, length) != length)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("send_n failed")),
+ -1);
+
+ return 0;
+}
+
+ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor ()
+: timeout_ (ACE_DEFAULT_TIMEOUT),
+ blocking_semantics_ (0),
+ cur_sequence_num_ (0)
+{
+#if defined (ACE_DEFAULT_BACKING_STORE)
+ // Create a temporary file.
+ ACE_OS::strcpy (this->poolname_,
+ ACE_DEFAULT_BACKING_STORE);
+#else /* ACE_DEFAULT_BACKING_STORE */
+ if (ACE::get_temp_dir (this->poolname_,
+ MAXPATHLEN - 17) == -1) // -17 for ace-malloc-XXXXXX
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Temporary path too long, ")
+ ACE_TEXT ("defaulting to current directory\n")));
+ this->poolname_[0] = 0;
+ }
+
+ // Add the filename to the end
+ ACE_OS::strcat (this->poolname_, ACE_TEXT ("ace-malloc-XXXXXX"));
+
+#endif /* ACE_DEFAULT_BACKING_STORE */
+}
+
+void
+ACE_TS_Clerk_Processor::alloc (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::alloc"));
+ ACE_NEW (this->shmem_, ALLOCATOR (this->poolname_));
+
+ // Only create the state if it doesn't already exist.
+ if (this->shmem_->find (ACE_DEFAULT_TIME_SERVER_STR) == -1)
+ {
+ // Allocate the space out of shared memory for the system time entry
+ void *temp = this->shmem_->malloc (sizeof (this->system_time_));
+
+ // Give it a name binding
+ this->shmem_->bind (ACE_DEFAULT_TIME_SERVER_STR, temp);
+
+ // Set up pointers. Note that we add one to get to the second
+ // field in the structure
+ this->system_time_.delta_time_ = (long *) temp;
+ this->system_time_.last_local_time_ = ((long *) temp) + 1;
+
+ // Initialize
+ *(this->system_time_.delta_time_) = 0;
+ *(this->system_time_.last_local_time_) = ACE_OS::time (0);
+ }
+}
+
+// Query the servers for the latest time
+int
+ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::handle_timeout"));
+ return this->update_time ();
+}
+
+int
+ACE_TS_Clerk_Processor::update_time ()
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::update_time"));
+ ACE_UINT32 expected_sequence_num = this->cur_sequence_num_;
+
+ // Increment sequence number
+ this->cur_sequence_num_++;
+
+ int count = 0;
+ long total_delta = 0;
+ ACE_Time_Info time_info;
+
+ // Call send_request() on all handlers
+ ACE_TS_Clerk_Handler **handler = 0;
+
+ for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
+ set_iterator.next (handler) != 0;
+ set_iterator.advance ())
+ {
+ if ((*handler)->state () == ACE_TS_Clerk_Handler::ESTABLISHED)
+ {
+ if ((*handler)->send_request (this->cur_sequence_num_, time_info) == -1)
+ return -1;
+ // Check if sequence numbers match; otherwise discard
+ else if (expected_sequence_num != 0 &&
+ time_info.sequence_num_ == expected_sequence_num)
+ {
+ count++;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("[%d] Delta time: %d\n"),
+ count, time_info.delta_time_));
+
+ // #### Can check here if delta value falls within a threshold ####
+ total_delta += time_info.delta_time_;
+ }
+ }
+ }
+ // Update system_time_ using average of times obtained from all the servers.
+ // Note that we are keeping two things in shared memory: the delta
+ // time (difference between our system clock and the local clock),
+ // and the last local time
+ if (count > 0)
+ {
+ // At least one server is out there
+ *(this->system_time_.delta_time_) = total_delta/count;
+ }
+ else
+ {
+ // No servers are out there (or this is the first time around
+ // computing the time) so set delta time to zero. This
+ // would mean that clients would use the actual local system time.
+ *(this->system_time_.delta_time_) = 0;
+ }
+ // Update the last local time
+ *(this->system_time_.last_local_time_) = ACE_OS::time (0);
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Average delta time: %d\n"),
+ *(this->system_time_.delta_time_)));
+ return 0;
+}
+
+
+int
+ACE_TS_Clerk_Processor::fini (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::fini"));
+
+ // Cancel the timer
+ if (this->timer_id_ != -1)
+ ACE_Reactor::instance ()->cancel_timer (this->timer_id_);
+
+ // Destroy all the handlers
+ ACE_TS_Clerk_Handler **handler = 0;
+
+ for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
+ set_iterator.next (handler) != 0;
+ set_iterator.advance ())
+ {
+ if ((*handler)->state () != ACE_TS_Clerk_Handler::IDLE)
+ // Mark state as DISCONNECTING so we don't try to reconnect...
+ (*handler)->state (ACE_TS_Clerk_Handler::DISCONNECTING);
+
+ // Deallocate resources.
+ (*handler)->destroy (); // Will trigger a delete
+ }
+
+ // Remove the backing store
+ this->shmem_->remove ();
+
+ ACE_Connector <ACE_TS_Clerk_Handler, ACE_SOCK_CONNECTOR>::fini ();
+
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::info (ACE_TCHAR **, size_t) const
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::info"));
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::init (int argc, ACE_TCHAR *argv[])
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::init"));
+ // Use the options hook to parse the command line arguments and set
+ // options.
+ this->parse_args (argc, argv);
+
+ this->alloc ();
+
+#if !defined (ACE_WIN32)
+ // Ignore SIPPIPE so each Output_Channel can handle it.
+ ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
+ ACE_UNUSED_ARG (sig);
+#endif /* ACE_WIN32 */
+
+ ACE_Synch_Options &synch_options = this->blocking_semantics_ == 0
+ ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch;
+
+ // Now set up connections to all servers
+ ACE_TS_Clerk_Handler **handler = 0;
+
+ for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
+ set_iterator.next (handler) != 0;
+ set_iterator.advance ())
+ {
+ this->initiate_connection (*handler, synch_options);
+ }
+ // Now set up timer to receive updates from server
+ // set the timer to go off after timeout value
+ this->timer_id_ = ACE_Reactor::instance ()->schedule_timer (this,
+ NULL,
+ ACE_Time_Value (this->timeout_),
+ ACE_Time_Value (this->timeout_));
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler *handler,
+ ACE_Synch_Options &synch_options)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::initiate_connection"));
+ ACE_TCHAR buf[MAXHOSTNAMELEN + 1];
+
+ // Mark ourselves as idle so that the various iterators will ignore
+ // us until we are connected/reconnected.
+ handler->state (ACE_TS_Clerk_Handler::IDLE);
+
+ if (handler->remote_addr ().addr_to_string (buf, MAXHOSTNAMELEN) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("can't obtain peer's address")), -1);
+
+ // Establish connection with the server.
+ if (this->connect (handler,
+ handler->remote_addr (),
+ synch_options) == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ handler->state (ACE_TS_Clerk_Handler::FAILED);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p on address %s\n"),
+ ACE_TEXT ("connect"), buf));
+
+ // Reschedule ourselves to try and connect again.
+ if (synch_options[ACE_Synch_Options::USE_REACTOR])
+ {
+ ACE_Time_Value const handler_timeout (handler->timeout ());
+ if (ACE_Reactor::instance ()->schedule_timer (handler,
+ 0,
+ handler_timeout) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("schedule_timer")), -1);
+ }
+ else
+ // Failures on synchronous connects are reported as errors
+ // so that the caller can decide how to proceed.
+ return -1;
+ }
+ else
+ {
+ handler->state (ACE_TS_Clerk_Handler::CONNECTING);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) in the process of connecting %s to %s\n"),
+ synch_options[ACE_Synch_Options::USE_REACTOR] ?
+ ACE_TEXT ("asynchronously") : ACE_TEXT ("synchronously"),
+ buf));
+ }
+ }
+ else
+ {
+ handler->state (ACE_TS_Clerk_Handler::ESTABLISHED);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connected to %s on %d\n"),
+ buf, handler->get_handle ()));
+ }
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::parse_args"));
+ ACE_INET_Addr server_addr;
+ ACE_TS_Clerk_Handler *handler;
+
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:t:p:b"), 0);
+
+ for (int c; (c = get_opt ()) != -1; )
+ {
+ switch (c)
+ {
+ case 'h':
+ // Get the hostname:port and create an ADDR
+ server_addr.set (get_opt.opt_arg ());
+
+ // Create a new handler
+ ACE_NEW_RETURN (handler,
+ ACE_TS_Clerk_Handler (this, server_addr),
+ -1);
+
+ // Cache the handler
+ this->handler_set_.insert (handler);
+ break;
+ case 't':
+ // Get the timeout value
+ this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'p':
+ // Get the poolname
+ ACE_OS::strncpy (this->poolname_,
+ get_opt.opt_arg (),
+ sizeof this->poolname_ / sizeof (ACE_TCHAR));
+ break;
+ case 'b':
+ // Blocking semantics
+ this->blocking_semantics_ = 1;
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n")),
+ -1);
+ }
+ }
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::suspend (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::suspend"));
+ return 0;
+}
+
+int
+ACE_TS_Clerk_Processor::resume (void)
+{
+ ACE_TRACE (ACE_TEXT ("ACE_TS_Clerk_Processor::resume"));
+ return 0;
+}
+
+// The following is a "Factory" used by the ACE_Service_Config and
+// svc.conf file to dynamically initialize the state of the TS_Clerk.
+
+ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor)
+