diff options
Diffstat (limited to 'apps/Gateway/Gateway')
20 files changed, 1711 insertions, 235 deletions
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h index e2fbc934c93..8d1b2979a49 100644 --- a/apps/Gateway/Gateway/Concurrency_Strategies.h +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -29,7 +29,7 @@ typedef ACE_Null_Mutex MAP_MUTEX; #else /* ACE_HAS_THREADS */ // Note that we only need to make the ACE_Task thread-safe if we are -// using the multi-threaded Thr_Consumer_Handler... +// using the multi-threaded Thr_Consumer_Proxy... #if defined (USE_OUTPUT_MT) #define SYNCH_STRATEGY ACE_MT_SYNCH #else @@ -37,7 +37,7 @@ typedef ACE_Null_Mutex MAP_MUTEX; #endif /* USE_OUTPUT_MT || USE_INPUT_MT */ // Note that we only need to make the ACE_Map_Manager thread-safe if -// we are using the multi-threaded Thr_Supplier_Handler. In this +// we are using the multi-threaded Thr_Supplier_Proxy. In this // case, we use an RW_Mutex since we'll lookup Consumers far more // often than we'll update them. #if defined (USE_INPUT_MT) @@ -48,27 +48,27 @@ typedef ACE_Null_Mutex MAP_MUTEX; #endif /* ACE_HAS_THREADS */ // = Forward decls -class Thr_Consumer_Handler; -class Thr_Supplier_Handler; -class Consumer_Handler; -class Supplier_Handler; +class Thr_Consumer_Proxy; +class Thr_Supplier_Proxy; +class Consumer_Proxy; +class Supplier_Proxy; #if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) #if defined (USE_OUTPUT_MT) -typedef Thr_Consumer_Handler CONSUMER_HANDLER; +typedef Thr_Consumer_Proxy CONSUMER_HANDLER; #else -typedef Consumer_Handler CONSUMER_HANDLER; +typedef Consumer_Proxy CONSUMER_HANDLER; #endif /* USE_OUTPUT_MT */ #if defined (USE_INPUT_MT) -typedef Thr_Supplier_Handler SUPPLIER_HANDLER; +typedef Thr_Supplier_Proxy SUPPLIER_HANDLER; #else -typedef Supplier_Handler SUPPLIER_HANDLER; +typedef Supplier_Proxy SUPPLIER_HANDLER; #endif /* USE_INPUT_MT */ #else // Instantiate a non-multi-threaded Gateway. -typedef Supplier_Handler SUPPLIER_HANDLER; -typedef Consumer_Handler CONSUMER_HANDLER; +typedef Supplier_Proxy SUPPLIER_HANDLER; +typedef Consumer_Proxy CONSUMER_HANDLER; #endif /* ACE_HAS_THREADS */ #endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index 4c2648addf0..7e99902b0db 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -28,11 +28,11 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, } // Get the logic id. - if ((read_result = this->getint (entry.logical_id_)) != FP::SUCCESS) + if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS) return read_result; // Get the payload type. - if ((read_result = this->getint (entry.payload_type_)) != FP::SUCCESS) + if ((read_result = this->getint (entry.type_)) != FP::SUCCESS) return read_result; // get all the destinations. @@ -104,7 +104,7 @@ int main (int argc, char *argv[]) { if (argc != 4) { // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1); - cerr << argv[0] << " CCfilename RTfilename Mapfilename.\n"; + cerr << argv[0] << " CCfilename filename Mapfilename.\n"; exit (1); } FP_RETURN_TYPE result; @@ -130,30 +130,30 @@ int main (int argc, char *argv[]) } CCfile.close(); - Consumer_Config_File_Entry RTentry; - Consumer_Config_File_Parser RTfile; + Consumer_Config_File_Entry entry; + Consumer_Config_File_Parser file; - RTfile.open (argv[2]); + file.open (argv[2]); line_number = 0; printf ("\nConnID\tLogic\tPayload\tDestinations\n"); // Read config file line at a time. - while ((result = RTfile.read_entry (RTentry, line_number)) != EOF) + while ((result = file.read_entry (entry, line_number)) != EOF) { if (result != FP::SUCCESS) cerr << "Error at line " << line_number << endl; else { printf ("%d\t%d\t%d\t%d\t", - RTentry.conn_id_, RTentry.logical_id_, RTentry.payload_type_); - while (--RTentry.total_destinations_ >= 0) - printf ("%d,", RTentry.destinations_[RTentry.total_destinations_]); + entry.conn_id_, entry.supplier_id_, entry.type_); + while (--entry.total_destinations_ >= 0) + printf ("%d,", entry.destinations_[entry.total_destinations_]); printf ("\n"); } } - RTfile.close(); + file.close(); return 0; } diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 145c3233bae..2620301e25b 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -22,11 +22,11 @@ class Connection_Config_File_Entry // = TITLE - // Stores the IO_Handler entry for connection configuration. + // Stores the Proxy_Handler entry for connection configuration. { public: int conn_id_; - // Connection id for this IO_Handler. + // Connection id for this Proxy_Handler. char host_[BUFSIZ]; // Host to connect with. @@ -46,7 +46,7 @@ public: class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry> // = TITLE - // Parser for the IO_Handler Connection file. + // Parser for the Proxy_Handler Connection file. { public: virtual FP::Return_Type @@ -65,10 +65,10 @@ public: int conn_id_; // Connection id for this channel. - int logical_id_; + int supplier_id_; // Logical routing id for this channel. - int payload_type_; + int type_; // Type of payload in the message. int destinations_[MAX_DESTINATIONS]; diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h new file mode 100644 index 00000000000..a867f1ca5ff --- /dev/null +++ b/apps/Gateway/Gateway/Dispatch_Set.h @@ -0,0 +1,28 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Dispatch_Set.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_DISPATCH_SET) +#define _DISPATCH_SET + +#include "ace/Set.h" + +// Forward reference. +class Proxy_Handler; + +typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator; + +#endif /* _DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index a8a9374be3c..24881c3e85b 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -17,33 +17,45 @@ #if !defined (EVENT) #define EVENT +#include "ace/OS.h" + // This is the unique connection identifier that denotes a particular -// IO_Handler in the Gateway. -typedef short CONN_ID; +// Proxy_Handler in the Gateway. +typedef ACE_INT32 ACE_INT32; class Event_Addr // = TITLE // Address used to identify the source/destination of an event. + // + // = DESCRIPTION + // This is really a "virtual forwarding address" thatis used to + // decouple the filtering and forwarding logic of the Event + // Channel from the format of the data. { public: - Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0) - : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {} - - int operator== (const Event_Addr &pa) const + Event_Addr (ACE_INT32 cid = -1, + u_char sid = 0, + u_char type = 0) + : conn_id_ (cid), + supplier_id_ (sid), + type_ (type) {} + + int operator== (const Event_Addr &event_addr) const { - return this->conn_id_ == pa.conn_id_ - && this->logical_id_ == pa.logical_id_ - && this->payload_ == pa.payload_; + return this->conn_id_ == event_addr.conn_id_ + && this->supplier_id_ == event_addr.supplier_id_ + && this->type_ == event_addr.type_; } - CONN_ID conn_id_; - // Unique connection identifier that denotes a particular IO_Handler. + ACE_INT32 conn_id_; + // Unique connection identifier that denotes a particular + // Proxy_Handler. - unsigned char logical_id_; + ACE_INT32 supplier_id_; // Logical ID. - unsigned char payload_; - // Payload type. + ACE_INT32 type_; + // Event type. }; @@ -52,24 +64,27 @@ class Event_Header // Fixed sized header. { public: - typedef unsigned short SUPPLIER_ID; - // Type used to route messages from gatewayd. + typedef ACE_INT32 SUPPLIER_ID; + // Type used to forward events from gatewayd. enum { INVALID_ID = -1 // No peer can validly use this number. }; - SUPPLIER_ID routing_id_; + SUPPLIER_ID supplier_id_; // Source ID. + ACE_INT32 type_; + // Event type. + size_t len_; - // Length of the message in bytes. + // Length of the entire event (including data payload) in bytes. }; class Event // = TITLE - // Variable-sized message (buf_ may be variable-sized between + // Variable-sized event (data_ may be variable-sized between // 0 and MAX_PAYLOAD_SIZE). { public: @@ -77,10 +92,10 @@ public: // The maximum size of an Event. Event_Header header_; - // Message header. + // Event header. - char buf_[MAX_PAYLOAD_SIZE]; - // Message payload. + char data_[MAX_PAYLOAD_SIZE]; + // Event data. }; #endif /* EVENT */ diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 815755216c7..d146ddfb362 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -1,9 +1,10 @@ /* -*- C++ -*- */ // $Id$ +#define ACE_BUILD_SVC_DLL #include "ace/Get_Opt.h" #include "Config_Files.h" -#include "IO_Handler_Connector.h" +#include "Proxy_Handler_Connector.h" #include "Event_Channel.h" #if !defined (ACE_EVENT_CHANNEL_C) @@ -46,18 +47,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the IO_Handlers. + // Iterate through the consumer map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; - if (io_handler->direction () == 'C') - total_bytes_out += io_handler->total_bytes (); - else // io_handler->direction () == 'S' - total_bytes_in += io_handler->total_bytes (); + if (proxy_handler->direction () == 'C') + total_bytes_out += proxy_handler->total_bytes (); + else // proxy_handler->direction () == 'S' + total_bytes_in += proxy_handler->total_bytes (); } #if defined (ACE_NLOGGING) @@ -108,16 +109,16 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the IO_Handlers. + // Iterate through the Consumer Map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; if (this->connector_->initiate_connection - (io_handler, synch_options) == -1) + (proxy_handler, synch_options) == -1) continue; } @@ -125,7 +126,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) } // This method gracefully shuts down all the Handlers in the -// IO_Handler Connection Map. +// Proxy_Handler Connection Map. template <class SH, class CH> int ACE_Event_Channel<SH, CH>::close (void) @@ -136,26 +137,26 @@ ACE_Event_Channel<SH, CH>::close (void) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); #endif /* USE_INPUT_MT || USE_OUTPUT_MT */ - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate over all the handlers and shut them down. for (CONNECTION_MAP_ENTRY *me; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", - io_handler->id ())); + proxy_handler->id ())); - if (io_handler->state () != IO_Handler::IDLE) - // Mark IO_Handler as DISCONNECTING so we don't try to + if (proxy_handler->state () != Proxy_Handler::IDLE) + // Mark Proxy_Handler as DISCONNECTING so we don't try to // reconnect... - io_handler->state (IO_Handler::DISCONNECTING); + proxy_handler->state (Proxy_Handler::DISCONNECTING); - // Deallocate IO_Handler resources. - io_handler->destroy (); // Will trigger a delete. + // Deallocate Proxy_Handler resources. + proxy_handler->destroy (); // Will trigger a delete. } // Free up the resources allocated dynamically by the ACE_Connector. @@ -168,7 +169,10 @@ ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) { this->parse_args (argc, argv); - ACE_NEW_RETURN (this->connector_, IO_Handler_Connector (), -1); + ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); + + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); if (this->active_connector_role_) { @@ -226,38 +230,38 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) entry.max_retry_delay_, entry.local_poconsumer_)); - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; // The next few lines of code are dependent on whether we are - // making an Supplier_Handler or an Consumer_Handler. + // making an Supplier_Proxy or an Consumer_Proxy. - if (entry.direction_ == 'C') // Configure a Consumer_Handler. - ACE_NEW_RETURN (io_handler, - CONSUMER_HANDLER (&this->consumer_map_, + if (entry.direction_ == 'C') // Configure a Consumer_Proxy. + ACE_NEW_RETURN (proxy_handler, + CONSUMER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - else /* direction == 'S' */ // Configure a Supplier_Handler. - ACE_NEW_RETURN (io_handler, - SUPPLIER_HANDLER (&this->consumer_map_, + else /* direction == 'S' */ // Configure a Supplier_Proxy. + ACE_NEW_RETURN (proxy_handler, + SUPPLIER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - // The following code is common to both Supplier_Handlers_ and - // Consumer_Handlers. + // The following code is common to both Supplier_Proxys_ and + // Consumer_Proxys. // Initialize the routing entry's peer addressing info. - io_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), + proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); // Initialize max timeout. - io_handler->max_timeout (entry.max_retry_delay_); + proxy_handler->max_timeout (entry.max_retry_delay_); - // Try to bind the new IO_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, io_handler)) + // Try to bind the new Proxy_Handler to the connection ID. + switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, @@ -277,7 +281,7 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) if (file_empty) ACE_ERROR ((LM_WARNING, - "warning: connection io_handler configuration file was empty\n")); + "warning: connection proxy_handler configuration file was empty\n")); return 0; } @@ -303,43 +307,37 @@ ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " "number of destinations = %d\n", entry.conn_id_, - entry.logical_id_, - entry.payload_type_, + entry.supplier_id_, + entry.type_, entry.total_destinations_)); for (int i = 0; i < entry.total_destinations_; i++) ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", i, entry.destinations_[i])); } - Consumer_Entry *re; - ACE_NEW_RETURN (re, Consumer_Entry, -1); - - Consumer_Entry::ENTRY_SET *io_handler_set; - ACE_NEW_RETURN (io_handler_set, Consumer_Entry::ENTRY_SET, -1); + Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); Event_Addr event_addr (entry.conn_id_, - entry.logical_id_, - entry.payload_type_); + entry.supplier_id_, + entry.type_); // Add the destinations to the Routing Entry. for (int i = 0; i < entry.total_destinations_; i++) { - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; - // Lookup destination and add to Consumer_Entry set if found. + // Lookup destination and add to Dispatch_Set set if found. if (this->connection_map_.find (entry.destinations_[i], - io_handler) != -1) - io_handler_set->insert (io_handler); + proxy_handler) != -1) + dispatch_set->insert (proxy_handler); else ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", i, entry.destinations_[i])); } - // Attach set of destination io_handlers to routing entry. - re->destinations (io_handler_set); - // Bind with consumer map, keyed by peer address. - switch (this->consumer_map_.bind (event_addr, re)) + switch (this->efd_.bind (event_addr, dispatch_set)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 0cb928d7884..4e7afc5d328 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -17,7 +17,7 @@ #if !defined (ACE_EVENT_CHANNEL) #define ACE_EVENT_CHANNEL -#include "IO_Handler_Connector.h" +#include "Proxy_Handler_Connector.h" template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler @@ -70,22 +70,22 @@ private: int debug_; // Are we debugging? - IO_Handler_Connector *connector_; + Proxy_Handler_Connector *connector_; // This is used to establish the connections actively. int socket_queue_size_; // Size of the socket queue (0 means "use default"). // = Make life easier by defining typedefs. - typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP; - typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; - typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY; + typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP; + typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY; CONNECTION_MAP connection_map_; - // Table that maps Connection IDs to IO_Handler *'s. + // Table that maps Connection IDs to Proxy_Handler *'s. - Consumer_Map consumer_map_; - // Map that associates event addresses to a set of Consumer_Handler + Event_Forwarding_Discriminator efd_; + // Map that associates event addresses to a set of Consumer_Proxy // *'s. }; diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp new file mode 100644 index 00000000000..8261ea13eb2 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp @@ -0,0 +1,61 @@ +/* -*- C++ -*- */ +// $Id$ + +#if !defined (_CONSUMER_MAP_C) +#define _CONSUMER_MAP_C + +#include "Event_Forwarding_Discriminator.h" + +// Bind the Event_Addr to the INT_ID. + +int +Event_Forwarding_Discriminator::bind (Event_Addr event_addr, + Dispatch_Set *Dispatch_Set) +{ + return this->map_.bind (event_addr, Dispatch_Set); +} + +// Find the Dispatch_Set corresponding to the Event_Addr. + +int +Event_Forwarding_Discriminator::find (Event_Addr event_addr, + Dispatch_Set *&Dispatch_Set) +{ + return this->map_.find (event_addr, Dispatch_Set); +} + +// Unbind (remove) the Event_Addr from the map. + +int +Event_Forwarding_Discriminator::unbind (Event_Addr event_addr) +{ + return this->map_.unbind (event_addr); +} + +Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt) + : map_iter_ (rt.map_) +{ +} + +int +Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss) +{ + // Loop in order to skip over inactive entries if necessary. + + for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0; + this->map_iter_.next (temp) != 0; + this->advance ()) + { + // Otherwise, return the next item. + ss = temp->int_id_; + return 1; + } + return 0; +} + +int +Event_Forwarding_Discriminator_Iterator::advance (void) +{ + return this->map_iter_.advance (); +} +#endif /* _CONSUMER_MAP_C */ diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h new file mode 100644 index 00000000000..35a594b61b5 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event_Forwarding_Discriminator.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONSUMER_MAP_H) +#define _CONSUMER_MAP_H + +#include "ace/Map_Manager.h" +#include "Concurrency_Strategies.h" +#include "Event.h" +#include "Dispatch_Set.h" + +class Event_Forwarding_Discriminator +{ + // = TITLE + // Define a generic consumer map based on the ACE Map_Manager. + // + // = DESCRIPTION + // This class makes it easier to use the Map_Manager. +public: + int bind (Event_Addr event, Dispatch_Set *Dispatch_Set); + // Associate Event with the Dispatch_Set. + + int find (Event_Addr event, Dispatch_Set *&Dispatch_Set); + // Break any association of EXID. + + int unbind (Event_Addr event); + // Locate EXID and pass out parameter via INID. If found, + // return 0, else -1. + +public: + ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Dispatch_Set *'s + // <internal IDs>. +}; + +class Event_Forwarding_Discriminator_Iterator +{ + // = TITLE + // Define an iterator for the Consumer Map. +public: + Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm); + int next (Dispatch_Set *&); + int advance (void); + +private: + ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_; + // Map we are iterating over. +}; +#endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index 776d1b2f338..80b768aff84 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -57,7 +57,6 @@ protected: FP::Return_Type readword (char buf[]); int delimiter (char ch); - int endofline (char ch); int comments (char ch); int skipline (void); diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 82666406070..2c963ff3d7f 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -6,11 +6,14 @@ #include "Gateway.h" class Gateway : public ACE_Service_Object + // = TITLE + // Integrates the whole Gateway application. + // + // = DESCRIPTION + // This implementation uses the <ACE_Event_Channel> as the basis + // for the <Gateway> routing. { public: - // = Initialization method. - Gateway (void); - // = Service configurator hooks. virtual int init (int argc, char *argv[]); // Perform initialization. @@ -34,6 +37,7 @@ protected: // file. ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; + // The Event Channel routes events from Supplier(s) to Consumer(s). }; // Convenient shorthands. @@ -46,8 +50,6 @@ Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) if (signum > 0) ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - this->event_channel_.close (); - // Shut down the main event loop. ACE_Service_Config::end_reactor_event_loop (); return 0; @@ -68,22 +70,12 @@ Gateway::handle_input (ACE_HANDLE h) return this->handle_signal (h); } -// Give default values to data members. - - -Gateway::Gateway (void) -{ -} - int Gateway::init (int argc, char *argv[]) { if (this->event_channel_.open (argc, argv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); - // Ignore SIPPIPE so each Consumer_Handler can handle it. - ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - ACE_Sig_Set sig_set; sig_set.sig_add (SIGINT); sig_set.sig_add (SIGQUIT); @@ -91,13 +83,12 @@ Gateway::init (int argc, char *argv[]) // Register ourselves to receive SIGINT and SIGQUIT // so we can shut down gracefully via signals. - if (ACE_Service_Config::reactor ()->register_handler (sig_set, - this) == -1) + if (ACE_Service_Config::reactor ()->register_handler + (sig_set, this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - if (ACE_Service_Config::reactor ()->register_handler (0, - this, - ACE_Event_Handler::READ_MASK) == -1) + if (ACE_Service_Config::reactor ()->register_handler + (0, this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); return 0; } diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index f81df3d73af..0f5ddc07eb0 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -1,7 +1,7 @@ #---------------------------------------------------------------------------- # @(#)Makefile 1.1 10/18/96 # -# Makefile for the Gateway prototype. +# Makefile for the Gateway. #---------------------------------------------------------------------------- #---------------------------------------------------------------------------- @@ -12,15 +12,14 @@ BIN = gatewayd LIB = libGateway.a SHLIB = libGateway.so -FILES = Event_Channel \ - IO_Handler \ - IO_Handler_Connector \ - Config_Files \ +FILES = Config_Files \ File_Parser \ Gateway \ - Consumer_Entry \ - Consumer_Map \ - Thr_IO_Handler + Event_Channel \ + Event_Forwarding_Discriminator \ + Proxy_Handler \ + Proxy_Handler_Connector \ + Thr_Proxy_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) @@ -52,7 +51,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # Default behavior is to use single-threading. See the README # file for information on how to configure this with multiple # strategies for threading the input and output channels. -DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT +# DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT #---------------------------------------------------------------------------- # Dependencies @@ -61,9 +60,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ - $(WRAPPER_ROOT)/ace/Get_Opt.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ +.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ $(WRAPPER_ROOT)/ace/config.h \ @@ -72,13 +69,38 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Msg.h \ $(WRAPPER_ROOT)/ace/Log_Record.h \ $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ $(WRAPPER_ROOT)/ace/Log_Record.i \ + Config_Files.h File_Parser.h +.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/ACE.i \ - Config_Files.h File_Parser.h IO_Handler_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + File_Parser.h +.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -119,19 +141,21 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + Event_Channel.h Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \ - Event_Channel.h -.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h Gateway.h +.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -143,7 +167,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - IO_Handler_Connector.h \ + Config_Files.h File_Parser.h Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -158,6 +182,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ $(WRAPPER_ROOT)/ace/Timer_Queue.h \ @@ -193,16 +218,14 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h -.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \ - IO_Handler_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Service_Object.h \ - $(WRAPPER_ROOT)/ace/Shared_Object.h \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h Event_Channel.h +.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \ + Event_Forwarding_Discriminator.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -214,6 +237,34 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ + Concurrency_Strategies.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + Event.h Dispatch_Set.h \ + $(WRAPPER_ROOT)/ace/Set.h +.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ + Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -224,7 +275,6 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ $(WRAPPER_ROOT)/ace/Timer_Queue.h \ @@ -260,37 +310,13 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h -.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - Config_Files.h File_Parser.h -.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - File_Parser.h -.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h +.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \ + Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -345,36 +371,19 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ - Gateway.h -.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i -.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i \ - Concurrency_Strategies.h Event.h Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h -.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Connector.i \ + Thr_Proxy_Handler.h Proxy_Handler.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h +.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \ + Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -435,10 +444,10 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - Consumer_Map.h \ + Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Concurrency_Strategies.h Event.h Consumer_Entry.h \ - IO_Handler_Connector.h \ + Concurrency_Strategies.h Event.h Dispatch_Set.h \ + Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Connector.i diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp new file mode 100644 index 00000000000..86e0fff8e41 --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler.cpp @@ -0,0 +1,698 @@ +// $Id$ + +#include "Dispatch_Set.h" +#include "Proxy_Handler_Connector.h" + +// Convenient short-hands. +#define CO CONDITION +#define MU MAP_MUTEX + +// The total number of bytes sent/received on this Proxy. + +size_t +Proxy_Handler::total_bytes (void) +{ + return this->total_bytes_; +} + +void +Proxy_Handler::total_bytes (size_t bytes) +{ + this->total_bytes_ += bytes; +} + +Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), + efd_ (efd), + id_ (-1), + total_bytes_ (0), + state_ (Proxy_Handler::IDLE), + connector_ (ioc), + timeout_ (1), + max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), + socket_queue_size_ (socket_queue_size) +{ +} + +// Set the direction. + +void +Proxy_Handler::direction (char d) +{ + this->direction_ = d; +} + +// Get the direction. + +char +Proxy_Handler::direction (void) +{ + return this->direction_; +} + +// Sets the timeout delay. + +void +Proxy_Handler::timeout (int to) +{ + if (to > this->max_timeout_) + to = this->max_timeout_; + + this->timeout_ = to; +} + +// Recalculate the current retry timeout delay using exponential +// backoff. Returns the original timeout (i.e., before the +// recalculation). + +int +Proxy_Handler::timeout (void) +{ + int old_timeout = this->timeout_; + this->timeout_ *= 2; + + if (this->timeout_ > this->max_timeout_) + this->timeout_ = this->max_timeout_; + + return old_timeout; +} + +// Sets the max timeout delay. + +void +Proxy_Handler::max_timeout (int mto) +{ + this->max_timeout_ = mto; +} + +// Gets the max timeout delay. + +int +Proxy_Handler::max_timeout (void) +{ + return this->max_timeout_; +} + +// Restart connection asynchronously when timeout occurs. + +int +Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", + this->id (), this->timeout_)); + return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); +} + +// Restart connection (blocking_semantics dicates whether we +// restart synchronously or asynchronously). + +int +Proxy_Handler::reinitiate_connection (void) +{ + // Skip over deactivated descriptors. + if (this->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + this->peer ().close (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Proxy_Handler %d\n", + this->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + +// Handle shutdown of the Proxy_Handler object. + +int +Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down Proxy_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + return this->reinitiate_connection (); +} + +// Set the state of the Proxy. + +void +Proxy_Handler::state (Proxy_Handler::State s) +{ + this->state_ = s; +} + +// Perform the first-time initiation of a connection to the peer. + +int +Proxy_Handler::initialize_connection (void) +{ + this->state_ = Proxy_Handler::ESTABLISHED; + + // Restart the timeout to 1. + this->timeout (1); + + // Action that sends the connection id to the peerd. + + ACE_INT32 id = htonl (this->id ()); + + ssize_t n = this->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); + return 0; +} + +// Set the size of the socket queue. + +void +Proxy_Handler::socket_queue_size (void) +{ + if (this->socket_queue_size_ > 0) + { + int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; + + if (this->peer ().set_option (SOL_SOCKET, option, + &this->socket_queue_size_, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + } +} + +// Upcall from the ACE_Acceptor::handle_input() that +// delegates control to our application-specific Proxy_Handler. + +int +Proxy_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn on non-blocking I/O. + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Call down to the base class to activate and register this handler. + if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + + return this->initialize_connection (); +} + +// Return the current state of the Proxy. + +Proxy_Handler::State +Proxy_Handler::state (void) +{ + return this->state_; +} + +void +Proxy_Handler::id (ACE_INT32 id) +{ + this->id_ = id; +} + +ACE_INT32 +Proxy_Handler::id (void) +{ + return this->id_; +} + +// Set the peer's address information. +int +Proxy_Handler::bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 id) +{ + this->remote_addr_ = remote_addr; + this->local_addr_ = local_addr; + this->id_ = id; + return 0; +} + +ACE_INET_Addr & +Proxy_Handler::remote_addr (void) +{ + return this->remote_addr_; +} + +ACE_INET_Addr & +Proxy_Handler::local_addr (void) +{ + return this->local_addr_; +} + +// Constructor sets the consumer map pointer. + +Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'C'; + this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method simply marks the Proxy_Handler as having +// failed so that handle_close () can reconnect. + +int +Consumer_Proxy::handle_input (ACE_HANDLE) +{ + char buf[1]; + + this->state (Proxy_Handler::FAILED); + + switch (this->peer ().recv (buf, sizeof buf)) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event. If we are unable to send +// the entire event the remainder is re-queued at the *front* of the +// Event_List. + +int +Consumer_Proxy::nonblk_put (ACE_Message_Block *event) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n = this->send (event); + + if (n == -1) + { + // Things have gone wrong, let's try to close down and set up a + // new reconnection by calling handle_close(). + this->state (Proxy_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) // Didn't manage to send everything. + { + ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Service_Config::reactor ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); + return 0; + } + else + return n; +} + +ssize_t +Consumer_Proxy::send (ACE_Message_Block *event) +{ + ssize_t len = event->length (); + ssize_t n = this->peer ().send (event->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + // Re-adjust pointer to skip over the part we did send. + event->rd_ptr (n); + else /* if (n == length) */ + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + delete event; + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Proxy::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *event = 0; + + ACE_DEBUG ((LM_DEBUG, + "(%t) in handle_output on handle %d\n", + this->get_handle ())); + // The list had better not be empty, otherwise there's a bug! + + if (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + + case -1: + // We are responsible for freeing an ACE_Message_Block if + // failures occur. + delete event; + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Message_Queue. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing deactivated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + + if (ACE_Service_Config::reactor ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); + } + } + } + else + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); + return 0; +} + +// Send an event to a Consumer (may queue if necessary). + +int +Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (event); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (event, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Constructor sets the consumer map pointer and the connector +// pointer. + +Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : msg_frag_ (0), + Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive an Event from a Supplier. Handles fragmentation. +// +// The event returned from recv consists of two parts: +// +// 1. The Address part, contains the "virtual" routing id. +// +// 2. The Data part, which contains the actual data to be forwarded. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const ssize_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + delete this->msg_frag_; + this->msg_frag_ = 0; + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + } + + // At this point there is a complete, valid header in msg_frag_ + ssize_t data_bytes_left_to_read = + sizeof (Event) - this->msg_frag_->length (); + + ssize_t data_received = + this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + delete this->msg_frag_; + this->msg_frag_ = 0; + return 0; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + forward_addr = new ACE_Message_Block (sizeof (Event_Addr), + ACE_Message_Block::MB_PROTO, + this->msg_frag_); + if (forward_addr == 0) + { + delete this->msg_frag_; + this->msg_frag_ = 0; + errno = ENOMEM; + return -1; + } + + Event_Addr event_addr (this->id (), + event->header_.supplier_id_, + event->header_.type_); + // Copy the forwarding address from the Event_Addr into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + this->total_bytes (data_received + header_received); +#if defined (VERBOSE) + ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s", + event_addr.conn_id_, event->header_.supplier_id_, event->header_.len_, + event->header_.len_, event->data_)); +#else + ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", + event->header_.supplier_id_, event->header_.len_, this->total_bytes ())); +#endif + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the +// gatewayd, as well as stdio). + +int +Supplier_Proxy::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *forward_addr = 0; + + switch (this->recv (forward_addr)) + { + case 0: + // Note that a peer should never initiate a shutdown by closing + // the connection. Instead, it should reconnect. + this->state (Proxy_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (Proxy_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n", + "Peer has failed unexpectedly", + this->id ()), -1); + } + /* NOTREACHED */ + default: + return this->forward (forward_addr); + } +} + +// Forward an event to its appropriate Consumer(s). + +int +Supplier_Proxy::forward (ACE_Message_Block *forward_addr) +{ + // We got a valid event, so determine its virtual forwarding + // address, which is stored in the first of the two event blocks, + // which are chained together by this->recv(). + + Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr (); + + // Skip over the address portion and get the data. + const ACE_Message_Block *const data = forward_addr->cont (); + + // <dispatch_set> points to the set of Consumers associated with + // this forwarding address. + Dispatch_Set *dispatch_set = 0; + + if (this->efd_->find (*forwarding_addr, dispatch_set) != -1) + { + // Check to see if there are any destinations. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active destinations for this event currently\n")); + + else // There are destinations, so forward the event. + { + Dispatch_Set_Iterator dsi (*dispatch_set); + + for (Proxy_Handler **proxy_handler = 0; + dsi.next (proxy_handler) != 0; + dsi.advance ()) + { + // Only process active proxy_handlers. + if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + { + // Clone the event portion (should be doing + // reference counting here...) + ACE_Message_Block *newmsg = data->clone (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", + (*proxy_handler)->id ())); + + if ((*proxy_handler)->put (newmsg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", + "put", (*proxy_handler)->id ())); + + // We are responsible for freeing a + // ACE_Message_Block if failures occur. + delete newmsg; + } + } + } + // Will become superfluous once we have reference + // counting... + delete forward_addr; + return 0; + } + } + delete forward_addr; + // Failure return. + ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", + forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_)); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h new file mode 100644 index 00000000000..d91fa3108ff --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler.h @@ -0,0 +1,215 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Proxy_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_PROXY_HANDLER) +#define _PROXY_HANDLER + +#include "ace/Service_Config.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "Event_Forwarding_Discriminator.h" +#include "Dispatch_Set.h" +#include "Event.h" + +// Forward declaration. +class Proxy_Handler_Connector; + +class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> + // = TITLE + // Proxy_Handler contains info about connection state and addressing. + // + // = DESCRIPTION + // The Proxy_Handler classes process events sent to the Event + // Channel from Suppliers and forward them to Consumers. +{ +public: + Proxy_Handler (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int open (void * = 0); + // Initialize and activate a single-threaded Proxy_Handler (called by + // ACE_Connector::handle_output()). + + int bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32); + // Set the peer's addressing and routing information. + + ACE_INET_Addr &remote_addr (void); + // Returns the peer's routing address. + + ACE_INET_Addr &local_addr (void); + // Returns our local address. + + // = Set/get routing id. + ACE_INT32 id (void); + void id (ACE_INT32); + + // = The current state of the Proxy_Handler. + enum State + { + IDLE = 1, // Prior to initialization. + CONNECTING, // During connection establishment. + ESTABLISHED, // Proxy_Handler is established and active. + DISCONNECTING, // Proxy_Handler is in the process of connecting. + FAILED // Proxy_Handler has failed. + }; + + // = Set/get the current state. + void state (State); + State state (void); + + // = Set/get the current retry timeout delay. + void timeout (int); + int timeout (void); + + // = Set/get the maximum retry timeout delay. + void max_timeout (int); + int max_timeout (void); + + // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer + // (necessary for error checking). + void direction (char); + char direction (void); + + // = The total number of bytes sent/received on this channel. + size_t total_bytes (void); + void total_bytes (size_t bytes); + // Increment count by <bytes>. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based Proxy_Handler reconnection. + +protected: + enum + { + MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. + }; + + int initialize_connection (void); + // Perform the first-time initiation of a connection to the peer. + + int reinitiate_connection (void); + // Reinitiate a connection asynchronously when peers fail. + + void socket_queue_size (void); + // Set the socket queue size. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + // Perform Proxy_Handler termination. + + Event_Forwarding_Discriminator *efd_; + // Maps Events to a set of Consumers. + + ACE_INET_Addr remote_addr_; + // Address of peer. + + ACE_INET_Addr local_addr_; + // Address of us. + + ACE_INT32 id_; + // The assigned routing ID of this entry. + + size_t total_bytes_; + // The total number of bytes sent/received on this channel. + + State state_; + // The current state of the channel. + + Proxy_Handler_Connector *connector_; + // Back pointer to Proxy_Handler_Connector to reestablish broken + // connections. + + int timeout_; + // Amount of time to wait between reconnection attempts. + + int max_timeout_; + // Maximum amount of time to wait between reconnection attempts. + + char direction_; + // Indicates which direction data flows through the channel ('S' == + // Supplier and 'C' == Consumer). + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class Supplier_Proxy : public Proxy_Handler + // = TITLE + // Handles reception of Events from Suppliers + // + // = DESCRIPTION + // Performs framing and error checking. +{ +public: + Supplier_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + // Constructor sets the consumer map pointer. + +protected: + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + // Receive and process peer events. + + virtual int recv (ACE_Message_Block *&); + // Receive an event from a Supplier. + + int forward (ACE_Message_Block *event); + // Forward the Event to a Consumer. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragment to handle non-blocking recv's from + // Suppliers. +}; + +class Consumer_Proxy : public Proxy_Handler + // = TITLE + // Handles transmission of events to Consumers. + // + // = DESCRIPTION + // Uses a single-threaded approach. +{ +public: + Consumer_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send an event to a Consumer (may be queued if necessary). + +protected: + // = We'll allow up to 16 megabytes to be queued per-output channel. + enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16}; + + virtual int handle_output (ACE_HANDLE); + // Finish sending event when flow control conditions abate. + + int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking put(). + + virtual ssize_t send (ACE_Message_Block *); + // Send an event to a Consumer. + + virtual int handle_input (ACE_HANDLE); + // Receive and process shutdowns from a Consumer. +}; + +#endif /* _PROXY_HANDLER */ diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp new file mode 100644 index 00000000000..7ac0a77a2d4 --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp @@ -0,0 +1,92 @@ +#include "Proxy_Handler_Connector.h" +// $Id$ + + +Proxy_Handler_Connector::Proxy_Handler_Connector (void) +{ +} + +// Override the connection-failure method to add timer support. +// Note that these timers perform "expoential backoff" to +// avoid rapidly trying to reestablish connections when a link +// goes down. + +int +Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) +{ + ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0; + + // Locate the ACE_Svc_Handler corresponding to the socket descriptor. + if (this->handler_map_.find (sd, stp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + sd, "find"), -1); + + Proxy_Handler *channel = stp->svc_handler (); + + // Schedule a reconnection request at some point in the future + // (note that channel uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, + channel->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + return 0; +} + +// Initiate (or reinitiate) a connection to the Proxy_Handler. + +int +Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, + ACE_Synch_Options &synch_options) +{ + char buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators + // will ignore us until we are reconnected. + channel->state (Proxy_Handler::IDLE); + + if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 + || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "can't obtain peer's address"), -1); + + // Try to connect to the Peer. + + if (this->connect (channel, channel->remote_addr (), + synch_options, channel->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + channel->state (Proxy_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", + "connect", buf)); + + // Reschedule ourselves to try and connect again. + if (synch_options[ACE_Synch_Options::USE_REACTOR]) + { + if (ACE_Service_Config::reactor ()->schedule_timer + (channel, 0, channel->timeout ()) == 0) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + else + // Failures on synchronous connects are reported as errors + // so that the caller can decide how to proceed. + return -1; + } + else + { + channel->state (Proxy_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting %s to %s\n", + synch_options[ACE_Synch_Options::USE_REACTOR] + ? "asynchronously" : "synchronously", buf)); + } + } + else + { + channel->state (Proxy_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", + buf, channel->get_handle ())); + } + return 0; +} diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Proxy_Handler_Connector.h new file mode 100644 index 00000000000..3baea75934a --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.h @@ -0,0 +1,40 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Proxy_Handler_Connector.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER_CONNECTOR) +#define _IO_HANDLER_CONNECTOR + +#include "ace/Connector.h" +#include "Thr_Proxy_Handler.h" + +class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR> + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Proxy_Handler object to do the dirty work... +{ +public: + Proxy_Handler_Connector (void); + + // Initiate (or reinitiate) a connection on the Proxy_Handler. + int initiate_connection (Proxy_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + +protected: + // Override the connection-failure method to add timer support. + virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); +}; + +#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README index 4e986354aaa..e64ad26b568 100644 --- a/apps/Gateway/Gateway/README +++ b/apps/Gateway/Gateway/README @@ -2,15 +2,15 @@ This application illustrates an application-level Gateway which routes messages between Consumer and Suppliers in a distributed environment. The default configuration is single-threaded, i.e., all -Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE +Supplier_Proxys and Consumer_Proxys are multiplexed via the ACE Reactor within a single thread of control. To obtain a version that -multi-threads both Consumer_Handlers and Supplier_Handlers simply set +multi-threads both Consumer_Proxys and Supplier_Proxys simply set the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT -To get a version that uses single-threading for all Supplier_Handlers, -but a separate thread per-Consumer_Handler set the following flag in +To get a version that uses single-threading for all Supplier_Proxys, +but a separate thread per-Consumer_Proxy set the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp new file mode 100644 index 00000000000..98722a96295 --- /dev/null +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp @@ -0,0 +1,204 @@ +#include "Thr_Proxy_Handler.h" +// $Id$ + +#include "Proxy_Handler_Connector.h" + +#if defined (ACE_HAS_THREADS) +Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size) +{ +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method marks the Proxy_Handler as having failed and +// deactivates the ACE_Message_Queue (to wake up the thread blocked on +// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will +// eventually try to reconnect... + +int +Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) +{ + this->Consumer_Proxy::handle_input (h); + ACE_Service_Config::reactor ()->remove_handler (h, + ACE_Event_Handler::RWE_MASK + | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + return 0; +} + +// Initialize the threaded Consumer_Proxy object and spawn a new +// thread. + +int +Thr_Consumer_Proxy::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Register ourselves to receive input events (which indicate that + // the Peer has shut down unexpectedly). + if (ACE_Service_Config::reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// ACE_Queue up a message for transmission (must not block since all +// Supplier_Proxys are single-threaded). + +int +Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue. + return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit messages to the peer (note simplification resulting from +// threads...) + +int +Thr_Consumer_Proxy::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + + return 0; +} + +Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size) +{ +} + +int +Thr_Supplier_Proxy::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive messages from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Proxy::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes + // messages for one connection it is OK to block on input and + // output. + + while (this->handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", + this->id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", tv.sec ())); + ACE_OS::sleep (tv); + } + } + return 0; +} + +#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h new file mode 100644 index 00000000000..8ecced3805c --- /dev/null +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Thr_Proxy_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_THR_IO_HANDLER) +#define _THR_IO_HANDLER + +#include "Proxy_Handler.h" + +#if defined (ACE_HAS_THREADS) +class Thr_Consumer_Proxy : public Consumer_Proxy + // = TITLE + // Runs each Output Proxy_Handler in a separate thread. +{ +public: + Thr_Consumer_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the threaded Consumer_Proxy object and spawn a new + // thread. + + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send a message to a peer. + + virtual int svc (void); + // Transmit peer messages. +}; + +class Thr_Supplier_Proxy : public Supplier_Proxy + // = TITLE + // Runs each Input Proxy_Handler in a separate thread. +{ +public: + Thr_Supplier_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the object and spawn a new thread. + + virtual int svc (void); + // Transmit peer messages. +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config index d33469ee157..58884340e61 100644 --- a/apps/Gateway/Gateway/consumer_config +++ b/apps/Gateway/Gateway/consumer_config @@ -1,8 +1,8 @@ -# Consumer map configuration file -# Conn ID Logical ID Payload Destinations -# ------- ---------- ------- ------------ -# 1 1 0 3,4,5 - 1 1 0 3 - 3 1 0 3 -# 4 1 0 4 -# 5 1 0 5 +# Consumer configuration file +# Conn ID Supplier ID Type Consumers +# ------- ----------- ------- ------------ +# 1 1 0 3,4,5 + 1 1 0 3 + 3 1 0 3 +# 4 1 0 4 +# 5 1 0 5 |