diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
commit | c44379cc7d9c7aa113989237ab0f56db12aa5219 (patch) | |
tree | 66a84b20d47f2269d8bdc6e0323f338763424d3a /ACE/netsvcs/lib/Client_Logging_Handler.cpp | |
parent | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (diff) | |
download | ATCD-c44379cc7d9c7aa113989237ab0f56db12aa5219.tar.gz |
Repo restructuring
Diffstat (limited to 'ACE/netsvcs/lib/Client_Logging_Handler.cpp')
-rw-r--r-- | ACE/netsvcs/lib/Client_Logging_Handler.cpp | 654 |
1 files changed, 654 insertions, 0 deletions
diff --git a/ACE/netsvcs/lib/Client_Logging_Handler.cpp b/ACE/netsvcs/lib/Client_Logging_Handler.cpp new file mode 100644 index 00000000000..eb00b21d89c --- /dev/null +++ b/ACE/netsvcs/lib/Client_Logging_Handler.cpp @@ -0,0 +1,654 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/Get_Opt.h" +#include "ace/Acceptor.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SPIPE_Acceptor.h" +#include "ace/Log_Record.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_sys_socket.h" +#include "ace/OS_NS_unistd.h" +#include "ace/CDR_Stream.h" +#include "ace/Auto_Ptr.h" +#include "Client_Logging_Handler.h" + +ACE_RCSID(lib, Client_Logging_Handler, "$Id$") + +ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (ACE_HANDLE output_handle) + : logging_output_ (output_handle) +{ + // Register ourselves to receive SIGPIPE so we can attempt + // reconnections. +#if !defined (ACE_LACKS_UNIX_SIGNALS) + if (ACE_Reactor::instance ()->register_handler (SIGPIPE, + this) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("register_handler (SIGPIPE)"))); +#endif /* !ACE_LACKS_UNIX_SIGNALS */ +} + +// This is called when a <send> to the logging server fails... + +int +ACE_Client_Logging_Handler::handle_signal (int signum, + siginfo_t *, + ucontext_t *) +{ + if (signum == SIGPIPE) + return 0; + else + return -1; +} + +// This function is called every time a client connects to us. + +int +ACE_Client_Logging_Handler::open (void *) +{ + LOGGING_ADDR server_addr; + + // Register ourselves to receive <handle_input> callbacks when + // clients send us logging records. Note that since we're really a + // Singleton, this->peer() will change after each connect, so we + // need to grab the value now. + if (ACE_Reactor::instance ()->register_handler + (this->peer ().get_handle (), + this, + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::EXCEPT_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("register_handler")), + -1); + // Figure out what remote port we're really bound to. + if (this->peer ().get_remote_addr (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_remote_addr")), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("connected to client on handle %u\n"), + this->peer ().get_handle ())); + return 0; +} + +/* VIRTUAL */ ACE_HANDLE +ACE_Client_Logging_Handler::get_handle (void) const +{ + ACE_TRACE ("ACE_Client_Logging_Handler::get_handle"); + + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("get_handle() shouldn't be called\n"))); + + return ACE_INVALID_HANDLE; +} + +// Receive a logging record from an application. + +int +ACE_Client_Logging_Handler::handle_input (ACE_HANDLE handle) +{ +#if 0 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("in ACE_Client_Logging_Handler::handle_input, handle = %u\n"), + handle)); +#endif /* 0 */ + + if (handle == this->logging_output_) + // We're getting a message from the logging server! + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("received data from server!\n")), + -1); + ACE_Log_Record log_record; + + // We need to use the old two-read trick here since TCP sockets + // don't support framing natively. Allocate a message block for the + // payload; initially at least large enough to hold the header, but + // needs some room for alignment. + ACE_Message_Block *payload_p = 0; + ACE_Message_Block *header_p = 0; + ACE_NEW_RETURN (header_p, + ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE), + -1); + + auto_ptr <ACE_Message_Block> header (header_p); + + // Align the Message Block for a CDR stream + ACE_CDR::mb_align (header.get ()); + + ACE_CDR::Boolean byte_order; + ACE_CDR::ULong length; + +#if defined (ACE_HAS_STREAM_PIPES) + // We're getting a logging message from a local application using + // STREAM pipes, which are nicely prioritized for us. + + ACE_Str_Buf header_msg (header->wr_ptr (), + 0, + 8); + + ACE_SPIPE_Stream spipe; + spipe.set_handle (handle); + int flags = 0; + + // We've got a framed IPC mechanism, so we can just to a <recv>. + int result = spipe.recv (&header_msg, + (ACE_Str_Buf *) 0, + &flags); + + if (result < 0 || header_msg.len == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("client closing down unexpectedly\n"))); + + if (ACE_Reactor::instance ()->remove_handler + (handle, + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::EXCEPT_MASK + | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("remove_handler")), + -1); + spipe.close (); + return 0; + } +#else + // We're getting a logging message from a local application using + // sockets pipes, which are NOT prioritized for us. + + ssize_t count = ACE::recv_n (handle, + header->wr_ptr (), + 8); + switch (count) + { + // Handle shutdown and error cases. + default: + case -1: + case 0: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("client closing down\n"))); + + if (ACE_Reactor::instance ()->remove_handler + (handle, + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::EXCEPT_MASK + | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("remove_handler")), + 0); + if (handle == this->peer ().get_handle ()) + this->peer ().close (); + else + ACE_OS::closesocket (handle); + // Release the memory to prevent a leak. + return 0; + /* NOTREACHED */ + + case 8: + // Just fall through in this case.. + break; + } +#endif /* ACE_HAS_STREAM_PIPES */ + + // Reflect addition of 8 bytes for the header. + header->wr_ptr (8); + + // Create a CDR stream to parse the 8-byte header. + ACE_InputCDR header_cdr (header.get ()); + + // Extract the byte-order and use helper methods to disambiguate + // octet, booleans, and chars. + header_cdr >> ACE_InputCDR::to_boolean (byte_order); + + // Set the byte-order on the stream... + header_cdr.reset_byte_order (byte_order); + + // Extract the length + header_cdr >> length; + + ACE_NEW_RETURN (payload_p, + ACE_Message_Block (length), + -1); + auto_ptr <ACE_Message_Block> payload (payload_p); + + // Ensure there's sufficient room for log record payload. + ACE_CDR::grow (payload.get (), 8 + ACE_CDR::MAX_ALIGNMENT + length); + +#if defined (ACE_HAS_STREAM_PIPES) + ACE_Str_Buf payload_msg (payload->wr_ptr (), + 0, + length); + + // We've got a framed IPC mechanism, so we can just do a <recv>. + result = spipe.recv ((ACE_Str_Buf *) 0, + &payload_msg, + &flags); + + if (result < 0 || payload_msg.len == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%p\n"), + ACE_TEXT ("client closing down due to error\n"))); + + if (ACE_Reactor::instance ()->remove_handler + (handle, + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::EXCEPT_MASK + | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("remove_handler")), + -1); + spipe.close (); + return 0; + } +#else + // Use <recv_n> to obtain the contents. + if (ACE::recv_n (handle, payload->wr_ptr (), length) <= 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("recv_n()"))); + + if (ACE_Reactor::instance ()->remove_handler + (handle, + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::EXCEPT_MASK + | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%n: %p\n"), + ACE_TEXT ("remove_handler"))); + + ACE_OS::closesocket (handle); + return 0; + } +#endif /* ACE_HAS_STREAM_PIPES */ + + // Reflect additional bytes for the message. + payload->wr_ptr (length); + + ACE_InputCDR payload_cdr (payload.get ()); + payload_cdr.reset_byte_order (byte_order); + payload_cdr >> log_record; // Finally extract the <ACE_log_record>. + + log_record.length (length); + + // Forward the logging record to the server. + if (this->send (log_record) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("send"))); + return 0; +} + +// Receive a logging record from an application send via a non-0 +// MSG_BAND... This just calls handle_input(). + +int +ACE_Client_Logging_Handler::handle_exception (ACE_HANDLE handle) +{ + return this->handle_input (handle); +} + +// Called when object is removed from the ACE_Reactor + +int +ACE_Client_Logging_Handler::close (u_long) +{ + if (this->logging_output_ != ACE_STDERR) + ACE_OS::closesocket (this->logging_output_); + + this->destroy (); + return 0; +} + +int +ACE_Client_Logging_Handler::handle_output (ACE_HANDLE) +{ + return 0; +} + +// Encodes the contents of log_record object using network byte-order +// and sends it to the logging server. + +int +ACE_Client_Logging_Handler::send (ACE_Log_Record &log_record) +{ + ostream *orig_ostream = ACE_Log_Msg::instance ()->msg_ostream (); + + // This logic must occur before we do the encode() on <log_record> + // since otherwise the values of the <log_record> fields will be in + // network byte order. + + if (orig_ostream) + log_record.print (ACE_TEXT ("<localhost>"), + ACE_Log_Msg::instance ()->flags (), + *orig_ostream); + + if (this->logging_output_ == ACE_STDERR) + log_record.print (ACE_TEXT ("<localhost>"), + ACE_Log_Msg::instance ()->flags (), + stderr); + else + { + // Serialize the log record using a CDR stream, allocate enough + // space for the complete <ACE_Log_Record>. + const size_t max_payload_size = + 4 // type() + + 8 // timestamp + + 4 // process id + + 4 // data length + + ACE_Log_Record::MAXLOGMSGLEN // data + + ACE_CDR::MAX_ALIGNMENT; // padding; + + // Insert contents of <log_record> into payload stream. + ACE_OutputCDR payload (max_payload_size); + payload << log_record; + + // Get the number of bytes used by the CDR stream. + ACE_CDR::ULong length = payload.total_length (); + + // Send a header so the receiver can determine the byte order and + // size of the incoming CDR stream. + ACE_OutputCDR header (ACE_CDR::MAX_ALIGNMENT + 8); + header << ACE_OutputCDR::from_boolean (ACE_CDR_BYTE_ORDER); + + // Store the size of the payload that follows + header << ACE_CDR::ULong (length); + + // Use an iovec to send both buffer and payload simultaneously. + iovec iov[2]; + iov[0].iov_base = header.begin ()->rd_ptr (); + iov[0].iov_len = 8; + iov[1].iov_base = payload.begin ()->rd_ptr (); + iov[1].iov_len = length; + + // We're running over sockets, so send header and payload + // efficiently using "gather-write". + if (ACE::sendv_n (this->logging_output_,iov, 2) == -1) + { + ACE_DEBUG ((LM_DEBUG, + "Something about the sendv_n() failed, so switch to stderr\n")); + if (ACE_Log_Msg::instance ()->msg_ostream () == 0) + // Switch over to logging to stderr for now. At some + // point, we'll improve the implementation to queue up the + // message, try to reestablish a connection, and then send + // the queued data once we've reconnect to the logging + // server. If you'd like to implement this functionality + // and contribute it back to ACE that would be great! + this->logging_output_ = ACE_STDERR; + } + else + ACE_DEBUG ((LM_DEBUG, + "Sent logging message %s successfully to Server Logging Daemon!\n", + log_record.priority_name (ACE_Log_Priority (log_record.type ())))); + } + + return 0; +} + +class ACE_Client_Logging_Acceptor : public ACE_Acceptor<ACE_Client_Logging_Handler, LOGGING_ACCEPTOR> +{ + // = TITLE + // This factory creates connections with the + // <Server_Logging_Acceptor>. + // + // = DESCRIPTION + // This class contains the service-specific methods that can't + // easily be factored into the <ACE_Acceptor>. +public: + // = Initialization method. + ACE_Client_Logging_Acceptor (void); + // Default constructor. + +protected: + // = Dynamic linking hooks. + virtual int init (int argc, ACE_TCHAR *argv[]); + // Called when service is linked. + + virtual int fini (void); + // Called when service is unlinked. + + virtual int info (ACE_TCHAR **strp, size_t length) const; + // Called to determine info about the service. + + virtual int make_svc_handler (ACE_Client_Logging_Handler *&sh); + // Factory that always returns the <handler_>. + + // = Scheduling hooks. + virtual int suspend (void); + virtual int resume (void); + +private: + int parse_args (int argc, ACE_TCHAR *argv[]); + // Parse svc.conf arguments. + + const ACE_TCHAR *server_host_; + // Host where the logging server is located. + + u_short server_port_; + // Port number where the logging server is listening for + // connections. + + ACE_INET_Addr server_addr_; + // Address of the client logging daemon. + + const ACE_TCHAR *logger_key_; + // Communication endpoint where the client logging daemon will + // listen for connections from clients. + + ACE_Client_Logging_Handler *handler_; + // Pointer to the singleton handler that receives messages from + // clients and forwards to the server. +}; + +int +ACE_Client_Logging_Acceptor::fini (void) +{ + this->close (); + + if (this->handler_ != 0) + this->handler_->close (0); + + // Try to unlink the logger key so weird things don't happen if + // we're using STREAM pipes. + ACE_OS::unlink (this->logger_key_); + + // This memory was allocated by <ACE_OS::strdup>. + ACE_OS::free ((void *) this->logger_key_); + ACE_OS::free ((void *) this->server_host_); + + return 0; +} + +int +ACE_Client_Logging_Acceptor::make_svc_handler (ACE_Client_Logging_Handler *&sh) +{ + // Always return a pointer to the Singleton handler. + sh = this->handler_; + return 0; +} + +int +ACE_Client_Logging_Acceptor::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + + ACE_OS::sprintf (buf, ACE_TEXT ("%d/%s %s"), + this->server_addr_.get_port_number (), "tcp", + "# client logging daemon\n"); + + if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, buf, length); + return ACE_OS::strlen (buf); +} + +ACE_Client_Logging_Acceptor::ACE_Client_Logging_Acceptor (void) + : server_host_ (ACE_OS::strdup (ACE_DEFAULT_SERVER_HOST)), + server_port_ (ACE_DEFAULT_LOGGING_SERVER_PORT), + logger_key_ (ACE_OS::strdup (ACE_DEFAULT_LOGGER_KEY)), + handler_ (0) +{ +} + +int +ACE_Client_Logging_Acceptor::init (int argc, ACE_TCHAR *argv[]) +{ + // We'll log *our* error and debug messages to stderr! + if (ACE_LOG_MSG->open (ACE_TEXT ("Client Logging Service")) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Can't open ACE_Log_Msg\n")), + -1); + + // Use the options hook to parse the command line arguments and set + // options. + this->parse_args (argc, argv); + + // Try to unlink the logger key so weird things don't happen if + // we're using STREAM pipes. + ACE_OS::unlink (this->logger_key_); + + // Initialize the acceptor endpoint. + if (this->open (LOGGING_ADDR (this->logger_key_)) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + this->logger_key_), + -1); + // Establish connection with the server. + ACE_SOCK_Connector con; + ACE_SOCK_Stream stream; + ACE_INET_Addr server_addr; + +#if defined (ACE_HAS_STREAM_PIPES) + ACE_SPIPE_Addr lserver_addr; + + // Figure out what local port we're really bound to. + if (this->acceptor ().get_local_addr (lserver_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("starting up Client Logging Daemon, ") + ACE_TEXT ("bounded to STREAM addr %s on handle %u\n"), + lserver_addr.get_path_name (), + this->acceptor ().get_handle ())); +#else + ACE_INET_Addr lserver_addr; + + // Figure out what local port we're really bound to. + if (this->acceptor ().get_local_addr (lserver_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("starting up Client Logging Daemon, ") + ACE_TEXT ("bounded to local port %d on handle %u\n"), + lserver_addr.get_port_number (), + this->acceptor ().get_handle ())); +#endif /* ACE_HAS_STREAM_PIPES */ + + if (con.connect (stream, + this->server_addr_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("can't connect to logging server %s on port %d: ") + ACE_TEXT ("%m, using stderr\n"), + this->server_addr_.get_host_name (), + this->server_addr_.get_port_number (), + errno)); + if (ACE_Log_Msg::instance ()->msg_ostream () == 0) + // If we can't connect to the server then we'll send the logging + // messages to stderr. + stream.set_handle (ACE_STDERR); + } + else + { + // Figure out what remote port we're really bound to. + if (stream.get_remote_addr (server_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_remote_addr")), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Client Logging Daemon is connected to Server Logging Daemon %s on port %d on handle %u\n"), + server_addr.get_host_name (), + server_addr.get_port_number (), + stream.get_handle ())); + } + + // Create the Singleton <Client_Logging_Handler>. + ACE_NEW_RETURN (this->handler_, + ACE_Client_Logging_Handler (stream.get_handle ()), + -1); + return 0; +} + +int +ACE_Client_Logging_Acceptor::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:k:p:"), 0); + + for (int c; (c = get_opt ()) != -1; ) + { + switch (c) + { + case 'h': + ACE_OS::free ((void *) this->server_host_); + this->server_host_ = ACE_OS::strdup (get_opt.opt_arg ()); + break; + case 'k': + ACE_OS::free ((void *) this->logger_key_); + this->logger_key_ = ACE_OS::strdup (get_opt.opt_arg ()); + break; + case 'p': + this->server_port_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%n:\n[-p server-port]\n%a"), 1), + -1); + } + } + + if (this->server_addr_.set (this->server_port_, + this->server_host_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("set")), + -1); + return 0; +} + +int +ACE_Client_Logging_Acceptor::suspend (void) +{ + // To be done... + return 0; +} + +int +ACE_Client_Logging_Acceptor::resume (void) +{ + // To be done... + return 0; +} + +// The following is a "Factory" used by the ACE_Service_Config and +// svc.conf file to dynamically initialize the state of the +// single-threaded logging server. + +ACE_SVC_FACTORY_DEFINE (ACE_Client_Logging_Acceptor) + |