diff options
Diffstat (limited to 'ACE/apps/Gateway/Peer/Peer.cpp')
-rw-r--r-- | ACE/apps/Gateway/Peer/Peer.cpp | 887 |
1 files changed, 887 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Peer/Peer.cpp b/ACE/apps/Gateway/Peer/Peer.cpp new file mode 100644 index 00000000000..233148682d3 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Peer.cpp @@ -0,0 +1,887 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Signal.h" +#include "Peer.h" + +Peer_Handler::Peer_Handler (void) + : connection_id_ (-1), // Maybe it's better than 0. + msg_frag_ (0), + total_bytes_ (0) +{ + // Set the high water mark of the <ACE_Message_Queue>. This is used + // to exert flow control. + this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); + first_time_ = 1; // It will be first time to open Peer_Handler. +} + +// Upcall from the <ACE_Acceptor::handle_input> that turns control +// over to our application-specific Gateway handler. + +int +Peer_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("handle = %d\n"), + this->peer ().get_handle ())); + + // Call down to the base class to activate and register this handler + // with an <ACE_Reactor>. + if (this->inherited::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + -1); + + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enable")), + -1); + + ACE_Time_Value timeout (Options::instance ()->timeout ()); + + // Schedule the time between disconnects. This should really be a + // "tunable" parameter. + if (ACE_Reactor::instance ()->schedule_timer + (this, 0, timeout) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_timer"))); + + // If there are events left in the queue, make sure we enable the + // <ACE_Reactor> appropriately to get them sent out. + if (this->msg_queue ()->is_empty () == 0 + && ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_wakeup")), + -1); + + // First action is to wait to be notified of our connection id. + this->do_action_ = &Peer_Handler::await_connection_id; + return 0; +} + +int +Peer_Handler::transmit (ACE_Message_Block *mb, + size_t n, + int event_type) +{ + Event *event = (Event *) mb->rd_ptr (); + + // Initialize the header. + new (&event->header_) Event_Header (n, + this->connection_id_, + event_type, + 0); + + // Convert all the fields into network byte order. + event->header_.encode (); + + // Move the write pointer to the end of the event. + mb->wr_ptr (sizeof (Event_Header) + n); + + if (this->put (mb) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("gateway is flow controlled, so we're dropping events"))); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed. + // Caller is responsible for freeing a ACE_Message_Block + // if failures occur. + mb->release (); + return -1; + } + return 0; +} + +// Read events from stdin and send them to the gatewayd. + +int +Peer_Handler::transmit_stdin (void) +{ + // If return value is -1, then first_time_ must be reset to 1. + int result = 0; + if (this->connection_id_ != -1) + { + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (sizeof (Event)), + -1); + + // Cast the message block payload into an <Event> pointer. + Event *event = (Event *) mb->rd_ptr (); + + ssize_t n = ACE_OS::read (ACE_STDIN, + event->data_, + sizeof event->data_); + switch (n) + { + case 0: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("stdin closing down\n"))); + + // Take stdin out of the ACE_Reactor so we stop trying to + // send events. + ACE_Reactor::instance ()->remove_handler + (ACE_STDIN, + ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK); + mb->release (); + result = 0; // + break; + /* NOTREACHED */ + case -1: + mb->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("read"))); + result = 0; // + break; + /* NOTREACHED */ + default: + // Do not return directly, save the return value. + result = this->transmit (mb, n, ROUTING_EVENT); + break; + /* NOTREACHED */ + } + + // Do not return at here, but at exit of function. + /*return 0;*/ + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Must transmit over an opened channel.\n"))); + result = -1; // Save return value at here, return at exit of function. + } + // If transmit error, the stdin-thread will be cancelled, so should + // reset first_time_ to 1, which will register_stdin_handler again. + if (result == -1) + first_time_ = 1; + + return result; +} + +// Perform a non-blocking <put> of event MB. If we are unable to send +// the entire event the remainder is re-queue'd at the *front* of the +// Message_Queue. + +int +Peer_Handler::nonblk_put (ACE_Message_Block *mb) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via + // <handle_output>) when it is possible to try again. + + ssize_t n = this->send (mb); + + if (n == -1) + // -1 is returned only when things have really gone wrong (i.e., + // not when flow control occurs). + return -1; + else if (errno == EWOULDBLOCK) + { + // We didn't manage to send everything, so requeue. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("queueing activated on handle %d to connection id %d\n"), + this->get_handle (), + this->connection_id_)); + + // Re-queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (mb, + (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_head")), + -1); + // Tell ACE_Reactor to call us back when we can send again. + if (ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_wakeup")), + -1); + return 0; + } + else + return n; +} + +// Finish sending a event when flow control conditions abate. This +// method is automatically called by the ACE_Reactor. + +int +Peer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *mb = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("in handle_output\n"))); + + if (this->msg_queue ()->dequeue_head + (mb, + (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (mb)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + /* NOTREACHED */ + case -1: + // Caller is responsible for freeing a ACE_Message_Block if + // failures occur. + mb->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("transmission failure in handle_output"))); + /* FALLTHROUGH */ + default: // Sent the whole thing. + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the <ACE_Message_Queue>. If there aren't, tell + // the <ACE_Reactor> not to notify us anymore (at least + // until there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("queue now empty on handle %d to connection id %d\n"), + this->get_handle (), + this->connection_id_)); + + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("cancel_wakeup"))); + } + } + return 0; + } + else + // If the list is empty there's a bug! + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + 0); +} + +// Send an event to a peer (may block if necessary). + +int +Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (mb); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Send an Peer event to gatewayd. + +int +Peer_Handler::send (ACE_Message_Block *mb) +{ + size_t len = mb->length (); + + ssize_t n = this->peer ().send (mb->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < (ssize_t) len) + { + // Re-adjust pointer to skip over the part we did send. + mb->rd_ptr (n); + this->total_bytes_ += n; + } + else // if (n == length). + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + this->total_bytes_ += n; + mb->release (); + errno = 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"), + n, + this->total_bytes_)); + return n; +} + +// Receive an Event from gatewayd. Handles fragmentation. + +int +Peer_Handler::recv (ACE_Message_Block *&mb) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const size_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), + header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Recv error during header read"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("attempted to read %d bytes\n"), + header_bytes_left_to_read)); + this->msg_frag_ = this->msg_frag_->release (); + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Partial header received: only %d bytes\n"), + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Data payload is too big (%d bytes)\n"), + event->header_.len_)); + return -1; + } + } + + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we + // do a successful fragment read, we advance <wr_ptr>. Therefore, + // by subtracting how much we've already read from the + // <event->header_.len_> we complete the + // <data_bytes_left_to_read>... + + ssize_t data_bytes_left_to_read = + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); + + // peer().recv() should not be called when data_bytes_left_to_read is 0. + ssize_t data_received = !data_bytes_left_to_read ? 0 : + this->peer ().recv (this->msg_frag_->wr_ptr (), + data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + /* FALLTHROUGH */; + + case 0: // Premature EOF. + if (data_bytes_left_to_read) + { + this->msg_frag_ = this->msg_frag_->release (); + return 0; + } + /* FALLTHROUGH */; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + mb = this->msg_frag_; + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"), + event->header_.connection_id_, + event->header_.len_, + data_received + header_received)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("data_ = %*s\n"), + event->header_.len_ - 2, + event->data_)); + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the gatewayd, +// as well as stdio). + +int +Peer_Handler::handle_input (ACE_HANDLE sd) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("in handle_input, sd = %d\n"), + sd)); + if (sd == ACE_STDIN) // Handle event from stdin. + return this->transmit_stdin (); + else + // Perform the appropriate action depending on the state we are + // in. + return (this->*do_action_) (); +} + +// Action that receives our connection id from the Gateway. + +int +Peer_Handler::await_connection_id (void) +{ + ssize_t n = this->peer ().recv (&this->connection_id_, + sizeof this->connection_id_); + + if (n != sizeof this->connection_id_) + { + if (n == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("gatewayd has closed down unexpectedly\n")), + -1); + else + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p, bytes received on handle %d = %d\n"), + ACE_TEXT ("recv"), + this->get_handle (), + n), + -1); + } + else + { + this->connection_id_ = ntohl (this->connection_id_); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("assigned connection id %d\n"), + this->connection_id_)); + } + + // Subscribe for events if we're a Consumer. + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)) + this->subscribe (); + + // No need to disconnect by timeout. + ACE_Reactor::instance ()->cancel_timer(this); + // Transition to the action that waits for Peer events. + this->do_action_ = &Peer_Handler::await_events; + + // Reset standard input. + ACE_OS::rewind (stdin); + + // Call register_stdin_handler only once, until the stdin-thread + // closed which caused by transmit_stdin error. + if (first_time_) + { + // Register this handler to receive test events on stdin. + if (ACE_Event_Handler::register_stdin_handler + (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("register_stdin_handler")), + -1); + + // Next time in await_connection_id(), I'll don't call + // register_stdin_handler(). + first_time_ = 0; + } + return 0; +} + +int +Peer_Handler::subscribe (void) +{ + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (sizeof (Event)), + -1); + + Subscription *subscription = + (Subscription *) ((Event *) mb->rd_ptr ())->data_; + subscription->connection_id_ = + Options::instance ()->connection_id (); + + return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT); +} + +// Action that receives events from the Gateway. + +int +Peer_Handler::await_events (void) +{ + ACE_Message_Block *mb = 0; + + ssize_t n = this->recv (mb); + + switch (n) + { + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("gatewayd has closed down\n")), + -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("recv")), + -1); + /* NOTREACHED */ + default: + { + // We got a valid event, so let's process it now! At the + // moment, we just print out the event contents... + + Event *event = (Event *) mb->rd_ptr (); + this->total_bytes_ += mb->length (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"), + event->header_.connection_id_, + event->header_.len_, + this->total_bytes_)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("data_ = %*s\n"), + event->header_.len_ - 2, + event->data_)); + mb->release (); + return 0; + } + } +} + +// Periodically send events via ACE_Reactor timer mechanism. + +int +Peer_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // Shut down the handler. + return this->handle_close (); +} + +Peer_Handler::~Peer_Handler (void) +{ + // Shut down the handler. + this->handle_close (); +} + +// Handle shutdown of the Peer object. + +int +Peer_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + if (this->get_handle () != ACE_INVALID_HANDLE) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("shutting down Peer on handle %d\n"), + this->get_handle ())); + + ACE_Reactor_Mask mask = + ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK; + + // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor> + // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL> + // instructs the ACE_Reactor *not* to call <handle_close>, which + // would otherwise lead to infinite recursion!). + ACE_Reactor::instance ()->remove_handler + (ACE_STDIN, mask); + + // Deregister this handler with the ACE_Reactor. + if (ACE_Reactor::instance ()->remove_handler + (this, mask) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("handle = %d: %p\n"), + this->get_handle (), + ACE_TEXT ("remove_handler")), + -1); + // Close down the peer. + this->peer ().close (); + } + return 0; +} + +int +Peer_Acceptor::start (u_short port) +{ + // This object only gets allocated once and is just recycled + // forever. + ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1); + + this->addr_.set (port); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("opening acceptor at port %d\n"), + port)); + + // Call down to the <Acceptor::open> method. + if (this->inherited::open (this->addr_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + -1); + else if (this->acceptor ().get_local_addr (this->addr_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), + -1); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("accepting at port %d\n"), + this->addr_.get_port_number ())); + return 0; +} + +Peer_Acceptor::Peer_Acceptor (void) + : peer_handler_ (0) +{ +} + +int +Peer_Acceptor::close (void) +{ + // Will trigger a delete. + if (this->peer_handler_ != 0) + this->peer_handler_->destroy (); + + // Close down the base class. + return this->inherited::close (); +} + +// Note how this method just passes back the pre-allocated +// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new +// one each time! + +int +Peer_Acceptor::make_svc_handler (Peer_Handler *&sh) +{ + sh = this->peer_handler_; + return 0; +} + +int +Peer_Connector::open_connector (Peer_Handler *&peer_handler, + u_short port) +{ + // This object only gets allocated once and is just recycled + // forever. + ACE_NEW_RETURN (peer_handler, + Peer_Handler, + -1); + + ACE_INET_Addr addr (port, + Options::instance ()->connector_host ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("connecting to %s:%d\n"), + addr.get_host_name (), + addr.get_port_number ())); + + if (this->connect (peer_handler, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("connect")), + -1); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("connected to %C:%d\n"), + addr.get_host_name (), + addr.get_port_number ())); + return 0; +} + +int +Peer_Connector::open (ACE_Reactor *, int) +{ + this->supplier_peer_handler_ = 0; + this->consumer_peer_handler_ = 0; + + if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR) + && this->open_connector (this->supplier_peer_handler_, + Options::instance ()->supplier_connector_port ()) == -1) + return -1; + + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR) + && this->open_connector (this->consumer_peer_handler_, + Options::instance ()->consumer_connector_port ()) == -1) + return -1; + + return 0; +} + +int +Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + if (signum != SIGPIPE) + { + // Shut down the main event loop. + ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit? + ACE_Reactor::instance ()->end_reactor_event_loop(); + } + + return 0; +} + +// Returns information on the currently active service. + +int +Peer_Factory::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_TCHAR consumer_addr_str[BUFSIZ]; + ACE_TCHAR supplier_addr_str[BUFSIZ]; + + ACE_INET_Addr addr; + + if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1) + return -1; + else if (addr.addr_to_string (consumer_addr_str, + sizeof addr) == -1) + return -1; + else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1) + return -1; + else if (addr.addr_to_string (supplier_addr_str, + sizeof addr) == -1) + return -1; + + ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:")); + ACE_OS::strcat (buf, consumer_addr_str); + ACE_OS::strcat (buf, ACE_TEXT ("|S:")); + ACE_OS::strcat (buf, supplier_addr_str); + ACE_OS::strcat + (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n")); + + if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, buf, length); + return ACE_OS::strlen (buf); +} + +// Hook called by the explicit dynamic linking facility to terminate +// the peer. + +int +Peer_Factory::fini (void) +{ + this->consumer_acceptor_.close (); + this->supplier_acceptor_.close (); + return 0; +} + +// Hook called by the explicit dynamic linking facility to initialize +// the peer. + +int +Peer_Factory::init (int argc, ACE_TCHAR *argv[]) +{ + Options::instance ()->parse_args (argc, argv); + + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + sig_set.sig_add (SIGPIPE); + + // Register ourselves to receive signals so we can shut down + // gracefully. + + if (ACE_Reactor::instance ()->register_handler (sig_set, + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler")), + -1); + + if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR) + && this->supplier_acceptor_.start + (Options::instance ()->supplier_acceptor_port ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Acceptor::open")), + -1); + else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) + && this->consumer_acceptor_.start + (Options::instance ()->consumer_acceptor_port ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Acceptor::open")), + -1); + else if (this->connector_.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Connector::open")), + -1); + return 0; +} + +// The following is a "Factory" used by the <ACE_Service_Config> and +// svc.conf file to dynamically initialize the <Peer_Acceptor> and +// <Peer_Connector>. + +ACE_SVC_FACTORY_DEFINE (Peer_Factory) + |