diff options
Diffstat (limited to 'examples/C++NPv2/Client_Logging_Daemon.cpp')
-rw-r--r-- | examples/C++NPv2/Client_Logging_Daemon.cpp | 379 |
1 files changed, 0 insertions, 379 deletions
diff --git a/examples/C++NPv2/Client_Logging_Daemon.cpp b/examples/C++NPv2/Client_Logging_Daemon.cpp deleted file mode 100644 index 5fc0d554cbd..00000000000 --- a/examples/C++NPv2/Client_Logging_Daemon.cpp +++ /dev/null @@ -1,379 +0,0 @@ -/* -** $Id$ -** -** Copyright 2002 Addison Wesley. All Rights Reserved. -*/ - -#include "ace/os_include/os_netdb.h" -#include "ace/OS_NS_sys_time.h" -#include "ace/OS_NS_sys_socket.h" -#include "ace/OS_NS_unistd.h" -#include "ace/OS_NS_string.h" -#include "ace/Event_Handler.h" -#include "ace/INET_Addr.h" -#include "ace/Get_Opt.h" -#include "ace/Log_Record.h" -#include "ace/Message_Block.h" -#include "ace/Message_Queue.h" -#include "ace/Reactor.h" -#include "ace/Service_Object.h" -#include "ace/Signal.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/SOCK_Connector.h" -#include "ace/SOCK_Stream.h" -#include "ace/Thread_Manager.h" -#include "Logging_Acceptor.h" -#include "CLD_export.h" - -#if !defined (FLUSH_TIMEOUT) -#define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */ -#endif /* FLUSH_TIMEOUT */ - - -class CLD_Connector; - -class CLD_Handler : public ACE_Event_Handler { -public: - enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX }; - - // Initialization hook method. - virtual int open (CLD_Connector *); - virtual int close (); // Shut down hook method. - - // Accessor to the connection to the logging server. - virtual ACE_SOCK_Stream &peer () { return peer_; } - - // Reactor hook methods. - virtual int handle_input (ACE_HANDLE handle); - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = 0); - -protected: - // Forward log records to the server logging daemon. - virtual ACE_THR_FUNC_RETURN forward (); - - // Send the buffered log records using a gather-write operation. - virtual int send (ACE_Message_Block *chunk[], size_t &count); - - // Entry point into forwarder thread of control. - static ACE_THR_FUNC_RETURN run_svc (void *arg); - - // A synchronized <ACE_Message_Queue> that queues messages. - ACE_Message_Queue<ACE_SYNCH> msg_queue_; - - // Manage the forwarder thread. - ACE_Thread_Manager thr_mgr_; - - // Pointer to our <CLD_Connector>. - CLD_Connector *connector_; - - // Connection to the logging server. - ACE_SOCK_Stream peer_; -}; - - -class CLD_Connector { -public: - // Establish a connection to the logging server - // at the <remote_addr>. - int connect (CLD_Handler *handler, - const ACE_INET_Addr &remote_addr); - - // Re-establish a connection to the logging server. - int reconnect (); - -private: - // Pointer to the <CLD_Handler> that we're connecting. - CLD_Handler *handler_; - - // Address at which the logging server is listening - // for connections. - ACE_INET_Addr remote_addr_; -}; - - -class CLD_Acceptor : public ACE_Event_Handler { -public: - // Initialization hook method. - virtual int open (CLD_Handler *, const ACE_INET_Addr &, - ACE_Reactor * = ACE_Reactor::instance ()); - - // Reactor hook methods. - virtual int handle_input (ACE_HANDLE handle); - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = 0); - virtual ACE_HANDLE get_handle () const; - -protected: - // Factory that passively connects <ACE_SOCK_Stream>s. - ACE_SOCK_Acceptor acceptor_; - - // Pointer to the handler of log records. - CLD_Handler *handler_; -}; - -/****************************************************/ - -int CLD_Handler::handle_input (ACE_HANDLE handle) { - ACE_Message_Block *mblk = 0; - Logging_Handler logging_handler (handle); - - if (logging_handler.recv_log_record (mblk) != -1) - if (msg_queue_.enqueue_tail (mblk->cont ()) != -1) { - mblk->cont (0); - mblk->release (); - return 0; // Success return. - } else mblk->release (); - return -1; // Error return. -} - - -int CLD_Handler::handle_close (ACE_HANDLE handle, - ACE_Reactor_Mask) -{ return ACE_OS::closesocket (handle); } - - -int CLD_Handler::open (CLD_Connector *connector) { - connector_ = connector; - int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; - peer ().set_option (SOL_SOCKET, SO_SNDBUF, - &bufsiz, sizeof bufsiz); - msg_queue_.high_water_mark (CLD_Handler::QUEUE_MAX); - return thr_mgr_.spawn (&CLD_Handler::run_svc, - this, THR_SCOPE_SYSTEM); -} - - -ACE_THR_FUNC_RETURN CLD_Handler::run_svc (void *arg) { - CLD_Handler *handler = static_cast<CLD_Handler *> (arg); - return handler->forward (); -} - - -ACE_THR_FUNC_RETURN CLD_Handler::forward () { - ACE_Message_Block *chunk[ACE_IOV_MAX]; - size_t message_index = 0; - ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ()); - ACE_Time_Value timeout; - ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN); - ACE_Sig_Action original_action; - no_sigpipe.register_action (SIGPIPE, &original_action); - - for (;;) { - if (message_index == 0) { - timeout = ACE_OS::gettimeofday (); - timeout += FLUSH_TIMEOUT; - } - ACE_Message_Block *mblk = 0; - if (msg_queue_.dequeue_head (mblk, &timeout) == -1) { - if (errno != EWOULDBLOCK) break; - else if (message_index == 0) continue; - } else { - if (mblk->size () == 0 - && mblk->msg_type () == ACE_Message_Block::MB_STOP) - { mblk->release (); break; } - chunk[message_index] = mblk; - ++message_index; - } - if (message_index >= ACE_IOV_MAX || - (ACE_OS::gettimeofday () - time_of_last_send - >= ACE_Time_Value(FLUSH_TIMEOUT))) { - if (send (chunk, message_index) == -1) break; - time_of_last_send = ACE_OS::gettimeofday (); - } - } - - if (message_index > 0) send (chunk, message_index); - msg_queue_.close (); - no_sigpipe.restore_action (SIGPIPE, original_action); - return 0; -} - - -int CLD_Handler::send (ACE_Message_Block *chunk[], size_t &count) { - iovec iov[ACE_IOV_MAX]; - size_t iov_size; - int result = 0; - - for (iov_size = 0; iov_size < count; ++iov_size) { - iov[iov_size].iov_base = chunk[iov_size]->rd_ptr (); - iov[iov_size].iov_len = chunk[iov_size]->length (); - } - while (peer ().sendv_n (iov, iov_size) == -1) - if (connector_->reconnect () == -1) { - result = -1; - break; - } - - while (iov_size > 0) { - chunk[--iov_size]->release (); chunk[iov_size] = 0; - } - count = iov_size; - return result; -} - - -int CLD_Handler::close () { - ACE_Message_Block *shutdown_message = 0; - ACE_NEW_RETURN - (shutdown_message, - ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1); - msg_queue_.enqueue_tail (shutdown_message); - return thr_mgr_.wait (); -} - -/**************************************************************/ - - -int CLD_Acceptor::open (CLD_Handler *handler, - const ACE_INET_Addr &local_addr, - ACE_Reactor *r) { - reactor (r); // Store the reactor pointer. - handler_ = handler; - if (acceptor_.open (local_addr) == -1 - || reactor ()->register_handler - (this, ACE_Event_Handler::ACCEPT_MASK) == -1) - return -1; - return 0; -} - - -ACE_HANDLE CLD_Acceptor::get_handle () const -{ return acceptor_.get_handle (); } - - -int CLD_Acceptor::handle_input (ACE_HANDLE) { - ACE_SOCK_Stream peer_stream; - if (acceptor_.accept (peer_stream) == -1) return -1; - else if (reactor ()->register_handler - (peer_stream.get_handle (), - handler_, - ACE_Event_Handler::READ_MASK) == -1) - return -1; - else return 0; -} - - -int CLD_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { - acceptor_.close (); - handler_->close (); - return 0; -} - - -/***************************************************/ - - -int CLD_Connector::connect - (CLD_Handler *handler, - const ACE_INET_Addr &remote_addr) { - ACE_SOCK_Connector connector; - - if (connector.connect (handler->peer (), remote_addr) == -1) - return -1; - else if (handler->open (this) == -1) - { handler->handle_close (); return -1; } - handler_ = handler; - remote_addr_ = remote_addr; - return 0; -} - - -int CLD_Connector::reconnect () { - // Maximum number of times to retry connect. - const size_t MAX_RETRIES = 5; - - ACE_SOCK_Connector connector; - ACE_Time_Value timeout (1); // Start with 1 second timeout. - size_t i; - for (i = 0; i < MAX_RETRIES; ++i) { - if (i > 0) ACE_OS::sleep (timeout); - if (connector.connect (handler_->peer (), remote_addr_, - &timeout) == -1) - timeout *= 2; // Exponential backoff. - else { - int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; - handler_->peer ().set_option (SOL_SOCKET, SO_SNDBUF, - &bufsiz, sizeof bufsiz); - break; - } - } - return i == MAX_RETRIES ? -1 : 0; -} - - -/*******************************************************/ - -class Client_Logging_Daemon : public ACE_Service_Object { -public: - virtual ~Client_Logging_Daemon () {} // Turn off g++ warnings. - - // Service Configurator hook methods. - virtual int init (int argc, ACE_TCHAR *argv[]); - virtual int fini (); -#if 0 - // Implementing these methods is left as an exercise for the reader. - virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const; - virtual int suspend (); - virtual int resume (); -#endif - -protected: - // Receives, processes, and forwards log records. - CLD_Handler handler_; - - // Factory that passively connects the <CLD_Handler>. - CLD_Acceptor acceptor_; - - // Factory that actively connects the <CLD_Handler>. - CLD_Connector connector_; -}; - - -int Client_Logging_Daemon::init (int argc, ACE_TCHAR *argv[]) { - u_short cld_port = ACE_DEFAULT_SERVICE_PORT; - u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT; - ACE_TCHAR sld_host[MAXHOSTNAMELEN]; - ACE_OS::strcpy (sld_host, ACE_LOCALHOST); - - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0); - get_opt.long_option (ACE_TEXT ("client_port"), 'p', - ACE_Get_Opt::ARG_REQUIRED); - get_opt.long_option (ACE_TEXT ("server_port"), 'r', - ACE_Get_Opt::ARG_REQUIRED); - get_opt.long_option (ACE_TEXT ("server_name"), 's', - ACE_Get_Opt::ARG_REQUIRED); - - for (int c; (c = get_opt ()) != -1;) - switch (c) { - case 'p': // Client logging daemon acceptor port number. - cld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 'r': // Server logging daemon acceptor port number. - sld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ())); - break; - case 's': // Server logging daemon hostname. - ACE_OS::strsncpy - (sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN); - break; - } - - ACE_INET_Addr cld_addr (cld_port); - ACE_INET_Addr sld_addr (sld_port, sld_host); - - if (acceptor_.open (&handler_, cld_addr) == -1) - return -1; - else if (connector_.connect (&handler_, sld_addr) == -1) - { acceptor_.handle_close (); return -1; } - return 0; -} - - -int Client_Logging_Daemon::fini () { - acceptor_.handle_close (); - handler_.close (); - return 0; -} - - -ACE_FACTORY_DEFINE (CLD, Client_Logging_Daemon) |