diff options
Diffstat (limited to 'apps/Gateway/Peer')
-rw-r--r-- | apps/Gateway/Peer/Event.h | 125 | ||||
-rw-r--r-- | apps/Gateway/Peer/Gateway_Handler.cpp | 652 | ||||
-rw-r--r-- | apps/Gateway/Peer/Gateway_Handler.h | 154 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer_Message.h | 44 |
4 files changed, 0 insertions, 975 deletions
diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h deleted file mode 100644 index 5e288edf910..00000000000 --- a/apps/Gateway/Peer/Event.h +++ /dev/null @@ -1,125 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// apps -// -// = FILENAME -// Event.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (EVENT) -#define EVENT - -#include "ace/OS.h" - -// This is the unique connection identifier that denotes a particular -// Proxy_Handler in the Gateway. -typedef ACE_INT32 ACE_INT32; - -class Event_Key - // = TITLE - // Address used to identify the source/destination of an event. - // - // = DESCRIPTION - // This is really a "virtual forwarding address" thatis used to - // decouple the filtering and forwarding logic of the Event - // Channel from the format of the data. -{ -public: - Event_Key (ACE_INT32 cid = -1, - u_char sid = 0, - u_char type = 0) - : conn_id_ (cid), - supplier_id_ (sid), - type_ (type) {} - - int operator== (const Event_Key &event_addr) const - { - return this->conn_id_ == event_addr.conn_id_ - && this->supplier_id_ == event_addr.supplier_id_ - && this->type_ == event_addr.type_; - } - - ACE_INT32 conn_id_; - // Unique connection identifier that denotes a particular - // Proxy_Handler. - - ACE_INT32 supplier_id_; - // Logical ID. - - ACE_INT32 type_; - // Event type. -}; - -class Event_Header - // = TITLE - // Fixed sized header. - // - // = DESCRIPTION - // This is designed to have a sizeof (16) to avoid alignment - // problems on most platforms. -{ -public: - typedef ACE_INT32 SUPPLIER_ID; - // Type used to forward events from gatewayd. - - enum - { - INVALID_ID = -1 // No peer can validly use this number. - }; - - void decode (void) - { - this->len_ = ntohl (this->len_); - this->supplier_id_ = ntohl (this->supplier_id_); - this->type_ = ntohl (this->type_); - this->priority_ = ntohl (this->priority_); - } - // Decode from network byte order to host byte order. - - void encode (void) - { - this->len_ = htonl (this->len_); - this->supplier_id_ = htonl (this->supplier_id_); - this->type_ = htonl (this->type_); - this->priority_ = htonl (this->priority_); - } - // Encode from host byte order to network byte order. - - size_t len_; - // Length of the data_ payload, in bytes. - - SUPPLIER_ID supplier_id_; - // Source ID. - - ACE_INT32 type_; - // Event type. - - ACE_INT32 priority_; - // Event priority. -}; - -class Event - // = TITLE - // Variable-sized event (data_ may be variable-sized between - // 0 and MAX_PAYLOAD_SIZE). -{ -public: - enum { MAX_PAYLOAD_SIZE = 1024 }; - // The maximum size of an Event. - - Event_Header header_; - // Event header. - - char data_[MAX_PAYLOAD_SIZE]; - // Event data. -}; - -#endif /* EVENT */ diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp deleted file mode 100644 index cfc9a7dad6f..00000000000 --- a/apps/Gateway/Peer/Gateway_Handler.cpp +++ /dev/null @@ -1,652 +0,0 @@ -#include "ace/Get_Opt.h" -// $Id$ - - -#include "Gateway_Handler.h" - -Gateway_Handler::Gateway_Handler (ACE_Thread_Manager *) - : routing_id_ (0), - msg_frag_ (0), - total_bytes_ (0) -{ - this->msg_queue ()->high_water_mark (Gateway_Handler::QUEUE_SIZE); -} - -int -Gateway_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - - // Shut down the main event loop. - ACE_Service_Config::end_reactor_event_loop (); - return 0; -} - -// Cache a binding to the HANDLER_MAP. - -void -Gateway_Handler::map (HANDLER_MAP *m) -{ - this->map_ = m; -} - -// Upcall from the ACE_Acceptor::handle_input() that turns control -// over to our application-specific Gateway handler. - -int -Gateway_Handler::open (void *a) -{ - ACE_DEBUG ((LM_DEBUG, "Gateway handler's fd = %d\n", - this->peer ().get_handle ())); - - // Call down to the base class to activate and register this - // handler. - if (this->inherited::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - - if (this->peer ().enable (ACE_NONBLOCK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), -1); - - Gateway_Handler *this_ = this; - - // Add ourselves to the map so we can be removed later on. - if (this->map_->bind (this->get_handle (), this_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind"), -1); - - char *to = ACE_OS::getenv ("TIMEOUT"); - int timeout = to == 0 ? 100000 : ACE_OS::atoi (to); - - // Schedule the time between disconnects. This should really be a - // "tunable" parameter. - if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, timeout) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "schedule_timer")); - - // If there are messages left in the queue, make sure we - // enable the ACE_Reactor appropriately to get them sent out. - if (this->msg_queue ()->is_empty () == 0 - && ACE_Service_Config::reactor ()->schedule_wakeup (this, - ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1); - - // First action is to wait to be notified of our routing id. - this->do_action_ = &Gateway_Handler::await_route_id; - return 0; -} - -// Read messages from stdin and send them to the gatewayd. - -int -Gateway_Handler::xmit_stdin (void) -{ - if (this->routing_id_ != -1) - { - ssize_t n; - ACE_Message_Block *mb; - - ACE_NEW_RETURN (mb, - ACE_Message_Block (sizeof (Event)), - -1); - - Event *peer_msg = (Event *) mb->rd_ptr (); - peer_msg->header_.routing_id_ = this->routing_id_; - - n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_); - - switch (n) - { - case 0: - ACE_DEBUG ((LM_DEBUG, "stdin closing down\n")); - - // Take stdin out of the ACE_Reactor so we stop trying to - // send messages. - if (ACE_Service_Config::reactor ()->remove_handler - (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "remove_handler")); - delete mb; - break; - case -1: - delete mb; - ACE_ERROR ((LM_ERROR, "%p\n", "read")); - break; - default: - peer_msg->header_.len_ = htonl (n); - mb->wr_ptr (sizeof (Peer_Header) + n); - - if (this->put (mb) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "%p\n", - "gateway is flow controlled, so we're dropping messages")); - else - ACE_ERROR ((LM_ERROR, "%p\n", "transmission failure in xmit_stdin")); - - // Caller is responsible for freeing a ACE_Message_Block - // if failures occur. - delete mb; - } - } - } - return 0; -} - -// Perform a non-blocking put() of message MB. If we are unable to -// send the entire message the remainder is re-Taskd at the *front* of -// the Message_List. - -int -Gateway_Handler::nonblk_put (ACE_Message_Block *mb) -{ - // Try to send the message. If we don't send it all (e.g., due to - // flow control), then re-ACE_Task the remainder at the head of the - // Message_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. - - ssize_t n; - - if ((n = this->send_peer (mb)) == -1) - return -1; - else if (errno == EWOULDBLOCK) // Didn't manage to send everything. - { - ACE_DEBUG ((LM_DEBUG, - "queueing activated on handle %d to routing id %d\n", - this->get_handle (), this->routing_id_)); - - // ACE_Queue in *front* of the list to preserve order. - if (this->msg_queue ()->enqueue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enqueue_head"), -1); - - // Tell ACE_Reactor to call us back when we can send again. - if (ACE_Service_Config::reactor ()->schedule_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1); - return 0; - } - else - return n; -} - -// Finish sending a message when flow control conditions abate. This -// method is automatically called by the ACE_Reactor. - -int -Gateway_Handler::handle_output (ACE_HANDLE) -{ - ACE_Message_Block *mb = 0; - - ACE_DEBUG ((LM_DEBUG, "in handle_output\n")); - // The list had better not be empty, otherwise there's a bug! - - if (this->msg_queue ()->dequeue_head - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) - { - switch (this->nonblk_put (mb)) - { - case 0: // Partial send. - ACE_ASSERT (errno == EWOULDBLOCK); - // Didn't write everything this time, come back later... - break; - - case -1: - // Caller is responsible for freeing a ACE_Message_Block if - // failures occur. - delete mb; - ACE_ERROR ((LM_ERROR, "%p\n", - "transmission failure in handle_output")); - - /* FALLTHROUGH */ - default: // Sent the whole thing. - - // If we succeed in writing the entire message (or we did - // not fail due to EWOULDBLOCK) then check if there are more - // messages on the Message_List. If there aren't, tell the - // ACE_Reactor not to notify us anymore (at least until - // there are new messages queued up). - - if (this->msg_queue ()->is_empty ()) - { - ACE_DEBUG ((LM_DEBUG, - "queue now empty on handle %d to routing id %d\n", - this->get_handle (), - this->routing_id_)); - - if (ACE_Service_Config::reactor ()->cancel_wakeup - (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "cancel_wakeup")); - } - } - } - else - ACE_ERROR ((LM_ERROR, "%p\n", "dequeue_head")); - return 0; -} - -// Send a message to a peer (may ACE_Task if necessary). - -int -Gateway_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) -{ - if (this->msg_queue ()->is_empty ()) - // Try to send the message *without* blocking! - return this->nonblk_put (mb); - else - // If we have queued up messages due to flow control then just - // enqueue and return. - return this->msg_queue ()->enqueue_tail - (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); -} - -// Send an Peer message to gatewayd. - -int -Gateway_Handler::send_peer (ACE_Message_Block *mb) -{ - ssize_t n; - size_t len = mb->length (); - - if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0) - return errno == EWOULDBLOCK ? 0 : n; - else if (n < (ssize_t) len) - { - // Re-adjust pointer to skip over the part we did send. - mb->rd_ptr (n); - this->total_bytes_ += n; - } - else /* if (n == length) */ - { - // The whole message is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - this->total_bytes_ += n; - delete mb; - errno = 0; - } - ACE_DEBUG ((LM_DEBUG, "sent %d bytes, total bytes sent = %d\n", - n, this->total_bytes_)); - return n; -} - -// Receive an Peer message from gatewayd. Handles fragmentation. - -int -Gateway_Handler::recv_peer (ACE_Message_Block *&mb) -{ - Event *peer_msg; - size_t len; - ssize_t n; - size_t offset = 0; - - if (this->msg_frag_ == 0) - { - ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Event)), - -1); - - // No existing fragment... - if (this->msg_frag_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1); - - peer_msg = (Event *) this->msg_frag_->rd_ptr (); - - switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header))) - { - case sizeof (Peer_Header): - len = ntohl (peer_msg->header_.len_); - if (len <= sizeof peer_msg->buf_) - { - this->msg_frag_->wr_ptr (sizeof (Peer_Header)); - break; // The message is within the maximum size range. - } - else - ACE_ERROR ((LM_ERROR, "message too long = %d\n", len)); - /* FALLTHROUGH */ - default: - ACE_ERROR ((LM_ERROR, "invalid length = %d\n", n)); - n = -1; - /* FALLTHROUGH */ - case -1: - /* FALLTHROUGH */ - case 0: - // Make sure to free up memory on error returns. - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - } - } - else - { - offset = this->msg_frag_->length () - sizeof (Peer_Header); - len = peer_msg->header_.len_ - offset; - } - - switch (n = this->peer ().recv (peer_msg->buf_ + offset, len)) - { - case -1: - if (errno == EWOULDBLOCK) - { - // This shouldn't happen since the ACE_Reactor - // just triggered us to handle pending I/O! - ACE_DEBUG ((LM_DEBUG, "unexpected recv failure\n")); - // Since ACE_DEBUG might change errno, we need to reset it - // here. - errno = EWOULDBLOCK; - return -1; - } - else - /* FALLTHROUGH */; - - case 0: // EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; - return n; - - default: - if (n != (ssize_t) len) - // Re-adjust pointer to skip over the part we've read. - { - this->msg_frag_->wr_ptr (n); - errno = EWOULDBLOCK; - // Inform caller that we didn't get the whole message. - return -1; - } - else - { - // Set the write pointer at 1 past the end of the message. - this->msg_frag_->wr_ptr (n); - - // Set the read pointer to the beginning of the message. - this->msg_frag_->rd_ptr (this->msg_frag_->base ()); - - mb = this->msg_frag_; - - // Reset the pointer to indicate we've got an entire - // message. - this->msg_frag_ = 0; - } - return n; - } -} - -// Receive various types of input (e.g., Peer message from the -// gatewayd, as well as stdio). - -int -Gateway_Handler::handle_input (ACE_HANDLE sd) -{ - ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd)); - if (sd == ACE_STDIN) // Handle message from stdin. - return this->xmit_stdin (); - else - // Perform the appropriate action depending on the state we are - // in. - return (this->*do_action_) (); -} - -// Action that receives the route id. - -int -Gateway_Handler::await_route_id (void) -{ - ssize_t n = this->peer ().recv (&this->routing_id_, - sizeof this->routing_id_); - - if (n != sizeof this->routing_id_) - { - if (n == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "gatewayd has closed down unexpectedly\n"), -1); - else - ACE_ERROR_RETURN ((LM_ERROR, - "%p, bytes received on handle %d = %d\n", - "recv", this->get_handle (), n), -1); - } - else - ACE_DEBUG ((LM_DEBUG, "assigned routing id %d\n", - this->routing_id_)); - - // Transition to the action that waits for Peer messages. - this->do_action_ = &Gateway_Handler::await_messages; - - // Reset standard input. - ACE_OS::rewind (stdin); - - // Register this handler to receive test messages on stdin. - if (ACE_Service_Config::reactor ()->register_handler - (ACE_STDIN, this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); - return 0; -} - -// Action that receives messages. - -int -Gateway_Handler::await_messages (void) -{ - ACE_Message_Block *mb = 0; - ssize_t n = this->recv_peer (mb); - - switch (n) - { - case 0: - ACE_ERROR_RETURN ((LM_ERROR, "gatewayd has closed down\n"), -1); - /* NOTREACHED */ - case -1: - if (errno == EWOULDBLOCK) - // A short-read, we'll come back and finish it up later on! - return 0; - else - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "recv_peer"), -1); - /* NOTREACHED */ - default: - { - // We got a valid message, so let's process it now! At the - // moment, we just print out the message contents... - - Event *peer_msg = (Event *) mb->rd_ptr (); - this->total_bytes_ += mb->length (); - -#if defined (VERBOSE) - ACE_DEBUG ((LM_DEBUG, - "route id = %d, len = %d, payload = %*s", - peer_msg->header_.routing_id_, peer_msg->header_.len_, - peer_msg->header_.len_, peer_msg->buf_)); -#else - ACE_DEBUG ((LM_DEBUG, - "route id = %d, cur len = %d, total len = %d\n", - peer_msg->header_.routing_id_, - peer_msg->header_.len_, - this->total_bytes_)); -#endif - delete mb; - return 0; - } - } -} - -// Periodically send messages via ACE_Reactor timer mechanism. - -int -Gateway_Handler::handle_timeout (const ACE_Time_Value &, const void *) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != -1) - { - // Unbind ourselves from the map. - if (this->map_->unbind (this->get_handle ()) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "unbind")); - - // Shut down the handler. - this->handle_close (); - } - return 0; -} - -// Handle shutdown of the Gateway_Handler object. - -int -Gateway_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - if (this->get_handle () != ACE_INVALID_HANDLE) - { - ACE_DEBUG ((LM_DEBUG, "shutting down Gateway_Handler on handle %d\n", - this->get_handle ())); - - // Explicitly remove ourselves for handle 0 (the ACE_Reactor - // removes this->handle (), note that - // ACE_Event_Handler::DONT_CALL instructs the ACE_Reactor *not* - // to call this->handle_close(), which would otherwise lead to - // recursion!). - if (ACE_Service_Config::reactor ()->remove_handler - (0, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR ((LM_ERROR, "handle = %d: %p\n", - 0, "remove_handler")); - - // Deregister this handler with the ACE_Reactor. - if (ACE_Service_Config::reactor ()->remove_handler - (this, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::RWE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "handle = %d: %p\n", - this->get_handle (), "remove_handler"), -1); - - // Close down the peer. - this->peer ().close (); - } - return 0; -} - -Gateway_Acceptor::Gateway_Acceptor (Gateway_Handler *handler) - : gateway_handler_ (handler) -{ - this->gateway_handler_->map (&this->map_); -} - -// Note how this method just passes back the pre-allocated -// Gateway_Handler instead of having the ACE_Acceptor allocate a new -// one each time! - -Gateway_Handler * -Gateway_Acceptor::make_svc_handler (void) -{ - return this->gateway_handler_; -} - -int -Gateway_Acceptor::handle_signal (int signum, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "signal %S occurred\n", signum)); - return 0; -} - -/* Returns information on the currently active service. */ - -int -Gateway_Acceptor::info (char **strp, size_t length) const -{ - char buf[BUFSIZ]; - char addr_str[BUFSIZ]; - - ACE_INET_Addr addr; - - if (this->acceptor ().get_local_addr (addr) == -1) - return -1; - else if (addr.addr_to_string (addr_str, sizeof addr) == -1) - return -1; - - ACE_OS::sprintf (buf, "%s\t %s/%s %s", - "Gateway peer daemon", addr_str, "tcp", - "# IRIDIUM SRP traffic generator and data sink\n"); - - if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) - return -1; - else - ACE_OS::strncpy (*strp, buf, length); - return ACE_OS::strlen (buf); -} - -// Hook called by the explicit dynamic linking facility to terminate -// the peer. - -int -Gateway_Acceptor::fini (void) -{ - HANDLER_ITERATOR mi (this->map_); - - for (MAP_ENTRY *me = 0; - mi.next (me) != 0; - mi.advance ()) - { - if (me->int_id_->get_handle () != -1) - { - ACE_DEBUG ((LM_DEBUG, "closing down handle %d\n", - me->int_id_->get_handle ())); - me->int_id_->handle_close (); - } - else - ACE_DEBUG ((LM_DEBUG, "already closed %d\n")); - me->int_id_->destroy (); // Will trigger a delete. - } - - this->gateway_handler_->destroy (); // Will trigger a delete. - return inherited::fini (); -} - -// Hook called by the explicit dynamic linking facility to initialize -// the peer. - -int -Gateway_Acceptor::init (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "dp:", 0); - ACE_INET_Addr addr; - - for (int c; (c = get_opt ()) != -1; ) - { - switch (c) - { - case 'p': - addr.set (ACE_OS::atoi (get_opt.optarg)); - break; - case 'd': - break; - default: - break; - } - } - - if (ACE_Service_Config::reactor ()->register_handler (SIGPIPE, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); - - if (this->open (addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - else if (ACE_Service_Config::reactor ()->register_handler - (this, ACE_Event_Handler::READ_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "registering service with ACE_Reactor\n"), -1); - - ACE_Sig_Set sig_set; - sig_set.sig_add (SIGINT); - sig_set.sig_add (SIGQUIT); - - // Register ourselves to receive SIGINT and SIGQUIT so we can shut - // down gracefully via signals. - - if (ACE_Service_Config::reactor ()->register_handler (sig_set, - this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - return 0; -} - -// Dynamically linked factory function that dynamically allocates a -// new Gateway_Acceptor object. - -ACE_Service_Object * -_alloc_peerd (void) -{ - // This function illustrates how we can associate a ACE_Svc_Handler - // with the ACE_Acceptor at initialization time. - Gateway_Handler *handler; - - ACE_NEW_RETURN (handler, Gateway_Handler, 0); - ACE_Service_Object *temp; - - ACE_NEW_RETURN (temp, Gateway_Acceptor (handler), 0); - return temp; -} diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h deleted file mode 100644 index 6dc4539e6b7..00000000000 --- a/apps/Gateway/Peer/Gateway_Handler.h +++ /dev/null @@ -1,154 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - - -/* These Gateway handler classes process Peer messages sent from the - communication gateway daemon (gatewayd) to its various peers, e.g., - CF and ETS, (represented collectively in this prototype as peerd). - . These classes works as follows: - - 1. Gateway_Acceptor creates a listener endpoint and waits passively - for gatewayd to connect with it. - - 2. When gatewayd connects, Gateway_Acceptor creates an - Gateway_Handler object that sends/receives messages from - gatewayd. - - 3. Gateway_Handler waits for gatewayd to inform it of its routing - ID, which is prepended to all outgoing messages send from peerd. - - 4. Once the routing ID is set, peerd periodically sends messages to - gatewayd. Peerd also receives and "processes" messages - forwarded to it from gatewayd. In this program, peerd - "processes" messages by writing them to stdout. */ - -#if !defined (GATEWAY_HANDLER) -#define GATEWAY_HANDLER - -#include "ace/Service_Config.h" -#include "ace/Svc_Handler.h" -#include "ace/Acceptor.h" -#include "ace/SOCK_Stream.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/INET_Addr.h" -#include "ace/Map_Manager.h" -#include "Peer_Message.h" - -// Forward declaration. -class Gateway_Handler; - -// Maps a ACE_HANDLE onto a Gateway_Handler *. -typedef ACE_Map_Manager <ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_MAP; -typedef ACE_Map_Iterator<ACE_HANDLE, Gateway_Handler *, ACE_Null_Mutex> HANDLER_ITERATOR; -typedef ACE_Map_Entry <ACE_HANDLE, Gateway_Handler *> MAP_ENTRY; - -// Handle Peer messages arriving as events. - -class Gateway_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> -{ -public: - Gateway_Handler (ACE_Thread_Manager * = 0); - - virtual int open (void * = 0); - // Initialize the handler (called by ACE_Acceptor::handle_input()) - - virtual int handle_input (ACE_HANDLE); - // Receive and process peer messages. - - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Send a message to a gateway (may be queued if necessary). - - virtual int handle_output (ACE_HANDLE); - // Finish sending a message when flow control conditions abate. - - virtual int handle_timeout (const ACE_Time_Value &, - const void *arg); - // Periodically send messages via ACE_Reactor timer mechanism. - - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); - // Perform object termination. - - void map (HANDLER_MAP *); - // Cache a binding to the HANDLER_MAP. - -protected: - typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited; - - // We'll allow up to 16 megabytes to be queued per-output - // channel!!!! This is clearly a policy in search of refinement... - enum { QUEUE_SIZE = 1024 * 1024 * 16 }; - - int handle_signal (int signum, siginfo_t *, ucontext_t *); - - Peer_Header::ROUTING_ID routing_id_; - // Routing ID of the peer (obtained from gatewayd). - - virtual int nonblk_put (ACE_Message_Block *mb); - // Perform a non-blocking put(). - - virtual int recv_peer (ACE_Message_Block *&); - // Receive an Peer message from gatewayd. - - virtual int send_peer (ACE_Message_Block *); - // Send an Peer message to gatewayd. - - int xmit_stdin (void); - // Receive a message from stdin and send it to the gateway. - - int (Gateway_Handler::*do_action_) (void); - // Pointer-to-member-function for the current action to run in this state. - - int await_route_id (void); - // Action that receives the route id. - - int await_messages (void); - // Action that receives messages. - - ACE_Message_Block *msg_frag_; - // Keep track of message fragment to handle non-blocking recv's from gateway. - - size_t total_bytes_; - // The total number of bytes sent/received to the gateway. - - HANDLER_MAP *map_; - // Maps the ACE_HANDLE onto the Gateway_Handler *. -}; - -// A factory class that accept connections from gatewayd and -// dynamically creates a new Gateway_Handler object to do the dirty work. - -class Gateway_Acceptor : public ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> -{ -public: - // = Initialization methods, called when dynamically linked. - Gateway_Acceptor (Gateway_Handler *handler); - virtual int init (int argc, char *argv[]); - // Initialize the acceptor. - - virtual int info (char **, size_t) const; - // Return info about this service. - - virtual int fini (void); - // Perform termination. - - virtual Gateway_Handler *make_svc_handler (void); - // Factory method that creates the Gateway_Handler once. - - virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle various signals (e.g., SIGPIPE) - - HANDLER_MAP map_; - // Maps the ACE_HANDLE onto the Gateway_Handler *. - - Gateway_Handler *gateway_handler_; - // Pointer to memory allocated exactly once. - - typedef ACE_Acceptor<Gateway_Handler, ACE_SOCK_ACCEPTOR> inherited; -}; - -// Factory function that allocates a new Peer daemon. -extern "C" ACE_Service_Object *_alloc_peerd (void); - -#endif /* GATEWAY_HANDLER */ - diff --git a/apps/Gateway/Peer/Peer_Message.h b/apps/Gateway/Peer/Peer_Message.h deleted file mode 100644 index 67f57f148cb..00000000000 --- a/apps/Gateway/Peer/Peer_Message.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// Define the Peer message schema (this may change). - -#if !defined (PEER_MESSAGE) -#define PEER_MESSAGE - -// Fixed sized header. - -class Peer_Header -{ -public: -// Type used to route messages from gatewayd. - typedef short ROUTING_ID; - - enum - { - INVALID_ID = -1 // No peer may use this number. - }; - - // Source ID. - ROUTING_ID routing_id_; - - // Length of the message in bytes. - size_t len_; -}; - -// Variable-sized message (buf_ may be variable-sized between -// 0 and MAX_PAYLOAD_SIZE). - -class Peer_Message -{ -public: - // The maximum size of an Peer message (see Peer protocol specs for exact #). - enum { MAX_PAYLOAD_SIZE = 1024 }; - - Peer_Header header_; - - // Message payload - char buf_[MAX_PAYLOAD_SIZE]; -}; - -#endif /* PEER_MESSAGE */ |