diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-29 02:38:49 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-29 02:38:49 +0000 |
commit | 2e37bbe9d6c92e2fc862de0f88735ed9eb8c17ae (patch) | |
tree | f27c16bdb12793db00cb39ef36940c476484701f /apps | |
parent | d0878b2722b89f0053cd26d7429dd279217f5fd1 (diff) | |
download | ATCD-2e37bbe9d6c92e2fc862de0f88735ed9eb8c17ae.tar.gz |
foo
Diffstat (limited to 'apps')
-rw-r--r-- | apps/Gateway/Gateway/consumer_config | 30 | ||||
-rw-r--r-- | apps/Gateway/Gateway/proxy_config | 17 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer.cpp | 135 |
3 files changed, 84 insertions, 98 deletions
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config index fa3e63a0b26..d8a24001095 100644 --- a/apps/Gateway/Gateway/consumer_config +++ b/apps/Gateway/Gateway/consumer_config @@ -1,7 +1,23 @@ -# Consumer configuration file -# Conn ID Supplier ID Type Consumers -# ------- ----------- ------- ------------ - 1 1 0 3,4 - 2 2 0 3 -# 3 3 0 4 -# 4 4 0 5 +# Configuration file for specifying which Consumers will receive +# messages from which Suppliers. +# +# Here's an explanation of the fields in this file, and how they +# relate to fields in the "proxy_config" file. +# +# 1. Proxy ID +# The conn +# +# 2. Supplier ID +# +# +# 3. Type +# +# +# 4. Consumers +# +# Proxy ID Supplier ID Type Consumers +# -------- ----------- ------- ------------ + 1 1 0 2 +# 2 2 0 3,4 +# 3 3 0 4 +# 4 4 0 5 diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config index 2f26c2c430b..66b57770a54 100644 --- a/apps/Gateway/Gateway/proxy_config +++ b/apps/Gateway/Gateway/proxy_config @@ -1,11 +1,16 @@ -# Connection configuration file. -# Conn Host Remote Proxy Max Retry Local Priority +# Configuration file for specifying connection information about +# proxies. +# +# Here's an explanation of the fields in this file, and how they +# relate to fields in the "consumer_config" file. +# +# Proxy Host Remote Proxy Max Retry Local Priority # ID Port Role Timeout Port # ---- -------- ------ ------ ---------- ----- -------- - 1 merengue.cs 10002 S 32 0 1 - 2 flamenco.cs 10002 S 32 0 1 - 3 mambo.cs 10002 C 32 0 1 - 4 lambada.cs 10002 C 32 0 1 + 1 tango.cs 10003 S 32 0 1 + 2 tango.cs 10002 C 32 0 1 +# 3 mambo.cs 10002 C 32 0 1 +# 4 lambada.cs 10002 C 32 0 1 # 5 lambada.cs 10002 C 32 0 1 # 6 tango.cs 10002 C 32 0 1 # 7 tango.cs 5001 S 32 0 1 diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp index 2d366ef600b..a2fab817b37 100644 --- a/apps/Gateway/Peer/Peer.cpp +++ b/apps/Gateway/Peer/Peer.cpp @@ -7,11 +7,11 @@ // 1. Gateway_Acceptor creates a listener endpoint and waits passively // for gatewayd to connect with it. // -// 2. When gatewayd connects, Gateway_Acceptor creates an -// Peer object that sends/receives events from +// 2. When gatewayd connects, Peer_Acceptor creates an +// Peer_Handler object that sends/receives events from // gatewayd. // -// 3. The Peer waits for gatewayd to inform it of its supplier +// 3. The Peer_Handler waits for gatewayd to inform it of its supplier // ID, which is prepended to all outgoing events sent from peerd. // // 4. Once the supplier ID is set, peerd periodically sends events to @@ -26,24 +26,19 @@ #include "ace/SOCK_Stream.h" #include "ace/SOCK_Acceptor.h" #include "ace/INET_Addr.h" -#include "ace/Map_Manager.h" #include "Event.h" -// Forward declaration. -class Peer; - -// Maps a ACE_HANDLE onto a Peer *. -typedef ACE_Map_Manager <ACE_HANDLE, Peer *, ACE_Null_Mutex> HANDLER_MAP; -typedef ACE_Map_Iterator<ACE_HANDLE, Peer *, ACE_Null_Mutex> HANDLER_ITERATOR; -typedef ACE_Map_Entry <ACE_HANDLE, Peer *> MAP_ENTRY; - // Handle Peer events arriving as events. -class Peer : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> { public: - Peer (HANDLER_MAP * = 0); - // Initialize the peer and cache a reference to the HANDLER_MAP. + // = Initialization and termination methods. + Peer_Handler (void); + // Initialize the peer. + + ~Peer_Handler (void); + // Shutdown the Peer. virtual int open (void * = 0); // Initialize the handler (called by ACE_Acceptor::handle_input()) @@ -87,7 +82,7 @@ protected: int xmit_stdin (void); // Receive a event from stdin and send it to the gateway. - int (Peer::*do_action_) (void); + int (Peer_Handler::*do_action_) (void); // Pointer-to-member-function for the current action to run in this state. int await_supplier_id (void); @@ -101,27 +96,23 @@ protected: size_t total_bytes_; // The total number of bytes sent/received to the gateway. - - HANDLER_MAP *map_; - // Maps the ACE_HANDLE onto the Peer *. }; -Peer::Peer (HANDLER_MAP *map) +Peer_Handler::Peer_Handler (void) : connection_id_ (0), msg_frag_ (0), - total_bytes_ (0), - map_ (map) + total_bytes_ (0) { - this->msg_queue ()->high_water_mark (Peer::MAX_QUEUE_SIZE); + this->msg_queue ()->high_water_mark (Peer_Handler::MAX_QUEUE_SIZE); } // Upcall from the ACE_Acceptor::handle_input() that turns control // over to our application-specific Gateway handler. int -Peer::open (void *a) +Peer_Handler::open (void *a) { - ACE_DEBUG ((LM_DEBUG, "Gateway handler's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, "Gateway handler's handle = %d\n", this->peer ().get_handle ())); // Call down to the base class to activate and register this @@ -132,10 +123,6 @@ Peer::open (void *a) if (this->peer ().enable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "enable"), -1); - // Add ourselves to the map so we can be removed later on. - if (this->map_->bind (this->get_handle (), this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind"), -1); - char *to = ACE_OS::getenv ("TIMEOUT"); int timeout = to == 0 ? 100000 : ACE_OS::atoi (to); @@ -152,14 +139,14 @@ Peer::open (void *a) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_wakeup"), -1); // First action is to wait to be notified of our supplier id. - this->do_action_ = &Peer::await_supplier_id; + this->do_action_ = &Peer_Handler::await_supplier_id; return 0; } // Read events from stdin and send them to the gatewayd. int -Peer::xmit_stdin (void) +Peer_Handler::xmit_stdin (void) { if (this->connection_id_ != -1) { @@ -224,7 +211,7 @@ Peer::xmit_stdin (void) // Message_Queue. int -Peer::nonblk_put (ACE_Message_Block *mb) +Peer_Handler::nonblk_put (ACE_Message_Block *mb) { // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-ACE_Task the remainder at the head of the @@ -260,7 +247,7 @@ Peer::nonblk_put (ACE_Message_Block *mb) // method is automatically called by the ACE_Reactor. int -Peer::handle_output (ACE_HANDLE) +Peer_Handler::handle_output (ACE_HANDLE) { ACE_Message_Block *mb = 0; @@ -314,7 +301,7 @@ Peer::handle_output (ACE_HANDLE) // Send an event to a peer (may block if necessary). int -Peer::put (ACE_Message_Block *mb, ACE_Time_Value *) +Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) { if (this->msg_queue ()->is_empty ()) // Try to send the event *without* blocking! @@ -329,7 +316,7 @@ Peer::put (ACE_Message_Block *mb, ACE_Time_Value *) // Send an Peer event to gatewayd. int -Peer::send (ACE_Message_Block *mb) +Peer_Handler::send (ACE_Message_Block *mb) { ssize_t n; size_t len = mb->length (); @@ -358,7 +345,7 @@ Peer::send (ACE_Message_Block *mb) // Receive an Event from gatewayd. Handles fragmentation. int -Peer::recv (ACE_Message_Block *&mb) +Peer_Handler::recv (ACE_Message_Block *&mb) { if (this->msg_frag_ == 0) // No existing fragment... @@ -486,7 +473,7 @@ Peer::recv (ACE_Message_Block *&mb) // gatewayd, as well as stdio). int -Peer::handle_input (ACE_HANDLE sd) +Peer_Handler::handle_input (ACE_HANDLE sd) { ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd)); if (sd == ACE_STDIN) // Handle event from stdin. @@ -500,7 +487,7 @@ Peer::handle_input (ACE_HANDLE sd) // Action that receives our supplier id from the Gateway. int -Peer::await_supplier_id (void) +Peer_Handler::await_supplier_id (void) { ssize_t n = this->peer ().recv (&this->connection_id_, sizeof this->connection_id_); @@ -523,7 +510,7 @@ Peer::await_supplier_id (void) } // Transition to the action that waits for Peer events. - this->do_action_ = &Peer::await_events; + this->do_action_ = &Peer_Handler::await_events; // Reset standard input. ACE_OS::rewind (stdin); @@ -538,7 +525,7 @@ Peer::await_supplier_id (void) // Action that receives events. int -Peer::await_events (void) +Peer_Handler::await_events (void) { ACE_Message_Block *mb = 0; ssize_t n = this->recv (mb); @@ -584,25 +571,23 @@ Peer::await_events (void) // Periodically send events via ACE_Reactor timer mechanism. int -Peer::handle_timeout (const ACE_Time_Value &, const void *) +Peer_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // Skip over deactivated descriptors. - if (this->get_handle () != -1) - { - // Unbind ourselves from the map. - if (this->map_->unbind (this->get_handle ()) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "unbind")); + // Shut down the handler. + return this->handle_close (); +} - // Shut down the handler. - this->handle_close (); - } - return 0; +Peer_Handler::~Peer_Handler (void) +{ + // Shut down the handler. + this->handle_close (); } // Handle shutdown of the Peer object. int -Peer::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +Peer_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) { if (this->get_handle () != ACE_INVALID_HANDLE) { @@ -634,12 +619,12 @@ Peer::handle_close (ACE_HANDLE, ACE_Reactor_Mask) // A factory class that accept connections from gatewayd and // dynamically creates a new Peer object to do the dirty work. -class Peer_Acceptor : public ACE_Acceptor<Peer, ACE_SOCK_ACCEPTOR> +class Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> { public: - // = Initialization method. + // = Initialization and termination methods. Peer_Acceptor (void); - // Create the Peer singleton. + // Create the Peer. virtual int init (int argc, char *argv[]); // Initialize the acceptor. @@ -650,8 +635,8 @@ public: virtual int fini (void); // Perform termination. - virtual Peer *make_svc_handler (void); - // Factory method that creates the Peer once. + virtual Peer_Handler *make_svc_handler (void); + // Factory method that creates the Peer_Handler once. virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); // Handle various signals (e.g., SIGPIPE, SIGINT, and SIGQUIT) @@ -660,31 +645,28 @@ public: // Parse the command-line arguments. private: - HANDLER_MAP map_; - // Maps the ACE_HANDLE onto the Peer *. - - Peer *peer_; + Peer_Handler *peer_handler_; // Pointer to memory allocated exactly once. ACE_INET_Addr addr_; // Our addr. - typedef ACE_Acceptor<Peer, ACE_SOCK_ACCEPTOR> inherited; + typedef ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> inherited; }; Peer_Acceptor::Peer_Acceptor (void) : addr_ (ACE_DEFAULT_PEER_SERVER_PORT) { - ACE_NEW (peer_, Peer (&this->map_)); + ACE_NEW (peer_handler_, Peer_Handler); } -// Note how this method just passes back the pre-allocated Peer +// Note how this method just passes back the pre-allocated Peer_Handler // instead of having the ACE_Acceptor allocate a new one each time! -Peer * +Peer_Handler * Peer_Acceptor::make_svc_handler (void) { - return this->peer_; + return this->peer_handler_; } int @@ -731,24 +713,7 @@ Peer_Acceptor::info (char **strp, size_t length) const int Peer_Acceptor::fini (void) { - HANDLER_ITERATOR mi (this->map_); - - for (MAP_ENTRY *me = 0; - mi.next (me) != 0; - mi.advance ()) - { - if (me->int_id_->get_handle () != -1) - { - ACE_DEBUG ((LM_DEBUG, "closing down handle %d\n", - me->int_id_->get_handle ())); - me->int_id_->handle_close (); - } - else - ACE_DEBUG ((LM_DEBUG, "already closed %d\n")); - me->int_id_->destroy (); // Will trigger a delete. - } - - this->peer_->destroy (); // Will trigger a delete. + this->peer_handler_->destroy (); // Will trigger a delete. return inherited::fini (); } @@ -802,6 +767,6 @@ Peer_Acceptor::init (int argc, char *argv[]) } // The following is a "Factory" used by the ACE_Service_Config and -// svc.conf file to dynamically initialize the state of the Peer. +// svc.conf file to dynamically initialize the Peer_Acceptor. ACE_SVC_FACTORY_DEFINE (Peer_Acceptor) |