diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-15 16:38:54 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-15 16:38:54 +0000 |
commit | d0b22560e2f8ee49cb6b62c3cab0260bd29a939f (patch) | |
tree | 5c7b58240f4d2ab28cec729bb1b7d46a5d01f47f /apps/Gateway | |
parent | 38a7f5bae22d9f3b15804dff59cf26196ccd9838 (diff) | |
download | ATCD-d0b22560e2f8ee49cb6b62c3cab0260bd29a939f.tar.gz |
*** empty log message ***
Diffstat (limited to 'apps/Gateway')
26 files changed, 2100 insertions, 666 deletions
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h new file mode 100644 index 00000000000..e2fbc934c93 --- /dev/null +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -0,0 +1,74 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Concurrency_strategies.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONCURRENCY_STRATEGIES) +#define _CONCURRENCY_STRATEGIES + +#include "ace/Synch.h" + +// The following typedefs are used in order to parameterize the +// synchronization policies without changing the source code! + +// If we don't have threads then use the single-threaded synchronization. +#if !defined (ACE_HAS_THREADS) +#define SYNCH_STRATEGY ACE_NULL_SYNCH +typedef ACE_Null_Mutex MAP_MUTEX; +#else /* ACE_HAS_THREADS */ + +// Note that we only need to make the ACE_Task thread-safe if we are +// using the multi-threaded Thr_Consumer_Handler... +#if defined (USE_OUTPUT_MT) +#define SYNCH_STRATEGY ACE_MT_SYNCH +#else +#define SYNCH_STRATEGY ACE_NULL_SYNCH +#endif /* USE_OUTPUT_MT || USE_INPUT_MT */ + +// Note that we only need to make the ACE_Map_Manager thread-safe if +// we are using the multi-threaded Thr_Supplier_Handler. In this +// case, we use an RW_Mutex since we'll lookup Consumers far more +// often than we'll update them. +#if defined (USE_INPUT_MT) +typedef ACE_RW_Mutex MAP_MUTEX; +#else +typedef ACE_Null_Mutex MAP_MUTEX; +#endif /* USE_INPUT_MT */ +#endif /* ACE_HAS_THREADS */ + +// = Forward decls +class Thr_Consumer_Handler; +class Thr_Supplier_Handler; +class Consumer_Handler; +class Supplier_Handler; + +#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) +#if defined (USE_OUTPUT_MT) +typedef Thr_Consumer_Handler CONSUMER_HANDLER; +#else +typedef Consumer_Handler CONSUMER_HANDLER; +#endif /* USE_OUTPUT_MT */ + +#if defined (USE_INPUT_MT) +typedef Thr_Supplier_Handler SUPPLIER_HANDLER; +#else +typedef Supplier_Handler SUPPLIER_HANDLER; +#endif /* USE_INPUT_MT */ +#else +// Instantiate a non-multi-threaded Gateway. +typedef Supplier_Handler SUPPLIER_HANDLER; +typedef Consumer_Handler CONSUMER_HANDLER; +#endif /* ACE_HAS_THREADS */ + +#endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index f2466544e5b..4c2648addf0 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -8,25 +8,23 @@ typedef FP::Return_Type FP_RETURN_TYPE; FP_RETURN_TYPE -RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry, - int &line_number) +Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, + int &line_number) { FP_RETURN_TYPE read_result; - // increment the line count + + // Increment the line count. line_number++; - // Ignore comments, check for EOF and EOLINE - // if this succeeds, we have our connection id + // Ignore comments, check for EOF and EOLINE if this succeeds, we + // have our connection id. while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS) { if (read_result == FP::EOFILE) return FP::EOFILE; else if (read_result == FP::EOLINE || read_result == FP::COMMENT) - { - // increment the line count - line_number++; - } + line_number++; } // Get the logic id. @@ -51,8 +49,8 @@ RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry, } FP_RETURN_TYPE -CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, - int &line_number) +Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, + int &line_number) { char buf[BUFSIZ]; FP_RETURN_TYPE read_result; @@ -67,10 +65,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, return FP::EOFILE; else if (read_result == FP::EOLINE || read_result == FP::COMMENT) - { - // increment the line count - line_number++; - } + line_number++; } // get the hostname @@ -83,7 +78,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.remote_port_ = (u_short) port; + entry.remote_poconsumer_ = (u_short) port; // Get the direction. if ((read_result = this->getword (buf)) != FP::SUCCESS) @@ -99,7 +94,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.local_port_ = (u_short) port; + entry.local_poconsumer_ = (u_short) port; return FP::SUCCESS; } @@ -113,8 +108,8 @@ int main (int argc, char *argv[]) exit (1); } FP_RETURN_TYPE result; - CC_Config_File_Entry CCentry; - CC_Config_File_Parser CCfile; + Connection_Config_File_Entry CCentry; + Connection_Config_File_Parser CCfile; CCfile.open (argv[1]); @@ -130,13 +125,13 @@ int main (int argc, char *argv[]) cerr << "Error at line " << line_number << endl; else printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n", - CCentry.conn_id_, CCentry.host_, CCentry.remote_port_, CCentry.direction_, - CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_port_); + CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_, + CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_); } CCfile.close(); - RT_Config_File_Entry RTentry; - RT_Config_File_Parser RTfile; + Consumer_Config_File_Entry RTentry; + Consumer_Config_File_Parser RTfile; RTfile.open (argv[2]); @@ -165,6 +160,6 @@ int main (int argc, char *argv[]) #endif /* DEBUGGING */ #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class File_Parser<CC_Config_File_Entry>; -template class File_Parser<RT_Config_File_Entry>; +template class File_Parser<Connection_Config_File_Entry>; +template class File_Parser<Consumer_Config_File_Entry>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 9418815ff09..145c3233bae 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -21,42 +20,42 @@ #include "ace/OS.h" #include "File_Parser.h" -class CC_Config_File_Entry +class Connection_Config_File_Entry // = TITLE - // Stores the information in a Channel Connection entry. + // Stores the IO_Handler entry for connection configuration. { public: int conn_id_; - // Connection id for this Channel. + // Connection id for this IO_Handler. char host_[BUFSIZ]; // Host to connect with. - u_short remote_port_; + u_short remote_poconsumer_; // Port to connect with. char direction_; - // 'I' (input) or 'O' (output) + // 'S' (supplier) or 'C' (consumer). int max_retry_delay_; // Maximum amount of time to wait for reconnecting. - u_short local_port_; + u_short local_poconsumer_; // Our local port number. }; -class CC_Config_File_Parser : public File_Parser<CC_Config_File_Entry> +class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry> // = TITLE - // Parser for the Channel Connection file. + // Parser for the IO_Handler Connection file. { public: virtual FP::Return_Type - read_entry (CC_Config_File_Entry &entry, int &line_number); + read_entry (Connection_Config_File_Entry &entry, int &line_number); }; -class RT_Config_File_Entry +class Consumer_Config_File_Entry // = TITLE - // Stores the information in a Routing Table entry. + // Stores the information in a Consumer Map entry. { public: enum { @@ -79,13 +78,13 @@ public: // Total number of these destinations. }; -class RT_Config_File_Parser : public File_Parser<RT_Config_File_Entry> +class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry> // = TITLE - // Parser for the Routing Table file. + // Parser for the Consumer Map file. { public: virtual FP::Return_Type - read_entry (RT_Config_File_Entry &entry, int &line_number); + read_entry (Consumer_Config_File_Entry &entry, int &line_number); }; #endif /* _CONFIG_FILES */ diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp new file mode 100644 index 00000000000..c3dcd96ebbf --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Entry.cpp @@ -0,0 +1,31 @@ +// Defines an entry in the Consumer Map. +// $Id$ + +#include "Consumer_Entry.h" + +Consumer_Entry::Consumer_Entry (void) +{ + ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET); +} + +Consumer_Entry::~Consumer_Entry (void) +{ + delete this->destinations_; +} + +// Get the associated set of destinations. + +Consumer_Entry::ENTRY_SET * +Consumer_Entry::destinations (void) +{ + return this->destinations_; +} + +// Set the associated set of destinations. + +void +Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s) +{ + this->destinations_ = s; +} + diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h new file mode 100644 index 00000000000..fe502991514 --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Entry.h @@ -0,0 +1,45 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Entry.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_ROUTING_ENTRY) +#define _ROUTING_ENTRY + +#include "ace/Set.h" + +// Forward reference. +class IO_Handler; + +class Consumer_Entry +{ + // = TITLE + // Defines an entry in the Consumer_Map. +public: + Consumer_Entry (void); + ~Consumer_Entry (void); + + typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET; + typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR; + + // = Set/get the associated set of destinations. + ENTRY_SET *destinations (void); + void destinations (ENTRY_SET *); + +protected: + ENTRY_SET *destinations_; + // The set of destinations; +}; + +#endif /* _ROUTING_ENTRY */ diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp new file mode 100644 index 00000000000..6d16601f949 --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Map.cpp @@ -0,0 +1,61 @@ +/* -*- C++ -*- */ +// $Id$ + +#if !defined (_CONSUMER_MAP_C) +#define _CONSUMER_MAP_C + +#include "Consumer_Map.h" + +// Bind the Event_Addr to the INT_ID. + +int +Consumer_Map::bind (Event_Addr event_addr, + Consumer_Entry *Consumer_Entry) +{ + return this->map_.bind (event_addr, Consumer_Entry); +} + +// Find the Consumer_Entry corresponding to the Event_Addr. + +int +Consumer_Map::find (Event_Addr event_addr, + Consumer_Entry *&Consumer_Entry) +{ + return this->map_.find (event_addr, Consumer_Entry); +} + +// Unbind (remove) the Event_Addr from the map. + +int +Consumer_Map::unbind (Event_Addr event_addr) +{ + return this->map_.unbind (event_addr); +} + +Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt) + : map_iter_ (rt.map_) +{ +} + +int +Consumer_Map_Iterator::next (Consumer_Entry *&ss) +{ + // Loop in order to skip over inactive entries if necessary. + + for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0; + this->map_iter_.next (temp) != 0; + this->advance ()) + { + // Otherwise, return the next item. + ss = temp->int_id_; + return 1; + } + return 0; +} + +int +Consumer_Map_Iterator::advance (void) +{ + return this->map_iter_.advance (); +} +#endif /* _CONSUMER_MAP_C */ diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h new file mode 100644 index 00000000000..fd392afaf6e --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Map.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Map.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONSUMER_MAP_H) +#define _CONSUMER_MAP_H + +#include "ace/Map_Manager.h" +#include "Concurrency_Strategies.h" +#include "Event.h" +#include "Consumer_Entry.h" + +class Consumer_Map +{ + // = TITLE + // Define a generic consumer map based on the ACE Map_Manager. + // + // = DESCRIPTION + // This class makes it easier to use the Map_Manager. +public: + int bind (Event_Addr event, Consumer_Entry *Consumer_Entry); + // Associate Event with the Consumer_Entry. + + int find (Event_Addr event, Consumer_Entry *&Consumer_Entry); + // Break any association of EXID. + + int unbind (Event_Addr event); + // Locate EXID and pass out parameter via INID. If found, + // return 0, else -1. + +public: + ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Consumer_Entry *'s + // <internal IDs>. +}; + +class Consumer_Map_Iterator +{ + // = TITLE + // Define an iterator for the Consumer Map. +public: + Consumer_Map_Iterator (Consumer_Map &mm); + int next (Consumer_Entry *&); + int advance (void); + +private: + ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_; + // Map we are iterating over. +}; +#endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h new file mode 100644 index 00000000000..a8a9374be3c --- /dev/null +++ b/apps/Gateway/Gateway/Event.h @@ -0,0 +1,86 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (EVENT) +#define EVENT + +// This is the unique connection identifier that denotes a particular +// IO_Handler in the Gateway. +typedef short CONN_ID; + +class Event_Addr + // = TITLE + // Address used to identify the source/destination of an event. +{ +public: + Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0) + : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {} + + int operator== (const Event_Addr &pa) const + { + return this->conn_id_ == pa.conn_id_ + && this->logical_id_ == pa.logical_id_ + && this->payload_ == pa.payload_; + } + + CONN_ID conn_id_; + // Unique connection identifier that denotes a particular IO_Handler. + + unsigned char logical_id_; + // Logical ID. + + unsigned char payload_; + // Payload type. +}; + + +class Event_Header + // = TITLE + // Fixed sized header. +{ +public: + typedef unsigned short SUPPLIER_ID; + // Type used to route messages from gatewayd. + + enum + { + INVALID_ID = -1 // No peer can validly use this number. + }; + + SUPPLIER_ID routing_id_; + // Source ID. + + size_t len_; + // Length of the message in bytes. +}; + +class Event + // = TITLE + // Variable-sized message (buf_ may be variable-sized between + // 0 and MAX_PAYLOAD_SIZE). +{ +public: + enum { MAX_PAYLOAD_SIZE = 1024 }; + // The maximum size of an Event. + + Event_Header header_; + // Message header. + + char buf_[MAX_PAYLOAD_SIZE]; + // Message payload. +}; + +#endif /* EVENT */ diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h new file mode 100644 index 00000000000..47dd8572012 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -0,0 +1,99 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event_Channel.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_EVENT_CHANNEL) +#define ACE_EVENT_CHANNEL + +#include "IO_Handler_Connector.h" + +template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> +class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler + // = TITLE + // Define a generic Event_Channel. +{ +public: + // = Initialization and termination methods. + ACE_Event_Channel (void); + + int open (int argc, char *argv[]); + // Initialize the Channel. + + int close (void); + // Close down the Channel. + +private: + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. + + int parse_connection_config_file (void); + // Parse the connection configuration file. + + int parse_consumer_config_file (void); + // Parse the consumer map configuration file. + + int initiate_connections (void); + // Initiate connections to the peers. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based performance profiling. + + const char *connection_config_file_; + // Name of the connection configuration file. + + const char *consumer_config_file_; + // Name of the consumer map configuration file. + + int active_connector_role_; + // Enabled if we are playing the role of the active Connector. + + int performance_window_; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics_; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int debug_; + // Are we debugging? + + IO_Handler_Connector *connector_; + // This is used to establish the connections actively. + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). + + // = Make life easier by defining typedefs. + typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP; + typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY; + + CONNECTION_MAP connection_map_; + // Table that maps Connection IDs to IO_Handler *'s. + + Consumer_Map consumer_map_; + // Map that associates event addresses to a set of Consumer_Handler + // *'s. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Event_Channel.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Event_Channel.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_EVENT_CHANNEL */ diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index e36ae71ca94..776d1b2f338 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -38,8 +37,8 @@ public: template <class ENTRY> class File_Parser // = TITLE - // Class used to parse the configuration file for the routing - // table. + // Class used to parse the configuration file for the Consumer + // Map. { public: // = Open and Close the file specified diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index f249eb2f37d..82666406070 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -1,315 +1,87 @@ /* -*- C++ -*- */ // $Id$ - - -#include "ace/Get_Opt.h" #include "ace/Service_Config.h" -#include "Config_Files.h" +#include "Event_Channel.h" #include "Gateway.h" -#include "Channel_Connector.h" -template <class INPUT_CHANNEL, class OUTPUT_CHANNEL> class Gateway : public ACE_Service_Object { public: - Gateway (ACE_Thread_Manager * = 0); + // = Initialization method. + Gateway (void); + // = Service configurator hooks. virtual int init (int argc, char *argv[]); // Perform initialization. virtual int fini (void); // Perform termination. + virtual int info (char **, size_t) const; + // Return info about this service. + protected: int handle_input (ACE_HANDLE); + // Shut down the Gateway when input comes in from the controlling + // console. int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); - - typedef ACE_Map_Manager<CONN_ID, Channel *, MUTEX> CONFIG_TABLE; - typedef ACE_Map_Iterator<CONN_ID, Channel *, MUTEX> CONFIG_ITERATOR; - - CONFIG_TABLE config_table_; - // Table that maps Connection IDs to Channel *'s. - - ROUTING_TABLE routing_table_; - // Table that maps Peer addresses to a set of Channel *'s for output. - - virtual int info (char **, size_t) const; - // Return info about this service. + // Shut down the Gateway when a signal arrives. int parse_args (int argc, char *argv[]); - // Parse gateway configuration arguments obtained from svc.conf file. - - int parse_cc_config_file (void); - // Parse the channel connection configuration file. - - int parse_rt_config_file (void); - // Parse the routing table configuration file. - - int initiate_connections (void); - // Initiate connections to the peers. - - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based performance profiling. - - const char *cc_config_file_; - // Name of the channel connection configuration file. + // Parse gateway configuration arguments obtained from svc.conf + // file. - const char *rt_config_file_; - // Name of the routing table configuration file. - - int performance_window_; - // Number of seconds after connection establishment to report throughput. - - int blocking_semantics_; - // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. - - int debug_; - // Are we debugging? - - Channel_Connector *connector_; - // This is used to establish the connections actively. - - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). - - // = Manage output and input channel threads (if used.) - // if both input and output mt is used, they will share thr_mgr_, - // thr_mgr_ will always reference the thread manager being used - // regardless of whether input, output, or both channels are using mt. - ACE_Thread_Manager *thr_mgr_; - ACE_Thread_Manager *input_thr_mgr_; - ACE_Thread_Manager *output_thr_mgr_; + ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; }; // Convenient shorthands. +// #define IC SUPPLIER_HANDLER +// #define OC CONSUMER_HANDLER -#define IC INPUT_CHANNEL -#define OC OUTPUT_CHANNEL - -template <class IC, class OC> int -Gateway<IC, OC>::handle_signal (int signum, siginfo_t *, ucontext_t *) +int +Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) { if (signum > 0) ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); - if (this->thr_mgr_->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); -#endif /* ACE_HAS_THREADS */ - } + this->event_channel_.close (); // Shut down the main event loop. ACE_Service_Config::end_reactor_event_loop (); return 0; } -template <class IC, class OC> int -Gateway<IC, OC>::handle_input (ACE_HANDLE h) +int +Gateway::handle_input (ACE_HANDLE h) { - if (ACE_Service_Config::reactor ()->remove_handler (0, - ACE_Event_Handler::READ_MASK - | ACE_Event_Handler::DONT_CALL) == -1) + if (ACE_Service_Config::reactor ()->remove_handler + (0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "remove_handler"), -1); + char buf[BUFSIZ]; // Consume the input... ACE_OS::read (h, buf, sizeof (buf)); - return this->handle_signal (h); -} -template <class IC, class OC> int -Gateway<IC, OC>::handle_timeout (const ACE_Time_Value &, const void *) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONFIG_ITERATOR cti (this->config_table_); - - // If we've got a ACE_Thread Manager then use it to suspend all - // the threads. This will enable us to get an accurate count. - - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - if (this->thr_mgr_->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); -#endif /* ACE_HAS_THREADS */ - } - - size_t total_bytes_in = 0; - size_t total_bytes_out = 0; - - // Iterate through the routing table connecting all the channels. - - for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - if (channel->direction () == 'O') - total_bytes_out += channel->total_bytes (); - else - total_bytes_in += channel->total_bytes (); - } - -#if defined (ACE_NLOGGING) - ACE_OS::fprintf (stderr, "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - performance_window_, - total_bytes_in, - total_bytes_out); - - ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))); - - ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); -#else - ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->performance_window_, - total_bytes_in, - total_bytes_out)); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)))); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)))); -#endif /* ACE_NLOGGING */ - // Resume all the threads again. - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - this->thr_mgr_->resume_all (); - ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); -#endif /* ACE_HAS_THREADS */ - } - return 0; + // Shut us down. + return this->handle_signal (h); } // Give default values to data members. -template <class IC, class OC> -Gateway<IC, OC>::Gateway (ACE_Thread_Manager *thr_mgr) - : cc_config_file_ ("cc_config"), - rt_config_file_ ("rt_config"), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - debug_ (0), - connector_ (0), - socket_queue_size_ (0), - thr_mgr_ (thr_mgr), - input_thr_mgr_ (thr_mgr), - output_thr_mgr_ (thr_mgr) -{ -} -// Parse the "command-line" arguments and set the corresponding flags. - -template <class IC, class OC> int -Gateway<IC, OC>::parse_args (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "bc:dr:q:w:", 0); - - for (int c; (c = get_opt ()) != -1; ) - { - switch (c) - { - case 'b': // Use blocking connection establishment. - this->blocking_semantics_ = 0; - break; - case 'c': - this->cc_config_file_ = get_opt.optarg; - break; - case 'd': - this->debug_ = 1; - break; - case 'q': - this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); - break; - case 'r': - this->rt_config_file_ = get_opt.optarg; - break; - case 'w': // Time performance for a designated amount of time. - this->performance_window_ = ACE_OS::atoi (get_opt.optarg); - // Use blocking connection semantics so that we get accurate - // timings (since all connections start at once). - this->blocking_semantics_ = 0; - break; - default: - break; - } - } - return 0; -} - -// Initiate connections with the peers. - -template <class IC, class OC> int -Gateway<IC, OC>::initiate_connections (void) -{ - CONFIG_ITERATOR cti (this->config_table_); - - // Iterate through the routing table connecting all the channels. - - for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - if (this->connector_->initiate_connection - (channel, this->blocking_semantics_ == ACE_NONBLOCK - ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch) == -1) - continue; - } - - return 0; -} - -// This method is automatically called when the gateway -// is shutdown. It gracefully shuts down all the Channels -// in the Channel connection Config_Table. - -template <class IC, class OC> int -Gateway<IC, OC>::fini (void) +Gateway::Gateway (void) { - // Question: do we need to do anything special about the Routing_Table? - - CONFIG_ITERATOR cti (this->config_table_); - - for (ACE_Map_Entry <CONN_ID, Channel *> *me; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", - channel->id ())); - if (channel->state () != Channel::IDLE) - // Mark channel as DISCONNECTING so we don't try to reconnect... - channel->state (Channel::DISCONNECTING); - - // Deallocate Channel resources. - channel->destroy (); // Will trigger a delete. - } - - // Free up the resources allocated dynamically by the ACE_Connector. - delete this->connector_; - delete this->thr_mgr_; - - return 0; } -template <class IC, class OC> int -Gateway<IC, OC>::init (int argc, char *argv[]) +int +Gateway::init (int argc, char *argv[]) { - this->parse_args (argc, argv); - - ACE_NEW_RETURN (this->connector_, Channel_Connector (), -1); - - if (this->connector_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "out of memory"), -1); + if (this->event_channel_.open (argc, argv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); - // Ignore SIPPIPE so each Output_Channel can handle it. + // Ignore SIPPIPE so each Consumer_Handler can handle it. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); ACE_Sig_Set sig_set; @@ -327,46 +99,23 @@ Gateway<IC, OC>::init (int argc, char *argv[]) this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + return 0; +} - if (this->thr_mgr_ == 0) - // Create a thread manager if using some combination of multi-threaded channels. -#if defined (USE_OUTPUT_MT) && defined (USE_INPUT_MT) - this->thr_mgr_ = this->output_thr_mgr_ = - this->input_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#elif defined (USE_OUTPUT_MT) - this->thr_mgr_ = this->output_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#elif defined (USE_INPUT_MT) - this->thr_mgr_ = this->input_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#endif - - // Parse the connection configuration file. - this->parse_cc_config_file (); - - // Parse the routing table config file and build the routing table. - this->parse_rt_config_file (); - - // Initiate connections with the peers. - this->initiate_connections (); - - // If this->performance_window_ > 0 start a timer. - - if (this->performance_window_ > 0) - { - if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, - this->performance_window_) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); - else - ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", - this->performance_window_)); - } +// This method is automatically called when the gateway is shutdown. +// It closes down the Event Channel. +int +Gateway::fini (void) +{ + this->event_channel_.close (); return 0; } // Returns information on the currently active service. -template <class IC, class OC> int -Gateway<IC, OC>::info (char **strp, size_t length) const +int +Gateway::info (char **strp, size_t length) const { char buf[BUFSIZ]; @@ -380,182 +129,7 @@ Gateway<IC, OC>::info (char **strp, size_t length) const return ACE_OS::strlen (buf); } -// Parse and build the connection table. - -template <class IC, class OC> int -Gateway<IC, OC>::parse_cc_config_file (void) -{ - // File that contains the routing table configuration information. - CC_Config_File_Parser cc_file; - CC_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (cc_file.open (this->cc_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->cc_config_file_), -1); - - // Read config file line at a time. - while (cc_file.read_entry (entry, line_number) != FP::EOFILE) - { - file_empty = 0; - - if (this->debug_) - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " - "direction = %c, max retry timeout = %d, local port = %d\n", - entry.conn_id_, entry.host_, entry.remote_port_, entry.direction_, - entry.max_retry_delay_, entry.local_port_)); - - Channel *channel = 0; - - // The next few lines of code are dependent on whether we are making - // an Input_Channel or an Output_Channel. - - if (entry.direction_ == 'O') // Configure an output channel. - ACE_NEW_RETURN (channel, - OUTPUT_CHANNEL (&this->routing_table_, - this->connector_, - this->output_thr_mgr_, - this->socket_queue_size_), - -1); - else /* direction == 'I' */ // Configure an input channel. - ACE_NEW_RETURN (channel, - INPUT_CHANNEL (&this->routing_table_, - this->connector_, - this->input_thr_mgr_, - this->socket_queue_size_), - -1); - if (channel == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) out of memory\n"), -1); - - // The following code is common to both Input_ and Output_Channels. - - // Initialize the routing entry's peer addressing info. - channel->bind (ACE_INET_Addr (entry.remote_port_, entry.host_), - ACE_INET_Addr (entry.local_port_), entry.conn_id_); - - // Initialize max timeout. - channel->max_timeout (entry.max_retry_delay_); - - // Try to bind the new Channel to the connection ID. - switch (this->config_table_.bind (entry.conn_id_, channel)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate connection %d, already bound\n", - entry.conn_id_)); - break; - case 0: - // Success. - break; - } - } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: connection channel configuration file was empty\n")); - - return 0; -} - -template <class IC, class OC> int -Gateway<IC, OC>::parse_rt_config_file (void) -{ - // File that contains the routing table configuration information. - RT_Config_File_Parser rt_file; - RT_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (rt_file.open (this->rt_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->rt_config_file_), -1); - - // Read config file line at a time. - while (rt_file.read_entry (entry, line_number) != FP::EOFILE) - { - file_empty = 0; - - if (this->debug_) - { - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " - "number of destinations = %d\n", - entry.conn_id_, entry.logical_id_, entry.payload_type_, - entry.total_destinations_)); - for (int i = 0; i < entry.total_destinations_; i++) - ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - Routing_Entry *re; - ACE_NEW_RETURN (re, Routing_Entry, -1); - Routing_Entry::ENTRY_SET *channel_set = new Routing_Entry::ENTRY_SET; - Peer_Addr peer_addr (entry.conn_id_, entry.logical_id_, - entry.payload_type_); - - // Add the destinations to the Routing Entry. - for (int i = 0; i < entry.total_destinations_; i++) - { - Channel *channel = 0; - - // Lookup destination and add to Routing_Entry set if found. - if (this->config_table_.find (entry.destinations_[i], - channel) != -1) - channel_set->insert (channel); - else - ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - // Attach set of destination channels to routing entry. - re->destinations (channel_set); - - // Bind with routing table, keyed by peer address. - switch (this->routing_table_.bind (peer_addr, re)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate routing table entry %d, " - "already bound\n", entry.conn_id_)); - break; - case 0: - // Success. - break; - } - } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: routing table configuration file was empty\n")); - - return 0; -} - -#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) -#if defined (USE_OUTPUT_MT) -typedef Thr_Output_Channel OUTPUT_CHANNEL; -#else -typedef Output_Channel OUTPUT_CHANNEL; -#endif /* USE_OUTPUT_MT */ - -#if defined (USE_INPUT_MT) -typedef Thr_Input_Channel INPUT_CHANNEL; -#else -typedef Input_Channel INPUT_CHANNEL; -#endif /* USE_INPUT_MT */ -#else -// Instantiate a non-multi-threaded Gateway. -typedef Input_Channel INPUT_CHANNEL; -typedef Output_Channel OUTPUT_CHANNEL; -#endif /* ACE_HAS_THREADS */ - -typedef Gateway<INPUT_CHANNEL, OUTPUT_CHANNEL> ACE_Gateway; - // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the Gateway. -ACE_SVC_FACTORY_DEFINE (ACE_Gateway) +ACE_SVC_FACTORY_DEFINE (Gateway) diff --git a/apps/Gateway/Gateway/Gateway.h b/apps/Gateway/Gateway/Gateway.h index b00d87628de..057ce981701 100644 --- a/apps/Gateway/Gateway/Gateway.h +++ b/apps/Gateway/Gateway/Gateway.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -20,11 +19,7 @@ #include "ace/OS.h" -ACE_SVC_FACTORY_DECLARE (ACE_Gateway) +ACE_SVC_FACTORY_DECLARE (Gateway) #endif /* ACE_GATEWAY */ - - - - diff --git a/apps/Gateway/Gateway/IO_Handler.cpp b/apps/Gateway/Gateway/IO_Handler.cpp new file mode 100644 index 00000000000..94997955979 --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler.cpp @@ -0,0 +1,710 @@ +// $Id$ + +#include "Consumer_Entry.h" +#include "IO_Handler_Connector.h" + +// Convenient short-hands. +#define CO CONDITION +#define MU MAP_MUTEX + +// The total number of bytes sent/received on this channel. + +size_t +IO_Handler::total_bytes (void) +{ + return this->total_bytes_; +} + +void +IO_Handler::total_bytes (size_t bytes) +{ + this->total_bytes_ += bytes; +} + +IO_Handler::IO_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), + consumer_map_ (consumer_map), + id_ (-1), + total_bytes_ (0), + state_ (IO_Handler::IDLE), + connector_ (ioc), + timeout_ (1), + max_timeout_ (IO_Handler::MAX_RETRY_TIMEOUT), + socket_queue_size_ (socket_queue_size) +{ +} + +// Set the associated channel. + +void +IO_Handler::active (int a) +{ + this->state (a == 0 ? IO_Handler::IDLE : IO_Handler::ESTABLISHED); +} + +// Get the associated channel. + +int +IO_Handler::active (void) +{ + return this->state () == IO_Handler::ESTABLISHED; +} + +// Set the direction. + +void +IO_Handler::direction (char d) +{ + this->direction_ = d; +} + +// Get the direction. + +char +IO_Handler::direction (void) +{ + return this->direction_; +} + +// Sets the timeout delay. + +void +IO_Handler::timeout (int to) +{ + if (to > this->max_timeout_) + to = this->max_timeout_; + + this->timeout_ = to; +} + +// Recalculate the current retry timeout delay using exponential +// backoff. Returns the original timeout (i.e., before the +// recalculation). + +int +IO_Handler::timeout (void) +{ + int old_timeout = this->timeout_; + this->timeout_ *= 2; + + if (this->timeout_ > this->max_timeout_) + this->timeout_ = this->max_timeout_; + + return old_timeout; +} + +// Sets the max timeout delay. + +void +IO_Handler::max_timeout (int mto) +{ + this->max_timeout_ = mto; +} + +// Gets the max timeout delay. + +int +IO_Handler::max_timeout (void) +{ + return this->max_timeout_; +} + +// Restart connection asynchronously when timeout occurs. + +int +IO_Handler::handle_timeout (const ACE_Time_Value &, const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) attempting to reconnect IO_Handler %d with timeout = %d\n", + this->id (), this->timeout_)); + return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); +} + +// Restart connection (blocking_semantics dicates whether we +// restart synchronously or asynchronously). + +int +IO_Handler::reinitiate_connection (void) +{ + // Skip over deactivated descriptors. + if (this->get_handle () != -1) + { + // Make sure to close down peer to reclaim descriptor. + this->peer ().close (); + +#if 0 +// if (this->state () == FAILED) +// { + // Reinitiate timeout to improve reconnection time. +// this->timeout (1); +#endif + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of IO_Handler %d\n", + this->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + +// Handle shutdown of the IO_Handler object. + +int +IO_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down IO_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + return this->reinitiate_connection (); +} + +// Set the state of the channel. + +void +IO_Handler::state (IO_Handler::State s) +{ + this->state_ = s; +} + +// Perform the first-time initiation of a connection to the peer. + +int +IO_Handler::initialize_connection (void) +{ + this->state_ = IO_Handler::ESTABLISHED; + + // Restart the timeout to 1. + this->timeout (1); + +#if defined (ASSIGN_SUPPLIER_ID) + // Action that sends the route id to the peerd. + + CONN_ID id = htons (this->id ()); + + ssize_t n = this->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "gatewayd has closed down unexpectedly" : "send"), + -1); +#endif /* ASSIGN_SUPPLIER_ID */ + return 0; +} + +// Set the size of the socket queue. + +void +IO_Handler::socket_queue_size (void) +{ + if (this->socket_queue_size_ > 0) + { + int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; + + if (this->peer ().set_option (SOL_SOCKET, option, + &this->socket_queue_size_, sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + } +} + +// Upcall from the ACE_Acceptor::handle_input() that +// delegates control to our application-specific IO_Handler. + +int +IO_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) IO_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn on non-blocking I/O. + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Call down to the base class to activate and register this handler. + if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + + return this->initialize_connection (); +} + +// Return the current state of the channel. + +IO_Handler::State +IO_Handler::state (void) +{ + return this->state_; +} + +void +IO_Handler::id (CONN_ID id) +{ + this->id_ = id; +} + +CONN_ID +IO_Handler::id (void) +{ + return this->id_; +} + +// Set the peer's address information. +int +IO_Handler::bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + CONN_ID id) +{ + this->remote_addr_ = remote_addr; + this->local_addr_ = local_addr; + this->id_ = id; + return 0; +} + +ACE_INET_Addr & +IO_Handler::remote_addr (void) +{ + return this->remote_addr_; +} + +ACE_INET_Addr & +IO_Handler::local_addr (void) +{ + return this->local_addr_; +} + +// Constructor sets the consumer map pointer. + +Consumer_Handler::Consumer_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'C'; + this->msg_queue ()->high_water_mark (Consumer_Handler::QUEUE_SIZE); +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method simply marks the IO_Handler as +// having failed so that handle_close () can reconnect. + +int +Consumer_Handler::handle_input (ACE_HANDLE) +{ + char buf[1]; + + this->state (IO_Handler::FAILED); + + switch (this->peer ().recv (buf, sizeof buf)) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer is sending input on Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event MB. If we are unable to +// send the entire event the remainder is re-queued at the *front* of +// the Event_List. + +int +Consumer_Handler::nonblk_put (ACE_Message_Block *mb) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n; + + if ((n = this->send (mb)) == -1) + { + // Things have gone wrong, let's try to close down and set up a new reconnection. + this->state (IO_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) // Didn't manage to send everything. + { + ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Service_Config::reactor ()-> + schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); + return 0; + } + else + return n; +} + +int +Consumer_Handler::send (ACE_Message_Block *mb) +{ + ssize_t n; + size_t len = mb->length (); + + if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + // Re-adjust pointer to skip over the part we did send. + mb->rd_ptr (n); + else /* if (n == length) */ + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + delete mb; + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *mb = 0; + int status = 0; + + ACE_DEBUG ((LM_DEBUG, + "(%t) in handle_output on handle %d\n", + this->get_handle ())); + // The list had better not be empty, otherwise there's a bug! + + if (this->msg_queue ()->dequeue_head + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (mb)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + + case -1: + // Caller is responsible for freeing a ACE_Message_Block if failures occur. + delete mb; + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Event_List. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing deactivated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + + if (ACE_Service_Config::reactor ()-> + cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); + } + } + } + else + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); + return 0; +} + +// Send an event to a peer (may queue if necessary). + +int +Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (mb); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Constructor sets the consumer map pointer and the connector +// pointer. + +Supplier_Handler::Supplier_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : msg_frag_ (0), + IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive a Peer event from peerd. Handles fragmentation. +// +// The routing event returned from recv consists of two parts: +// 1. The Address part, contains the virtual routing id. +// 2. The Data part, which contains the actual data to be routed. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Handler::recv (ACE_Message_Block *&forward_addr) +{ + Event *event; + size_t len; + ssize_t n = 0; + ssize_t m = 0; + size_t offset = 0; + + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + event = (Event *) this->msg_frag_->rd_ptr (); + + const ssize_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + n = this->peer ().recv (this->msg_frag_->wr_ptr (), + header_bytes_left_to_read); + + if (n == -1 /* error */ + || n == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + delete this->msg_frag_; + this->msg_frag_ = 0; + return n; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (n); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + } + + // At this point there is a complete, valid header in msg_frag_ + len = sizeof event->buf_ + HEADER_SIZE - this->msg_frag_->length (); + + // Try to receive the remainder of the event + + switch (m = this->peer ().recv (event->buf_ + offset, len)) + { + case -1: + if (errno == EWOULDBLOCK) + { + // This shouldn't happen since the ACE_Reactor + // just triggered us to handle pending I/O! + ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n")); + errno = EWOULDBLOCK; + return -1; + } + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + delete this->msg_frag_; + this->msg_frag_ = 0; + return 0; + + default: + if (m != len) + // Re-adjust pointer to skip over the part we've read. + { + this->msg_frag_->wr_ptr (m); + errno = EWOULDBLOCK; + return -1; // Inform caller that we didn't get the whole event. + } + else + { + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (m); + + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + ACE_NEW_RETURN (forward_addr, + ACE_Message_Block (sizeof (Event_Addr), + ACE_Message_Block::MB_PROTO, + this->msg_frag_), + -1); + + Event_Addr event_addr (this->id (), event->header_.routing_id_, 0); + // Copy the forwarding address from the Event_Addr into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + this->total_bytes (m + n); +#if defined (VERBOSE) + ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s", + event_addr.conn_id_, event->header_.routing_id_, event->header_.len_, + event->header_.len_, event->buf_)); +#else + ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n", + event->header_.routing_id_, event->header_.len_, this->total_bytes ())); +#endif + return m + n; + } +} + +// Receive various types of input (e.g., Peer event from the +// gatewayd, as well as stdio). + +int +Supplier_Handler::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *forward_addr = 0; + + switch (this->recv (forward_addr)) + { + case 0: + // Note that a peer should never initiate a shutdown. + this->state (IO_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (IO_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input IO_Handler %d\n", + "Peer has failed unexpectedly", + this->id ()), -1); + } + /* NOTREACHED */ + default: + return this->forward (forward_addr); + } +} + +// Route an event to its appropriate destination. + +int +Supplier_Handler::forward (ACE_Message_Block *forward_addr) +{ + // We got a valid event, so determine its virtual routing id, + // which is stored in the first of the two event blocks chained + // together. + + Event_Addr *forwarding_key = (Event_Addr *) forward_addr->rd_ptr (); + + // Skip over the address portion. + const ACE_Message_Block *const data = forward_addr->cont (); + + // RE points to the routing entry located for this routing id. + Consumer_Entry *re = 0; + + if (this->consumer_map_->find (*forwarding_key, re) != -1) + { + // Check to see if there are any destinations. + if (re->destinations ()->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active destinations for this event currently\n")); + + else // There are destinations, so forward the event. + { + Consumer_Entry::ENTRY_SET *esp = re->destinations (); + Consumer_Entry::ENTRY_ITERATOR si (*esp); + + for (IO_Handler **channel = 0; si.next (channel) != 0; si.advance ()) + { + // Only process active channels. + if ((*channel)->active ()) + { + // Clone the event portion (should be doing reference counting here...) + ACE_Message_Block *newmsg = data->clone (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ())); + + if ((*channel)->put (newmsg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n", + "put", (*channel)->id ())); + + // Caller is responsible for freeing a ACE_Message_Block if failures occur. + delete newmsg; + } + } + } + // Will become superfluous once we have reference counting... + delete forward_addr; + return 0; + } + } + delete forward_addr; + // Failure return. + ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", + forwarding_key->conn_id_, forwarding_key->logical_id_, forwarding_key->payload_)); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Addr, Consumer_Entry *>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h new file mode 100644 index 00000000000..c22f1a3df26 --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler.h @@ -0,0 +1,224 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// IO_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER) +#define _IO_HANDLER + +#include "ace/Service_Config.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "Consumer_Map.h" +#include "Consumer_Entry.h" +#include "Event.h" + +// Forward declaration. +class IO_Handler_Connector; + +class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> + // = TITLE + // IO_Handler contains info about connection state and addressing. + // + // = DESCRIPTION + // The IO_Handler classes process events sent from the peers to the + // gateway. These classes works as follows: + // + // 1. IO_Handler_Connector creates a number of connections with the set of + // peers specified in a configuration file. + // + // 2. For each peer that connects successfully, IO_Handler_Connector + // creates an IO_Handler object. Each object assigns a unique routing + // id to its associated peer. The Handlers are used by gatewayd + // that to receive, route, and forward events from source peer(s) + // to destination peer(s). +{ +public: + IO_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int open (void * = 0); + // Initialize and activate a single-threaded IO_Handler (called by + // ACE_Connector::handle_output()). + + int bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + CONN_ID); + // Set the peer's addressing and routing information. + + ACE_INET_Addr &remote_addr (void); + // Returns the peer's routing address. + + ACE_INET_Addr &local_addr (void); + // Returns our local address. + + // = Set/get routing id. + CONN_ID id (void); + void id (CONN_ID); + + // = Set/get the current state of the IO_Handler. + enum State + { + IDLE = 1, // Prior to initialization. + CONNECTING, // During connection establishment. + ESTABLISHED, // IO_Handler is established and active. + DISCONNECTING, // IO_Handler is in the process of connecting. + FAILED // IO_Handler has failed. + }; + + // = Set/get the current state. + State state (void); + void state (State); + + // = Set/get the current retry timeout delay. + int timeout (void); + void timeout (int); + + // = Set/get the maximum retry timeout delay. + int max_timeout (void); + void max_timeout (int); + + // = Set/get IO_Handler activity status. + int active (void); + void active (int); + + // = Set/get direction (necessary for error checking). + char direction (void); + void direction (char); + + // = The total number of bytes sent/received on this channel. + size_t total_bytes (void); + void total_bytes (size_t bytes); + // Increment count by <bytes>. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based IO_Handler reconnection. + +protected: + enum + { + MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. + }; + + int initialize_connection (void); + // Perform the first-time initiation of a connection to the peer. + + int reinitiate_connection (void); + // Reinitiate a connection asynchronously when peers fail. + + void socket_queue_size (void); + // Set the socket queue size. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + // Perform IO_Handler termination. + + Consumer_Map *consumer_map_; + // Pointer to table that maps an event + // to a Set of IO_Handler *'s for output. + + ACE_INET_Addr remote_addr_; + // Address of peer. + + ACE_INET_Addr local_addr_; + // Address of us. + + CONN_ID id_; + // The assigned routing ID of this entry. + + size_t total_bytes_; + // The total number of bytes sent/received on this channel. + + State state_; + // The current state of the channel. + + IO_Handler_Connector *connector_; + // Back pointer to IO_Handler_Connector to reestablish broken + // connections. + + int timeout_; + // Amount of time to wait between reconnection attempts. + + int max_timeout_; + // Maximum amount of time to wait between reconnection attempts. + + char direction_; + // Indicates which direction data flows through the channel ('O' == + // output and 'I' == input). + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class Supplier_Handler : public IO_Handler + // = TITLE + // Handle reception of Peer events arriving as events. +{ +public: + Supplier_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + // Constructor sets the consumer map pointer. + + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + // Receive and process peer events. + +protected: + virtual int recv (ACE_Message_Block *&); + // Receive an event from a Supplier. + + int forward (ACE_Message_Block *event); + // Forward the Event to a Consumer. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragment to handle non-blocking recv's from + // Suppliers. +}; + +class Consumer_Handler : public IO_Handler + // = TITLE + // Handle transmission of events to other Peers using a + // single-threaded approach. +{ +public: + Consumer_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send an event to a Consumer (may be queued if necessary). + +protected: + // = We'll allow up to 16 megabytes to be queued per-output + // channel. + enum {QUEUE_SIZE = 1024 * 1024 * 16}; + + virtual int handle_input (ACE_HANDLE); + // Receive and process shutdowns from a Consumer. + + virtual int handle_output (ACE_HANDLE); + // Finish sending event when flow control conditions abate. + + int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking put(). + + virtual int send (ACE_Message_Block *); + // Send an event to a Consumer. +}; + +#endif /* _IO_HANDLER */ diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp new file mode 100644 index 00000000000..712b348951d --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler_Connector.cpp @@ -0,0 +1,92 @@ +#include "IO_Handler_Connector.h" +// $Id$ + + +IO_Handler_Connector::IO_Handler_Connector (void) +{ +} + +// Override the connection-failure method to add timer support. +// Note that these timers perform "expoential backoff" to +// avoid rapidly trying to reestablish connections when a link +// goes down. + +int +IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) +{ + ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0; + + // Locate the ACE_Svc_Handler corresponding to the socket descriptor. + if (this->handler_map_.find (sd, stp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + sd, "find"), -1); + + IO_Handler *channel = stp->svc_handler (); + + // Schedule a reconnection request at some point in the future + // (note that channel uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, + channel->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + return 0; +} + +// Initiate (or reinitiate) a connection to the IO_Handler. + +int +IO_Handler_Connector::initiate_connection (IO_Handler *channel, + ACE_Synch_Options &synch_options) +{ + char buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators + // will ignore us until we are reconnected. + channel->state (IO_Handler::IDLE); + + if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 + || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "can't obtain peer's address"), -1); + + // Try to connect to the Peer. + + if (this->connect (channel, channel->remote_addr (), + synch_options, channel->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + channel->state (IO_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", + "connect", buf)); + + // Reschedule ourselves to try and connect again. + if (synch_options[ACE_Synch_Options::USE_REACTOR]) + { + if (ACE_Service_Config::reactor ()->schedule_timer + (channel, 0, channel->timeout ()) == 0) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + else + // Failures on synchronous connects are reported as errors + // so that the caller can decide how to proceed. + return -1; + } + else + { + channel->state (IO_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting %s to %s\n", + synch_options[ACE_Synch_Options::USE_REACTOR] + ? "asynchronously" : "synchronously", buf)); + } + } + else + { + channel->state (IO_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", + buf, channel->get_handle ())); + } + return 0; +} diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h new file mode 100644 index 00000000000..585428c88ee --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler_Connector.h @@ -0,0 +1,40 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// IO_Handler_Connector.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER_CONNECTOR) +#define _IO_HANDLER_CONNECTOR + +#include "ace/Connector.h" +#include "Thr_IO_Handler.h" + +class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR> + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new IO_Handler object to do the dirty work... +{ +public: + IO_Handler_Connector (void); + + // Initiate (or reinitiate) a connection on the IO_Handler. + int initiate_connection (IO_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + +protected: + // Override the connection-failure method to add timer support. + virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); +}; + +#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index b2115abbd59..3b54556c4f6 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -9,17 +9,18 @@ #---------------------------------------------------------------------------- BIN = gatewayd -LIB = libGateway.a +#LIB = libGateway.a SHLIB = libGateway.so -FILES = Channel \ - Channel_Connector \ +FILES = Event_Channel \ + IO_Handler \ + IO_Handler_Connector \ Config_Files \ File_Parser \ Gateway \ - Routing_Entry \ - Routing_Table \ - Thr_Channel + Consumer_Entry \ + Consumer_Map \ + Thr_IO_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) @@ -51,7 +52,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # Default behavior is to use single-threading. See the README # file for information on how to configure this with multiple # strategies for threading the input and output channels. -DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT +DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT #---------------------------------------------------------------------------- # Dependencies @@ -60,8 +61,8 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Channel.o .shobj/Channel.so: Channel.cpp Routing_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ +.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -73,7 +74,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - Channel_Connector.h \ + Config_Files.h File_Parser.h IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -87,19 +88,80 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Proactor.h \ $(WRAPPER_ROOT)/ace/Message_Block.h \ $(WRAPPER_ROOT)/ace/Malloc.h \ $(WRAPPER_ROOT)/ace/Malloc_T.h \ $(WRAPPER_ROOT)/ace/Memory_Pool.h \ - $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Connector.i \ + Thr_IO_Handler.h IO_Handler.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ + Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \ + Event_Channel.h +.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ + IO_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -113,21 +175,30 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ + Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Peer_Message.h -.obj/Channel_Connector.o .shobj/Channel_Connector.so: Channel_Connector.cpp Channel_Connector.h \ + Consumer_Map.h Concurrency_Strategies.h Event.h +.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \ + IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -152,20 +223,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -179,20 +242,28 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ + Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Routing_Entry.h Peer_Message.h + Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h .obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -220,7 +291,9 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Record.i \ File_Parser.h .obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ - $(WRAPPER_ROOT)/ace/Get_Opt.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -232,9 +305,6 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Service_Object.h \ - $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -244,20 +314,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -271,23 +333,20 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ - $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ - Config_Files.h File_Parser.h Gateway.h Channel_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Map_Manager.h \ - $(WRAPPER_ROOT)/ace/Svc_Handler.h \ - $(WRAPPER_ROOT)/ace/Synch_Options.h \ - $(WRAPPER_ROOT)/ace/Task.h \ - $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ $(WRAPPER_ROOT)/ace/Message_Queue.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ $(WRAPPER_ROOT)/ace/Strategies.h \ - $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ - $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ - $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Routing_Entry.h Peer_Message.h -.obj/Routing_Entry.o .shobj/Routing_Entry.so: Routing_Entry.cpp Routing_Entry.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + Gateway.h +.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \ $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -300,7 +359,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i -.obj/Routing_Table.o .shobj/Routing_Table.so: Routing_Table.cpp Routing_Table.h \ +.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -312,8 +371,10 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Record.h \ $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i -.obj/Thr_Channel.o .shobj/Thr_Channel.so: Thr_Channel.cpp Thr_Channel.h Channel.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + Concurrency_Strategies.h Event.h Consumer_Entry.h \ + $(WRAPPER_ROOT)/ace/Set.h +.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -337,20 +398,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -364,6 +417,17 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ @@ -371,13 +435,11 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - Routing_Table.h \ + Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Routing_Entry.h Peer_Message.h Channel_Connector.h \ + Concurrency_Strategies.h Event.h Consumer_Entry.h \ + IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README index ceb17528d0d..4e986354aaa 100644 --- a/apps/Gateway/Gateway/README +++ b/apps/Gateway/Gateway/README @@ -1,22 +1,23 @@ -This application illustrates an application-level Gateway which -routes messages between a set of Peers in a distributed environment. +This application illustrates an application-level Gateway which routes +messages between Consumer and Suppliers in a distributed environment. -The default configuration is single-threaded, i.e., all Input_Channels -and Output_Channels are multiplexed via the Reactor on a single thread -of control. To obtain a version that multi-threads both input and -output simply set the following flag in the Makefile: +The default configuration is single-threaded, i.e., all +Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE +Reactor within a single thread of control. To obtain a version that +multi-threads both Consumer_Handlers and Supplier_Handlers simply set +the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT -To get a version that uses single-threading for all Input_Channels, -but a separate thread per-Output_Channel set the following flag in the -Makefile: +To get a version that uses single-threading for all Supplier_Handlers, +but a separate thread per-Consumer_Handler set the following flag in +the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT If you examine the source code, you'll see that very few changes are required in the source code to switch between single-threading and multi-threading. The ACE Task class is primarily responsible for -enabling the flexible modification of concurrency strategies with -little modification to the source code, design, and system +enabling the flexible modification of concurrency strategies with only +minor changes required to the source code, design, and system architecture. diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp new file mode 100644 index 00000000000..109cfad9c3f --- /dev/null +++ b/apps/Gateway/Gateway/Thr_IO_Handler.cpp @@ -0,0 +1,204 @@ +#include "Thr_IO_Handler.h" +// $Id$ + +#include "IO_Handler_Connector.h" + +#if defined (ACE_HAS_THREADS) +Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method marks the IO_Handler as having failed and +// deactivates the ACE_Message_Queue (to wake up the thread blocked on +// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will +// eventually try to reconnect... + +int +Thr_Consumer_Handler::handle_input (ACE_HANDLE h) +{ + this->Consumer_Handler::handle_input (h); + ACE_Service_Config::reactor ()->remove_handler (h, + ACE_Event_Handler::RWE_MASK + | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + return 0; +} + +// Initialize the threaded Consumer_Handler object and spawn a new +// thread. + +int +Thr_Consumer_Handler::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Register ourselves to receive input events (which indicate that + // the Peer has shut down unexpectedly). + if (ACE_Service_Config::reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// ACE_Queue up a message for transmission (must not block since all +// Supplier_Handlers are single-threaded). + +int +Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue. + return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit messages to the peer (note simplification resulting from +// threads...) + +int +Thr_Consumer_Handler::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + + return 0; +} + +Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ +} + +int +Thr_Supplier_Handler::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive messages from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Handler::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes + // messages for one connection it is OK to block on input and + // output. + + while (this->handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", + this->id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", tv.sec ())); + ACE_OS::sleep (tv); + } + } + return 0; +} + +#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h new file mode 100644 index 00000000000..ee056b35361 --- /dev/null +++ b/apps/Gateway/Gateway/Thr_IO_Handler.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Thr_IO_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_THR_IO_HANDLER) +#define _THR_IO_HANDLER + +#include "IO_Handler.h" + +#if defined (ACE_HAS_THREADS) +class Thr_Consumer_Handler : public Consumer_Handler + // = TITLE + // Runs each Output IO_Handler in a separate thread. +{ +public: + Thr_Consumer_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the threaded Consumer_Handler object and spawn a new + // thread. + + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send a message to a peer. + + virtual int svc (void); + // Transmit peer messages. +}; + +class Thr_Supplier_Handler : public Supplier_Handler + // = TITLE + // Runs each Input IO_Handler in a separate thread. +{ +public: + Thr_Supplier_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the object and spawn a new thread. + + virtual int svc (void); + // Transmit peer messages. +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config new file mode 100644 index 00000000000..d33469ee157 --- /dev/null +++ b/apps/Gateway/Gateway/consumer_config @@ -0,0 +1,8 @@ +# Consumer map configuration file +# Conn ID Logical ID Payload Destinations +# ------- ---------- ------- ------------ +# 1 1 0 3,4,5 + 1 1 0 3 + 3 1 0 3 +# 4 1 0 4 +# 5 1 0 5 diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf index 7ae8d4b0080..6c41415a9ce 100644 --- a/apps/Gateway/Gateway/svc.conf +++ b/apps/Gateway/Gateway/svc.conf @@ -1,3 +1,3 @@ #static Svc_Manager "-d -p 2913" -dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c cc_config -f rt_config" +dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c connection_config -f consumer_config" diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp index 15ca0a58807..cfc9a7dad6f 100644 --- a/apps/Gateway/Peer/Gateway_Handler.cpp +++ b/apps/Gateway/Peer/Gateway_Handler.cpp @@ -84,10 +84,10 @@ Gateway_Handler::xmit_stdin (void) ACE_Message_Block *mb; ACE_NEW_RETURN (mb, - ACE_Message_Block (sizeof (Peer_Message)), + ACE_Message_Block (sizeof (Event)), -1); - Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr (); + Event *peer_msg = (Event *) mb->rd_ptr (); peer_msg->header_.routing_id_ = this->routing_id_; n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_); @@ -270,7 +270,7 @@ Gateway_Handler::send_peer (ACE_Message_Block *mb) int Gateway_Handler::recv_peer (ACE_Message_Block *&mb) { - Peer_Message *peer_msg; + Event *peer_msg; size_t len; ssize_t n; size_t offset = 0; @@ -278,14 +278,14 @@ Gateway_Handler::recv_peer (ACE_Message_Block *&mb) if (this->msg_frag_ == 0) { ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Peer_Message)), + ACE_Message_Block (sizeof (Event)), -1); // No existing fragment... if (this->msg_frag_ == 0) ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1); - peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr (); + peer_msg = (Event *) this->msg_frag_->rd_ptr (); switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header))) { @@ -441,7 +441,7 @@ Gateway_Handler::await_messages (void) // We got a valid message, so let's process it now! At the // moment, we just print out the message contents... - Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr (); + Event *peer_msg = (Event *) mb->rd_ptr (); this->total_bytes_ += mb->length (); #if defined (VERBOSE) diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h index 6dc4539e6b7..82477264c4f 100644 --- a/apps/Gateway/Peer/Gateway_Handler.h +++ b/apps/Gateway/Peer/Gateway_Handler.h @@ -32,7 +32,7 @@ #include "ace/SOCK_Acceptor.h" #include "ace/INET_Addr.h" #include "ace/Map_Manager.h" -#include "Peer_Message.h" +#include "Event.h" // Forward declaration. class Gateway_Handler; diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile index f88aaf14926..9909eb2ef2a 100644 --- a/apps/Gateway/Peer/Makefile +++ b/apps/Gateway/Peer/Makefile @@ -14,9 +14,9 @@ FILES = Gateway_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) -VSHOBJS = $(LSRC:%.cpp=$(VSHDIR)%.so) +SHOBJ = $(addsuffix .so,$(FILES)) -LDLIBS = $(VSHOBJS) +LDLIBS = $(addprefix .shobj/,$(SHOBJ)) VLDLIBS = $(LDLIBS:%=%$(VAR)) @@ -111,6 +111,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Peer_Message.h + Event.h # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/apps/Gateway/README b/apps/Gateway/README index 892fe43ba6d..ffd7e52bdf4 100644 --- a/apps/Gateway/README +++ b/apps/Gateway/README @@ -2,8 +2,12 @@ OVERVIEW This directory contains source code for a prototype application-level gateway implemented with ACE. This prototype was developed in my -cs422 grad OS class at Washington University. You can get a paper -that explains the patterns used in this implementation at URL +cs422 OS class at Washington University. It illustrates the use of +Event Channels to forward events from Suppliers to Consumers in a +distributed system. + +You can get a paper that explains the patterns used in this +implementation at the following WWW URL: http://www.cs.wustl.edu/~schmidt/TAPOS-95.ps.gz @@ -15,28 +19,33 @@ Gateway -- The application Gateway, which must be started *after* all the Peers described below). This process reads the - cc_config and rt_config files. The cc_config file tells - the Gateway what connections to establish with which hosts - on which ports, etc. The rt_config file tells the Gateway - how to route data coming from "sources" to the appropriate - "destinations." - + connection_config and consumer_config files: + + 1. The connection_config file is used to establish the "physical + configuration." It tells the Gateway what connections + to establish with particular hosts using particular + ports. + + 2. The consumer_config file is used to establish the "logical + configuration." It tells the Gateway how to forward + data coming from "sources" to the appropriate + "destinations." Peer -- The test driver programs that must be started *before* the - Gateway. To do anything interesting you'll need at - least two Peers: one for supplying events and one for consuming - them. In the configuration files, these two types of Peers - are designated as follows: + Gateway. To do anything interesting you'll need at least + two Peers: one to supply events and one to consume events. + In the configuration files, these two types of Peers are + designated as follows: - (1) Input Peers (designated by an "I" in the Gateway's - cc_config configuration file). These Peers are "sources" - of messages to the Gateway. + 1. Supplier Peers (designated by an 'S' in the Gateway's + connection_config configuration file). These Peers are + "suppliers" of events to the Gateway. - (2) Output Peers (designated by an "O" in the Gateway's - cc_config file). These Peers are "destinations" of - messages routed by the Gateway (routing is based on - the settings in the rt_config configuration file). + 2. Consumer Peers (designated by an 'C' in the Gateway's + connection_config file). These Peers are "consumers" of + events forwarded by the Gateway (forwarding is based on + the settings in the consumer_config configuration file). RUNNING THE TESTS @@ -45,39 +54,39 @@ To run the tests do the following: 1. Compile everything (i.e., first compile the ACE libraries, then compile the the Gateway directories). -2. Edit the rt_config and cc_config files as discussed above. +2. Edit the consumer_config and connection_config files as discussed + above to indicate the desired physical and logical mappings. 3. Start up the Peers (peerd). You can start up as many as you - like, as per the cc_config file, but you'll need at least - two (one for supplying input and one for consuming output). I - typically start up each peer in a different window on a different - machine. The peers should print out some diagnostic info and then - block awaiting connections from the Gateway. - -4. Start up the Gateway (gatewayd). This will print out - a bunch of messages as it reads the config files and connects - to all the Peers. Assuming everything works, then all the - Peers will be connected. If some of the Peers aren't set up - correctly then the Gateway will use an exponential backoff - algorithm to attempt to reestablish those connections. + like, as per the connection_config file, but you'll need at least + two (one to supply and one to consume). I typically start up each + Peer in a different window on a different machine. The Peers + should print out some diagnostic info and then block awaiting + connections from the Gateway. + +4. Start up the Gateway (gatewayd). This will print out a bunch of + events as it reads the config files and connects to all the Peers. + Assuming everything works, then all the Peers will be connected. + If some of the Peers aren't set up correctly then the Gateway will + use an exponential backoff algorithm to attempt to reestablish + those connections. 5. Once the Gateway has connected with all the Peers you can send - messages from Input Peers by typing commands in the Peer window. - This input will be sent to the Gateway, which will forward - the message to all Output Peers that have "subscribed" to receive - these messages. + events from Supplier Peers by typing commands in the Peer window. + This Supplier will be sent to the Gateway, which will forward the + event to all Consumer Peers that have "subscribed" to receive these + events. Note that if you type ^C in a Peer window the Peer will shutdown - its handlers and exit. The Gateway will detect this and will - start trying to reestablish the connection using the same - exponential backoff algorithm it used for the initial connection - establishment. + its handlers and exit. The Gateway will detect this and will start + trying to reestablish the connection using the same exponential + backoff algorithm it used for the initial connection establishment. -7. When you want to terminate a Gateway, just type ^C - and the process will shut down gracefully. +7. When you want to terminate a Gateway, just type ^C and the process + will shut down gracefully. Please let me know if there are any questions. Doug -schmidt@cs.wustl.edu + schmidt@cs.wustl.edu |