summaryrefslogtreecommitdiff
path: root/apps/Gateway/Peer/Peer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Peer/Peer.cpp')
-rw-r--r--apps/Gateway/Peer/Peer.cpp903
1 files changed, 0 insertions, 903 deletions
diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp
deleted file mode 100644
index 074d1479c61..00000000000
--- a/apps/Gateway/Peer/Peer.cpp
+++ /dev/null
@@ -1,903 +0,0 @@
-// $Id$
-
-#define ACE_BUILD_SVC_DLL
-
-#include "ace/OS_NS_stdio.h"
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_unistd.h"
-#include "ace/Signal.h"
-#include "Peer.h"
-
-ACE_RCSID(Peer, Peer, "$Id$")
-
-Peer_Handler::Peer_Handler (void)
- : connection_id_ (-1), // Maybe it's better than 0.
- msg_frag_ (0),
- total_bytes_ (0)
-{
- // Set the high water mark of the <ACE_Message_Queue>. This is used
- // to exert flow control.
- this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
- first_time_ = 1; // It will be first time to open Peer_Handler.
-}
-
-// Upcall from the <ACE_Acceptor::handle_input> that turns control
-// over to our application-specific Gateway handler.
-
-int
-Peer_Handler::open (void *a)
-{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("handle = %d\n"),
- this->peer ().get_handle ()));
-
- // Call down to the base class to activate and register this handler
- // with an <ACE_Reactor>.
- if (this->inherited::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("open")),
- -1);
-
- if (this->peer ().enable (ACE_NONBLOCK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("enable")),
- -1);
-
- ACE_Time_Value timeout (Options::instance ()->timeout ());
-
- // Schedule the time between disconnects. This should really be a
- // "tunable" parameter.
- if (ACE_Reactor::instance ()->schedule_timer
- (this, 0, timeout) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("schedule_timer")));
-
- // If there are events left in the queue, make sure we enable the
- // <ACE_Reactor> appropriately to get them sent out.
- if (this->msg_queue ()->is_empty () == 0
- && ACE_Reactor::instance ()->schedule_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("schedule_wakeup")),
- -1);
-
- // First action is to wait to be notified of our connection id.
- this->do_action_ = &Peer_Handler::await_connection_id;
- return 0;
-}
-
-int
-Peer_Handler::transmit (ACE_Message_Block *mb,
- size_t n,
- int event_type)
-{
- Event *event = (Event *) mb->rd_ptr ();
-
- // Initialize the header.
- new (&event->header_) Event_Header (n,
- this->connection_id_,
- event_type,
- 0);
-
- // Convert all the fields into network byte order.
- event->header_.encode ();
-
- // Move the write pointer to the end of the event.
- mb->wr_ptr (sizeof (Event_Header) + n);
-
- if (this->put (mb) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("gateway is flow controlled, so we're dropping events")));
- else
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed.
- // Caller is responsible for freeing a ACE_Message_Block
- // if failures occur.
- mb->release ();
- return -1;
- }
- return 0;
-}
-
-// Read events from stdin and send them to the gatewayd.
-
-int
-Peer_Handler::transmit_stdin (void)
-{
- // If return value is -1, then first_time_ must be reset to 1.
- int result = 0;
- if (this->connection_id_ != -1)
- {
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- // Cast the message block payload into an <Event> pointer.
- Event *event = (Event *) mb->rd_ptr ();
-
- ssize_t n = ACE_OS::read (ACE_STDIN,
- event->data_,
- sizeof event->data_);
- switch (n)
- {
- case 0:
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("stdin closing down\n")));
-
- // Take stdin out of the ACE_Reactor so we stop trying to
- // send events.
- ACE_Reactor::instance ()->remove_handler
- (ACE_STDIN,
- ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
- mb->release ();
- result = 0; //
- break;
- /* NOTREACHED */
- case -1:
- mb->release ();
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("read")));
- result = 0; //
- break;
- /* NOTREACHED */
- default:
- // Do not return directly, save the return value.
- result = this->transmit (mb, n, ROUTING_EVENT);
- break;
- /* NOTREACHED */
- }
-
- // Do not return at here, but at exit of function.
- /*return 0;*/
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Must transmit over an opened channel.\n")));
- result = -1; // Save return value at here, return at exit of function.
- }
- // If transmit error, the stdin-thread will be cancelled, so should
- // reset first_time_ to 1, which will register_stdin_handler again.
- if (result == -1)
- first_time_ = 1;
-
- return result;
-}
-
-// Perform a non-blocking <put> of event MB. If we are unable to send
-// the entire event the remainder is re-queue'd at the *front* of the
-// Message_Queue.
-
-int
-Peer_Handler::nonblk_put (ACE_Message_Block *mb)
-{
- // Try to send the event. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via
- // <handle_output>) when it is possible to try again.
-
- ssize_t n = this->send (mb);
-
- if (n == -1)
- // -1 is returned only when things have really gone wrong (i.e.,
- // not when flow control occurs).
- return -1;
- else if (errno == EWOULDBLOCK)
- {
- // We didn't manage to send everything, so requeue.
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("queueing activated on handle %d to connection id %d\n"),
- this->get_handle (),
- this->connection_id_));
-
- // Re-queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head
- (mb,
- (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("enqueue_head")),
- -1);
- // Tell ACE_Reactor to call us back when we can send again.
- if (ACE_Reactor::instance ()->schedule_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("schedule_wakeup")),
- -1);
- return 0;
- }
- else
- return n;
-}
-
-// Finish sending a event when flow control conditions abate. This
-// method is automatically called by the ACE_Reactor.
-
-int
-Peer_Handler::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *mb = 0;
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("in handle_output\n")));
-
- if (this->msg_queue ()->dequeue_head
- (mb,
- (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (mb))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
- /* NOTREACHED */
- case -1:
- // Caller is responsible for freeing a ACE_Message_Block if
- // failures occur.
- mb->release ();
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("transmission failure in handle_output")));
- /* FALLTHROUGH */
- default: // Sent the whole thing.
- // If we succeed in writing the entire event (or we did not
- // fail due to EWOULDBLOCK) then check if there are more
- // events on the <ACE_Message_Queue>. If there aren't, tell
- // the <ACE_Reactor> not to notify us anymore (at least
- // until there are new events queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("queue now empty on handle %d to connection id %d\n"),
- this->get_handle (),
- this->connection_id_));
-
- if (ACE_Reactor::instance ()->cancel_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("cancel_wakeup")));
- }
- }
- return 0;
- }
- else
- // If the list is empty there's a bug!
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("dequeue_head")),
- 0);
-}
-
-// Send an event to a peer (may block if necessary).
-
-int
-Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the event *without* blocking!
- return this->nonblk_put (mb);
- else
- // If we have queued up events due to flow control then just
- // enqueue and return.
- return this->msg_queue ()->enqueue_tail
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-// Send an Peer event to gatewayd.
-
-int
-Peer_Handler::send (ACE_Message_Block *mb)
-{
- size_t len = mb->length ();
-
- ssize_t n = this->peer ().send (mb->rd_ptr (), len);
-
- if (n <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < (ssize_t) len)
- {
- // Re-adjust pointer to skip over the part we did send.
- mb->rd_ptr (n);
- this->total_bytes_ += n;
- }
- else // if (n == length).
- {
- // The whole event is sent, we can now safely deallocate the
- // buffer. Note that this should decrement a reference count...
- this->total_bytes_ += n;
- mb->release ();
- errno = 0;
- }
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"),
- n,
- this->total_bytes_));
- return n;
-}
-
-// Receive an Event from gatewayd. Handles fragmentation.
-
-int
-Peer_Handler::recv (ACE_Message_Block *&mb)
-{
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- Event *event = (Event *) this->msg_frag_->rd_ptr ();
- ssize_t header_received = 0;
-
- const size_t HEADER_SIZE = sizeof (Event_Header);
- ssize_t header_bytes_left_to_read =
- HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- header_received = this->peer ().recv
- (this->msg_frag_->wr_ptr (),
- header_bytes_left_to_read);
-
- if (header_received == -1 /* error */
- || header_received == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Recv error during header read")));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("attempted to read %d bytes\n"),
- header_bytes_left_to_read));
- this->msg_frag_ = this->msg_frag_->release ();
- return header_received;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (header_received);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Partial header received: only %d bytes\n"),
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire event.
- errno = EWOULDBLOCK;
- return -1;
- }
-
- // Convert the header into host byte order so that we can access
- // it directly without having to repeatedly muck with it...
- event->header_.decode ();
-
- if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
- {
- // This data_ payload is too big!
- errno = EINVAL;
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Data payload is too big (%d bytes)\n"),
- event->header_.len_));
- return -1;
- }
- }
-
- // At this point there is a complete, valid header in Event. Now we
- // need to get the event payload. Due to incomplete reads this may
- // not be the first time we've read in a fragment for this message.
- // We account for this here. Note that the first time in here
- // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we
- // do a successful fragment read, we advance <wr_ptr>. Therefore,
- // by subtracting how much we've already read from the
- // <event->header_.len_> we complete the
- // <data_bytes_left_to_read>...
-
- ssize_t data_bytes_left_to_read =
- ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
-
- // peer().recv() should not be called when data_bytes_left_to_read is 0.
- ssize_t data_received = !data_bytes_left_to_read ? 0 :
- this->peer ().recv (this->msg_frag_->wr_ptr (),
- data_bytes_left_to_read);
-
- // Try to receive the remainder of the event.
-
- switch (data_received)
- {
- case -1:
- if (errno == EWOULDBLOCK)
- // This might happen if only the header came through.
- return -1;
- else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- if (data_bytes_left_to_read)
- {
- this->msg_frag_ = this->msg_frag_->release ();
- return 0;
- }
- /* FALLTHROUGH */;
-
- default:
- // Set the write pointer at 1 past the end of the event.
- this->msg_frag_->wr_ptr (data_received);
-
- if (data_received != data_bytes_left_to_read)
- {
- errno = EWOULDBLOCK;
- // Inform caller that we didn't get the whole event.
- return -1;
- }
- else
- {
- // Set the read pointer to the beginning of the event.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- mb = this->msg_frag_;
-
- // Reset the pointer to indicate we've got an entire event.
- this->msg_frag_ = 0;
- }
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"),
- event->header_.connection_id_,
- event->header_.len_,
- data_received + header_received));
- if (Options::instance ()->enabled (Options::VERBOSE))
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("data_ = %*s\n"),
- event->header_.len_ - 2,
- event->data_));
- return data_received + header_received;
- }
-}
-
-// Receive various types of input (e.g., Peer event from the gatewayd,
-// as well as stdio).
-
-int
-Peer_Handler::handle_input (ACE_HANDLE sd)
-{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("in handle_input, sd = %d\n"),
- sd));
- if (sd == ACE_STDIN) // Handle event from stdin.
- return this->transmit_stdin ();
- else
- // Perform the appropriate action depending on the state we are
- // in.
- return (this->*do_action_) ();
-}
-
-// Action that receives our connection id from the Gateway.
-
-int
-Peer_Handler::await_connection_id (void)
-{
- ssize_t n = this->peer ().recv (&this->connection_id_,
- sizeof this->connection_id_);
-
- if (n != sizeof this->connection_id_)
- {
- if (n == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("gatewayd has closed down unexpectedly\n")),
- -1);
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p, bytes received on handle %d = %d\n"),
- ACE_TEXT ("recv"),
- this->get_handle (),
- n),
- -1);
- }
- else
- {
- this->connection_id_ = ntohl (this->connection_id_);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("assigned connection id %d\n"),
- this->connection_id_));
- }
-
- // Subscribe for events if we're a Consumer.
- if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
- this->subscribe ();
-
- // No need to disconnect by timeout.
- ACE_Reactor::instance ()->cancel_timer(this);
- // Transition to the action that waits for Peer events.
- this->do_action_ = &Peer_Handler::await_events;
-
- // Reset standard input.
- ACE_OS::rewind (stdin);
-
- // Call register_stdin_handler only once, until the stdin-thread
- // closed which caused by transmit_stdin error.
- if (first_time_)
- {
- // Register this handler to receive test events on stdin.
- if (ACE_Event_Handler::register_stdin_handler
- (this,
- ACE_Reactor::instance (),
- ACE_Thread_Manager::instance ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("register_stdin_handler")),
- -1);
-
- // Next time in await_connection_id(), I'll don't call
- // register_stdin_handler().
- first_time_ = 0;
- }
- return 0;
-}
-
-int
-Peer_Handler::subscribe (void)
-{
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- Subscription *subscription =
- (Subscription *) ((Event *) mb->rd_ptr ())->data_;
- subscription->connection_id_ =
- Options::instance ()->connection_id ();
-
- return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
-}
-
-// Action that receives events from the Gateway.
-
-int
-Peer_Handler::await_events (void)
-{
- ACE_Message_Block *mb = 0;
-
- ssize_t n = this->recv (mb);
-
- switch (n)
- {
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("gatewayd has closed down\n")),
- -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("recv")),
- -1);
- /* NOTREACHED */
- default:
- {
- // We got a valid event, so let's process it now! At the
- // moment, we just print out the event contents...
-
- Event *event = (Event *) mb->rd_ptr ();
- this->total_bytes_ += mb->length ();
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"),
- event->header_.connection_id_,
- event->header_.len_,
- this->total_bytes_));
- if (Options::instance ()->enabled (Options::VERBOSE))
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("data_ = %*s\n"),
- event->header_.len_ - 2,
- event->data_));
- mb->release ();
- return 0;
- }
- }
-}
-
-// Periodically send events via ACE_Reactor timer mechanism.
-
-int
-Peer_Handler::handle_timeout (const ACE_Time_Value &,
- const void *)
-{
- // Shut down the handler.
- return this->handle_close ();
-}
-
-Peer_Handler::~Peer_Handler (void)
-{
- // Shut down the handler.
- this->handle_close ();
-}
-
-// Handle shutdown of the Peer object.
-
-int
-Peer_Handler::handle_close (ACE_HANDLE,
- ACE_Reactor_Mask)
-{
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("shutting down Peer on handle %d\n"),
- this->get_handle ()));
-
- ACE_Reactor_Mask mask =
- ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;
-
- // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
- // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL>
- // instructs the ACE_Reactor *not* to call <handle_close>, which
- // would otherwise lead to infinite recursion!).
- ACE_Reactor::instance ()->remove_handler
- (ACE_STDIN, mask);
-
- // Deregister this handler with the ACE_Reactor.
- if (ACE_Reactor::instance ()->remove_handler
- (this, mask) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("handle = %d: %p\n"),
- this->get_handle (),
- ACE_TEXT ("remove_handler")),
- -1);
- // Close down the peer.
- this->peer ().close ();
- }
- return 0;
-}
-
-int
-Peer_Acceptor::start (u_short port)
-{
- // This object only gets allocated once and is just recycled
- // forever.
- ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);
-
- this->addr_.set (port);
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("opening acceptor at port %d\n"),
- port));
-
- // Call down to the <Acceptor::open> method.
- if (this->inherited::open (this->addr_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("open")),
- -1);
- else if (this->acceptor ().get_local_addr (this->addr_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("get_local_addr")),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("accepting at port %d\n"),
- this->addr_.get_port_number ()));
- return 0;
-}
-
-Peer_Acceptor::Peer_Acceptor (void)
- : peer_handler_ (0)
-{
-}
-
-int
-Peer_Acceptor::close (void)
-{
- // Will trigger a delete.
- if (this->peer_handler_ != 0)
- this->peer_handler_->destroy ();
-
- // Close down the base class.
- return this->inherited::close ();
-}
-
-// Note how this method just passes back the pre-allocated
-// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
-// one each time!
-
-int
-Peer_Acceptor::make_svc_handler (Peer_Handler *&sh)
-{
- sh = this->peer_handler_;
- return 0;
-}
-
-int
-Peer_Connector::open_connector (Peer_Handler *&peer_handler,
- u_short port)
-{
- // This object only gets allocated once and is just recycled
- // forever.
- ACE_NEW_RETURN (peer_handler,
- Peer_Handler,
- -1);
-
- ACE_INET_Addr addr (port,
- Options::instance ()->connector_host ());
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("connecting to %s:%d\n"),
- addr.get_host_name (),
- addr.get_port_number ()));
-
- if (this->connect (peer_handler, addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("connect")),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("connected to %C:%d\n"),
- addr.get_host_name (),
- addr.get_port_number ()));
- return 0;
-}
-
-int
-Peer_Connector::open (ACE_Reactor *, int)
-{
- this->supplier_peer_handler_ = 0;
- this->consumer_peer_handler_ = 0;
-
- if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
- && this->open_connector (this->supplier_peer_handler_,
- Options::instance ()->supplier_connector_port ()) == -1)
- return -1;
-
- if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)
- && this->open_connector (this->consumer_peer_handler_,
- Options::instance ()->consumer_connector_port ()) == -1)
- return -1;
-
- return 0;
-}
-
-int
-Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
-{
- if (signum != SIGPIPE)
- {
- // Shut down the main event loop.
- ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit?
- ACE_Reactor::instance ()->end_reactor_event_loop();
- }
-
- return 0;
-}
-
-// Returns information on the currently active service.
-
-int
-Peer_Factory::info (ACE_TCHAR **strp, size_t length) const
-{
- ACE_TCHAR buf[BUFSIZ];
- ACE_TCHAR consumer_addr_str[BUFSIZ];
- ACE_TCHAR supplier_addr_str[BUFSIZ];
-
- ACE_INET_Addr addr;
-
- if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)
- return -1;
- else if (addr.addr_to_string (consumer_addr_str,
- sizeof addr) == -1)
- return -1;
- else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)
- return -1;
- else if (addr.addr_to_string (supplier_addr_str,
- sizeof addr) == -1)
- return -1;
-
- ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:"));
- ACE_OS::strcat (buf, consumer_addr_str);
- ACE_OS::strcat (buf, ACE_TEXT ("|S:"));
- ACE_OS::strcat (buf, supplier_addr_str);
- ACE_OS::strcat
- (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n"));
-
- if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
- return -1;
- else
- ACE_OS::strncpy (*strp, buf, length);
- return ACE_OS::strlen (buf);
-}
-
-// Hook called by the explicit dynamic linking facility to terminate
-// the peer.
-
-int
-Peer_Factory::fini (void)
-{
- this->consumer_acceptor_.close ();
- this->supplier_acceptor_.close ();
- return 0;
-}
-
-// Hook called by the explicit dynamic linking facility to initialize
-// the peer.
-
-int
-Peer_Factory::init (int argc, ACE_TCHAR *argv[])
-{
- Options::instance ()->parse_args (argc, argv);
-
- ACE_Sig_Set sig_set;
-
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
- sig_set.sig_add (SIGPIPE);
-
- // Register ourselves to receive signals so we can shut down
- // gracefully.
-
- if (ACE_Reactor::instance ()->register_handler (sig_set,
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("register_handler")),
- -1);
-
- if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
- && this->supplier_acceptor_.start
- (Options::instance ()->supplier_acceptor_port ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Acceptor::open")),
- -1);
- else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
- && this->consumer_acceptor_.start
- (Options::instance ()->consumer_acceptor_port ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Acceptor::open")),
- -1);
- else if (this->connector_.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Connector::open")),
- -1);
- return 0;
-}
-
-// The following is a "Factory" used by the <ACE_Service_Config> and
-// svc.conf file to dynamically initialize the <Peer_Acceptor> and
-// <Peer_Connector>.
-
-ACE_SVC_FACTORY_DEFINE (Peer_Factory)
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
-template class ACE_Connector_Base<Peer_Handler>;
-template class ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;
-template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
-template class ACE_NonBlocking_Connect_Handler<Peer_Handler>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
-#pragma instantiate ACE_Connector_Base<Peer_Handler>
-#pragma instantiate ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>
-#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
-#pragma instantiate ACE_NonBlocking_Connect_Handler<Peer_Handler>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */