diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
commit | b39c5cbab2373421b94c5361eb1524b38fa1eab0 (patch) | |
tree | 91b695852b885a5f44f9be8c3a22bbf7f5a96b8d /apps/Gateway/Gateway | |
parent | 01ab418ddb8fd40e47d3fe9d676076f287f5b5bc (diff) | |
download | ATCD-b39c5cbab2373421b94c5361eb1524b38fa1eab0.tar.gz |
foo
Diffstat (limited to 'apps/Gateway/Gateway')
-rw-r--r-- | apps/Gateway/Gateway/Concurrency_Strategies.h | 12 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Config_Files.cpp | 36 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Config_Files.h | 34 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Consumer_Dispatch_Set.h | 28 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event.h | 38 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 498 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.h | 113 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp | 40 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Forwarding_Discriminator.h | 28 | ||||
-rw-r--r-- | apps/Gateway/Gateway/File_Parser.cpp | 22 | ||||
-rw-r--r-- | apps/Gateway/Gateway/File_Parser.h | 2 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Gateway.cpp | 231 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Makefile | 12 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Proxy_Handler.cpp | 345 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Proxy_Handler.h | 91 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Proxy_Handler_Connector.cpp | 41 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Thr_Proxy_Handler.cpp | 157 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Thr_Proxy_Handler.h | 24 | ||||
-rw-r--r-- | apps/Gateway/Gateway/gatewayd.cpp | 7 |
19 files changed, 949 insertions, 810 deletions
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h index 8d1b2979a49..28e59a4b2e6 100644 --- a/apps/Gateway/Gateway/Concurrency_Strategies.h +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -55,20 +55,20 @@ class Supplier_Proxy; #if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) #if defined (USE_OUTPUT_MT) -typedef Thr_Consumer_Proxy CONSUMER_HANDLER; +typedef Thr_Consumer_Proxy CONSUMER_PROXY; #else -typedef Consumer_Proxy CONSUMER_HANDLER; +typedef Consumer_Proxy CONSUMER_PROXY; #endif /* USE_OUTPUT_MT */ #if defined (USE_INPUT_MT) -typedef Thr_Supplier_Proxy SUPPLIER_HANDLER; +typedef Thr_Supplier_Proxy SUPPLIER_PROXY; #else -typedef Supplier_Proxy SUPPLIER_HANDLER; +typedef Supplier_Proxy SUPPLIER_PROXY; #endif /* USE_INPUT_MT */ #else // Instantiate a non-multi-threaded Gateway. -typedef Supplier_Proxy SUPPLIER_HANDLER; -typedef Consumer_Proxy CONSUMER_HANDLER; +typedef Supplier_Proxy SUPPLIER_PROXY; +typedef Consumer_Proxy CONSUMER_PROXY; #endif /* ACE_HAS_THREADS */ #endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index 7e99902b0db..5b95dc4fbf0 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -27,7 +27,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, line_number++; } - // Get the logic id. + // Get the logical id. if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS) return read_result; @@ -35,12 +35,12 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, if ((read_result = this->getint (entry.type_)) != FP::SUCCESS) return read_result; - // get all the destinations. - entry.total_destinations_ = 0; + // get all the consumers. + entry.total_consumers_ = 0; - while ((read_result = this->getint (entry.destinations_[entry.total_destinations_])) + while ((read_result = this->getint (entry.consumers_[entry.total_consumers_])) == FP::SUCCESS) - ++entry.total_destinations_; // do nothing + ++entry.total_consumers_; // do nothing (should check against max...) if (read_result == FP::EOLINE || read_result == FP::EOFILE) return FP::SUCCESS; @@ -63,8 +63,8 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, { if (read_result == FP::EOFILE) return FP::EOFILE; - else if (read_result == FP::EOLINE || - read_result == FP::COMMENT) + else if (read_result == FP::EOLINE + || read_result == FP::COMMENT) line_number++; } @@ -72,19 +72,19 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, if ((read_result = this->getword (entry.host_)) != FP::SUCCESS) return read_result; - int port; + ACE_INT32 port; // Get the port number. if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.remote_poconsumer_ = (u_short) port; + entry.remote_port_ = (u_short) port; - // Get the direction. + // Get the proxy role. if ((read_result = this->getword (buf)) != FP::SUCCESS) return read_result; else - entry.direction_ = buf[0]; + entry.proxy_role_ = buf[0]; // Get the max retry delay. if ((read_result = this->getint (entry.max_retry_delay_)) != FP::SUCCESS) @@ -94,7 +94,7 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.local_poconsumer_ = (u_short) port; + entry.local_port_ = (u_short) port; return FP::SUCCESS; } @@ -108,7 +108,7 @@ int main (int argc, char *argv[]) exit (1); } FP_RETURN_TYPE result; - Connection_Config_File_Entry CCentry; + Connection_Config_File_Entry entry; Connection_Config_File_Parser CCfile; CCfile.open (argv[1]); @@ -118,15 +118,15 @@ int main (int argc, char *argv[]) printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n"); // Read config file line at a time. - while ((result = CCfile.read_entry (CCentry, line_number)) != EOF) + while ((result = CCfile.read_entry (entry, line_number)) != EOF) { if (result != FP::SUCCESS) // ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number)); cerr << "Error at line " << line_number << endl; else printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n", - CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_, - CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_); + entry.conn_id_, entry.host_, entry.remote_port_, entry.proxy_role_, + entry.max_retry_delay_, entry.transform_, entry.local_port_); } CCfile.close(); @@ -148,8 +148,8 @@ int main (int argc, char *argv[]) { printf ("%d\t%d\t%d\t%d\t", entry.conn_id_, entry.supplier_id_, entry.type_); - while (--entry.total_destinations_ >= 0) - printf ("%d,", entry.destinations_[entry.total_destinations_]); + while (--entry.total_consumers_ >= 0) + printf ("%d,", entry.consumers_[entry.total_consumers_]); printf ("\n"); } } diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 2620301e25b..eae0248eb8c 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -22,25 +22,25 @@ class Connection_Config_File_Entry // = TITLE - // Stores the Proxy_Handler entry for connection configuration. + // Stores connection configuration information. { public: - int conn_id_; + ACE_INT32 conn_id_; // Connection id for this Proxy_Handler. char host_[BUFSIZ]; // Host to connect with. - u_short remote_poconsumer_; + u_short remote_port_; // Port to connect with. - char direction_; + char proxy_role_; // 'S' (supplier) or 'C' (consumer). - int max_retry_delay_; + ACE_INT32 max_retry_delay_; // Maximum amount of time to wait for reconnecting. - u_short local_poconsumer_; + u_short local_port_; // Our local port number. }; @@ -59,23 +59,23 @@ class Consumer_Config_File_Entry { public: enum { - MAX_DESTINATIONS = 1000 // Total number of multicast destinations. + MAX_CONSUMERS = 1000 // Total number of multicast consumers. }; - int conn_id_; - // Connection id for this channel. + ACE_INT32 conn_id_; + // Connection id for this proxy. - int supplier_id_; - // Logical routing id for this channel. + ACE_INT32 supplier_id_; + // Logical supplier id for this proxy. - int type_; - // Type of payload in the message. + ACE_INT32 type_; + // Message type. - int destinations_[MAX_DESTINATIONS]; - // Connection ids for destinations that we're routing to. + ACE_INT32 consumers_[MAX_CONSUMERS]; + // Connection ids for consumers that we're routing to. - int total_destinations_; - // Total number of these destinations. + int total_consumers_; + // Total number of these consumers. }; class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry> diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h new file mode 100644 index 00000000000..71e2046b56e --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h @@ -0,0 +1,28 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Dispatch_Set.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_DISPATCH_SET) +#define _DISPATCH_SET + +#include "ace/Set.h" + +// Forward reference. +class Proxy_Handler; + +typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator; + +#endif /* _DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index 24881c3e85b..5e288edf910 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -23,7 +23,7 @@ // Proxy_Handler in the Gateway. typedef ACE_INT32 ACE_INT32; -class Event_Addr +class Event_Key // = TITLE // Address used to identify the source/destination of an event. // @@ -33,14 +33,14 @@ class Event_Addr // Channel from the format of the data. { public: - Event_Addr (ACE_INT32 cid = -1, + Event_Key (ACE_INT32 cid = -1, u_char sid = 0, u_char type = 0) : conn_id_ (cid), supplier_id_ (sid), type_ (type) {} - int operator== (const Event_Addr &event_addr) const + int operator== (const Event_Key &event_addr) const { return this->conn_id_ == event_addr.conn_id_ && this->supplier_id_ == event_addr.supplier_id_ @@ -58,10 +58,13 @@ public: // Event type. }; - class Event_Header // = TITLE - // Fixed sized header. + // Fixed sized header. + // + // = DESCRIPTION + // This is designed to have a sizeof (16) to avoid alignment + // problems on most platforms. { public: typedef ACE_INT32 SUPPLIER_ID; @@ -72,14 +75,35 @@ public: INVALID_ID = -1 // No peer can validly use this number. }; + void decode (void) + { + this->len_ = ntohl (this->len_); + this->supplier_id_ = ntohl (this->supplier_id_); + this->type_ = ntohl (this->type_); + this->priority_ = ntohl (this->priority_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->len_ = htonl (this->len_); + this->supplier_id_ = htonl (this->supplier_id_); + this->type_ = htonl (this->type_); + this->priority_ = htonl (this->priority_); + } + // Encode from host byte order to network byte order. + + size_t len_; + // Length of the data_ payload, in bytes. + SUPPLIER_ID supplier_id_; // Source ID. ACE_INT32 type_; // Event type. - size_t len_; - // Length of the entire event (including data payload) in bytes. + ACE_INT32 priority_; + // Event priority. }; class Event diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index d146ddfb362..02f2cd465f8 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -2,38 +2,35 @@ // $Id$ #define ACE_BUILD_SVC_DLL -#include "ace/Get_Opt.h" -#include "Config_Files.h" #include "Proxy_Handler_Connector.h" #include "Event_Channel.h" -#if !defined (ACE_EVENT_CHANNEL_C) -#define ACE_EVENT_CHANNEL_C +ACE_Event_Channel_Options::ACE_Event_Channel_Options (void) + : performance_window_ (0), + blocking_semantics_ (ACE_NONBLOCK), + socket_queue_size_ (0) +{ +} -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::~ACE_Event_Channel (void) +ACE_Event_Channel::~ACE_Event_Channel (void) { } -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::ACE_Event_Channel (void) - : connection_config_file_ ("connection_config"), - consumer_config_file_ ("consumer_config"), - active_connector_role_ (1), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - debug_ (0), - connector_ (0), - socket_queue_size_ (0) +ACE_Event_Channel::ACE_Event_Channel (void) +{ +} + +ACE_Event_Channel_Options & +ACE_Event_Channel::options (void) { + return this->options_; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, - const void *) +ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // If we've got a ACE_Thread Manager then use it to suspend all the // threads. This will enable us to get an accurate count. @@ -47,17 +44,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the Proxy_Handlers. + // Iterate through the connection map summing up the number of bytes + // sent/received. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (proxy_handler->direction () == 'C') + if (proxy_handler->proxy_role () == 'C') total_bytes_out += proxy_handler->total_bytes (); - else // proxy_handler->direction () == 'S' + else // proxy_handler->proxy_role () == 'S' total_bytes_in += proxy_handler->total_bytes (); } @@ -74,13 +72,13 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); #else ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->performance_window_, - total_bytes_in, + this->options ().performance_window_, + total_bytes_in, total_bytes_out)); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); #endif /* ACE_NLOGGING */ #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) @@ -95,31 +93,177 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, return 0; } +ACE_Event_Channel::put (ACE_Message_Block *forward_addr, + ACE_Time_Value *) +{ + // We got a valid event, so determine its virtual forwarding + // address, which is stored in the first of the two event blocks, + // which are chained together by this->recv(). + + Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr (); + + // Skip over the address portion and get the data. + ACE_Message_Block *data = forward_addr->cont (); + + // <dispatch_set> points to the set of Consumers associated with + // this forwarding address. + Consumer_Dispatch_Set *dispatch_set = 0; + + if (this->efd_.find (*forwarding_addr, dispatch_set) == -1) + // Failure. + ACE_ERROR ((LM_DEBUG, + "(%t) find failed on conn id = %d, logical id = %d, type = %d\n", + forwarding_addr->conn_id_, + forwarding_addr->supplier_id_, + forwarding_addr->type_)); + else + { + // Check to see if there are any consumers. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active consumers for this event currently\n")); + + else // There are consumers, so forward the event. + { + Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); + + // At this point, we should assign a thread-safe locking + // strategy to the Message_Block is we're running in a + // multi-threaded configuration. + // data->locking_strategy (MB_Locking_Strategy::instance ()); + + for (Proxy_Handler **proxy_handler = 0; + dsi.next (proxy_handler) != 0; + dsi.advance ()) + { + // Only process active proxy_handlers. + if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + { + // Duplicate the event portion via reference + // counting. + ACE_Message_Block *dup_msg = data->duplicate (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", + (*proxy_handler)->id ())); + + if ((*proxy_handler)->put (dup_msg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", + "put", (*proxy_handler)->id ())); + + // We are responsible for releasing an + // ACE_Message_Block if failures occur. + dup_msg->release (); + } + } + } + } + } + + // Release the memory in the message block. + forward_addr->release (); + return 0; +} + +ACE_Event_Channel::svc (void) +{ + return 0; +} + +int +ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler, + ACE_Synch_Options &synch_options) +{ + return this->connector_.initiate_connection (proxy_handler, + synch_options); +} + +int +ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) +{ + int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; + int socket_queue_size = this->options ().socket_queue_size_; + + if (proxy_handler->peer ().set_option (SOL_SOCKET, + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + + proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ()); + + // Our state is now "established." + proxy_handler->state (Proxy_Handler::ESTABLISHED); + + // Restart the timeout to 1. + proxy_handler->timeout (1); + + ACE_INT32 id = htonl (proxy_handler->id ()); + + // Send the connection id to the peerd. + + ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); +} + +// Restart connection (blocking_semantics dicates whether we restart +// synchronously or asynchronously). + +int +ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) +{ + // Skip over deactivated descriptors. + if (proxy_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + proxy_handler->peer ().close (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Proxy_Handler %d\n", + proxy_handler->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (proxy_handler, 0, proxy_handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + // Initiate connections with the Consumer and Supplier Peers. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::initiate_connections (void) +ACE_Event_Channel::initiate_connections (void) { - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); ACE_Synch_Options synch_options; - if (this->blocking_semantics_ == ACE_NONBLOCK) + if (this->options ().blocking_semantics_ == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the Proxy_Handlers. + // Iterate through the Consumer Map connecting all the + // Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (this->connector_->initiate_connection + if (this->initiate_proxy_connection (proxy_handler, synch_options) == -1) - continue; + continue; // Failures are handled elsewhere... } return 0; @@ -128,8 +272,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) // This method gracefully shuts down all the Handlers in the // Proxy_Handler Connection Map. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::close (void) +ACE_Event_Channel::close (u_long) { #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); @@ -147,7 +290,7 @@ ACE_Event_Channel<SH, CH>::close (void) { Proxy_Handler *proxy_handler = me->int_id_; - ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n", proxy_handler->id ())); if (proxy_handler->state () != Proxy_Handler::IDLE) @@ -159,247 +302,76 @@ ACE_Event_Channel<SH, CH>::close (void) proxy_handler->destroy (); // Will trigger a delete. } - // Free up the resources allocated dynamically by the ACE_Connector. - delete this->connector_; return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) +int +ACE_Event_Channel::find_proxy (ACE_INT32 conn_id, + Proxy_Handler *&proxy_handler) { - this->parse_args (argc, argv); - - ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); - - // Ignore SIPPIPE so each Consumer_Proxy can handle it. - ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - - if (this->active_connector_role_) - { - // Parse the connection configuration file. - this->parse_connection_config_file (); - - // Parse the consumer map config file and build the consumer map. - this->parse_consumer_config_file (); - - // Initiate connections with the peers. - this->initiate_connections (); - } - - // If this->performance_window_ > 0 start a timer. - - if (this->performance_window_ > 0) - { - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->performance_window_) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); - else - ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", - this->performance_window_)); - } - - return 0; + return this->connection_map_.find (conn_id, proxy_handler); } -// Parse and build the connection table. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) +int +ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) { - // File that contains the consumer map configuration information. - Connection_Config_File_Parser connection_file; - Connection_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (connection_file.open (this->connection_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1); - - // Read config file one line at a time. - while (connection_file.read_entry (entry, line_number) != FP::EOFILE) + switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler)) { - file_empty = 0; - - if (this->debug_) - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " - "direction = %c, max retry timeout = %d, local port = %d\n", - entry.conn_id_, - entry.host_, - entry.remote_poconsumer_, - entry.direction_, - entry.max_retry_delay_, - entry.local_poconsumer_)); - - Proxy_Handler *proxy_handler = 0; - - // The next few lines of code are dependent on whether we are - // making an Supplier_Proxy or an Consumer_Proxy. - - if (entry.direction_ == 'C') // Configure a Consumer_Proxy. - ACE_NEW_RETURN (proxy_handler, - CONSUMER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - else /* direction == 'S' */ // Configure a Supplier_Proxy. - ACE_NEW_RETURN (proxy_handler, - SUPPLIER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - - // The following code is common to both Supplier_Proxys_ and - // Consumer_Proxys. - - // Initialize the routing entry's peer addressing info. - proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), - ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); - - // Initialize max timeout. - proxy_handler->max_timeout (entry.max_retry_delay_); - - // Try to bind the new Proxy_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, - "(%t) duplicate connection %d, already bound\n", - entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) bind failed for connection %d\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) duplicate connection %d, already bound\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: connection proxy_handler configuration file was empty\n")); - return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) +int +ACE_Event_Channel::subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds) { - // File that contains the consumer map configuration information. - Consumer_Config_File_Parser consumer_file; - Consumer_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (consumer_file.open (this->consumer_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1); - - // Read config file line at a time. - while (consumer_file.read_entry (entry, line_number) != FP::EOFILE) + // Bind with consumer map, keyed by peer address. + switch (this->efd_.bind (event_addr, cds)) { - file_empty = 0; - - if (this->debug_) - { - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " - "number of destinations = %d\n", - entry.conn_id_, - entry.supplier_id_, - entry.type_, - entry.total_destinations_)); - for (int i = 0; i < entry.total_destinations_; i++) - ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - Dispatch_Set *dispatch_set; - ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); - - Event_Addr event_addr (entry.conn_id_, - entry.supplier_id_, - entry.type_); - - // Add the destinations to the Routing Entry. - for (int i = 0; i < entry.total_destinations_; i++) - { - Proxy_Handler *proxy_handler = 0; - - // Lookup destination and add to Dispatch_Set set if found. - if (this->connection_map_.find (entry.destinations_[i], - proxy_handler) != -1) - dispatch_set->insert (proxy_handler); - else - ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - // Bind with consumer map, keyed by peer address. - switch (this->efd_.bind (event_addr, dispatch_set)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " - "already bound\n", entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", + event_addr.conn_id_), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " + "already bound\n", event_addr.conn_id_), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: consumer map configuration file was empty\n")); - return 0; } -// Parse the "command-line" arguments and set the corresponding flags. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_args (int argc, char *argv[]) +ACE_Event_Channel::open (void *) { - ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0); + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - for (int c; (c = get_opt ()) != -1; ) +#if 0 + // If this->performance_window_ > 0 start a timer. + + if (this->options ().performance_window_ > 0) { - switch (c) - { - case 'b': // Use blocking connection establishment. - this->blocking_semantics_ = 0; - break; - case 'c': - this->connection_config_file_ = get_opt.optarg; - break; - case 'd': - this->debug_ = 1; - break; - case 'p': - // We are not playing the active Connector role. - this->active_connector_role_ = 0; - break; - case 'q': - this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); - break; - case 'r': - this->consumer_config_file_ = get_opt.optarg; - break; - case 'w': // Time performance for a designated amount of time. - this->performance_window_ = ACE_OS::atoi (get_opt.optarg); - // Use blocking connection semantics so that we get accurate - // timings (since all connections start at once). - this->blocking_semantics_ = 0; - break; - default: - break; - } + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->options ().performance_window_) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); + else + ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", + this->options ().performance_window_))); } +#endif + return 0; } - -#endif /* ACE_EVENT_CHANNEL_C */ diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 4e7afc5d328..1ecf468addf 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -19,64 +19,95 @@ #include "Proxy_Handler_Connector.h" -template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> -class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler +class ACE_Svc_Export ACE_Event_Channel_Options + // = TITLE + // Maintains the options for an <ACE_Event_Channel>. +{ +public: + ACE_Event_Channel_Options (void); + // Initialization. + + int performance_window_; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics_; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<SYNCH_STRATEGY> // = TITLE // Define a generic Event_Channel. + // + // = DESCRIPTION { public: // = Initialization and termination methods. ACE_Event_Channel (void); ~ACE_Event_Channel (void); - int open (int argc, char *argv[]); - // Initialize the Channel. + virtual int open (void * = 0); + // Open the channel. - int close (void); + virtual int close (u_long = 0); // Close down the Channel. -private: - int parse_args (int argc, char *argv[]); - // Parse the command-line arguments. + // = Proxy management methods. + int initiate_proxy_connection (Proxy_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + // Initiate the connection of the <Proxy_Handler> to its peer. - int parse_connection_config_file (void); - // Parse the connection configuration file. + int complete_proxy_connection (Proxy_Handler *); + // Complete the initialization of the <Proxy_Handler> once it's + // connected to its Peer. - int parse_consumer_config_file (void); - // Parse the consumer map configuration file. + int reinitiate_proxy_connection (Proxy_Handler *); + // Reinitiate a connection asynchronously when the Peer fails. - int initiate_connections (void); - // Initiate connections to the peers. + int bind_proxy (Proxy_Handler *); + // Bind the <Proxy_Handler> to the <connection_map_>. - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based performance profiling. + int find_proxy (ACE_INT32 conn_id, Proxy_Handler *&); + // Locate the <Proxy_Handler> with <conn_id>. - const char *connection_config_file_; - // Name of the connection configuration file. + int subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds); + // Subscribe the <Consumer_Dispatch_Set> to receive events that + // match <Event_Key>. - const char *consumer_config_file_; - // Name of the consumer map configuration file. + // = Event forwarding method. + virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0); + // Pass <mb> to the Event Channel so it can forward it to Consumers. - int active_connector_role_; - // Enabled if we are playing the role of the active Connector. + ACE_Event_Channel_Options &options (void); + // Points to the Event_Channel options. - int performance_window_; - // Number of seconds after connection establishment to report - // throughput. - - int blocking_semantics_; - // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + int initiate_connections (void); + // Initiate connections to the peers. + +private: + virtual int svc (void); + // Run as an active object. - int debug_; - // Are we debugging? + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. - Proxy_Handler_Connector *connector_; - // This is used to establish the connections actively. + virtual int handle_timeout (const ACE_Time_Value &, + const void *arg); + // Perform timer-based performance profiling. - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). + Proxy_Handler_Connector connector_; + // Used to establish the connections actively. + + // Proxy_Handler_Acceptor acceptor_; + // Used to establish the connections passively. // = Make life easier by defining typedefs. + // Note that Proxy_Handler is assumed to the base class of + // SUPPLIER_PROXY and CONSUMER_PROXY. typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP; typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY; @@ -85,16 +116,10 @@ private: // Table that maps Connection IDs to Proxy_Handler *'s. Event_Forwarding_Discriminator efd_; - // Map that associates event addresses to a set of Consumer_Proxy - // *'s. -}; + // Map that associates an event to a set of Consumer_Proxy *'s. -#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) -#include "Event_Channel.cpp" -#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ - -#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) -#pragma implementation ("Event_Channel.cpp") -#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + ACE_Event_Channel_Options options_; + // The options for the channel. +}; #endif /* ACE_EVENT_CHANNEL */ diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp index 8261ea13eb2..4dfbb658c1f 100644 --- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp @@ -6,51 +6,49 @@ #include "Event_Forwarding_Discriminator.h" -// Bind the Event_Addr to the INT_ID. +// Bind the Event_Key to the INT_ID. int -Event_Forwarding_Discriminator::bind (Event_Addr event_addr, - Dispatch_Set *Dispatch_Set) +Event_Forwarding_Discriminator::bind (Event_Key event_addr, + Consumer_Dispatch_Set *cds) { - return this->map_.bind (event_addr, Dispatch_Set); + return this->map_.bind (event_addr, cds); } -// Find the Dispatch_Set corresponding to the Event_Addr. +// Find the Consumer_Dispatch_Set corresponding to the Event_Key. int -Event_Forwarding_Discriminator::find (Event_Addr event_addr, - Dispatch_Set *&Dispatch_Set) +Event_Forwarding_Discriminator::find (Event_Key event_addr, + Consumer_Dispatch_Set *&cds) { - return this->map_.find (event_addr, Dispatch_Set); + return this->map_.find (event_addr, cds); } -// Unbind (remove) the Event_Addr from the map. +// Unbind (remove) the Event_Key from the map. int -Event_Forwarding_Discriminator::unbind (Event_Addr event_addr) +Event_Forwarding_Discriminator::unbind (Event_Key event_addr) { return this->map_.unbind (event_addr); } -Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt) - : map_iter_ (rt.map_) +Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator + (Event_Forwarding_Discriminator &rt) + : map_iter_ (rt.map_) { } int -Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss) +Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds) { - // Loop in order to skip over inactive entries if necessary. - - for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0; - this->map_iter_.next (temp) != 0; - this->advance ()) + ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp; + if (this->map_iter_.next (temp) == 0) + return 0; + else { - // Otherwise, return the next item. - ss = temp->int_id_; + cds = temp->int_id_; return 1; } - return 0; } int diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h index 35a594b61b5..9b7531c1f46 100644 --- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h @@ -20,29 +20,27 @@ #include "ace/Map_Manager.h" #include "Concurrency_Strategies.h" #include "Event.h" -#include "Dispatch_Set.h" +#include "Consumer_Dispatch_Set.h" class Event_Forwarding_Discriminator { // = TITLE - // Define a generic consumer map based on the ACE Map_Manager. - // - // = DESCRIPTION - // This class makes it easier to use the Map_Manager. + // Map events to the set of Consumer_Proxies that have subscribed + // to receive the event. public: - int bind (Event_Addr event, Dispatch_Set *Dispatch_Set); - // Associate Event with the Dispatch_Set. + int bind (Event_Key event, Consumer_Dispatch_Set *cds); + // Associate Event with the Consumer_Dispatch_Set. - int find (Event_Addr event, Dispatch_Set *&Dispatch_Set); - // Break any association of EXID. - - int unbind (Event_Addr event); + int unbind (Event_Key event); // Locate EXID and pass out parameter via INID. If found, // return 0, else -1. + int find (Event_Key event, Consumer_Dispatch_Set *&cds); + // Break any association of EXID. + public: - ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_; - // Map that associates Event Addrs (external ids) with Dispatch_Set *'s + ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Consumer_Dispatch_Set *'s // <internal IDs>. }; @@ -52,11 +50,11 @@ class Event_Forwarding_Discriminator_Iterator // Define an iterator for the Consumer Map. public: Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm); - int next (Dispatch_Set *&); + int next (Consumer_Dispatch_Set *&); int advance (void); private: - ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_; + ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_iter_; // Map we are iterating over. }; #endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp index be33e9a96d2..07bda87180b 100644 --- a/apps/Gateway/Gateway/File_Parser.cpp +++ b/apps/Gateway/Gateway/File_Parser.cpp @@ -15,7 +15,11 @@ typedef FP::Return_Type FP_RETURN_TYPE; template <class ENTRY> int File_Parser<ENTRY>::open (const char filename[]) { - return (this->infile_ = ACE_OS::fopen (filename, "r")) == 0 ? -1 : 0; + this->infile_ = ACE_OS::fopen (filename, "r"); + if (this->infile_ == 0) + return -1; + else + return 0; } template <class ENTRY> int @@ -27,17 +31,13 @@ File_Parser<ENTRY>::close (void) template <class ENTRY> FP_RETURN_TYPE File_Parser<ENTRY>::getword (char buf[]) { - FP_RETURN_TYPE read_result = this->readword(buf); - if (read_result == FP::SUCCESS) - return FP::SUCCESS; - else - return read_result; + return this->readword (buf); } // Get the next string from the file via this->readword() // Check make sure the string forms a valid number. template <class ENTRY> FP_RETURN_TYPE -File_Parser<ENTRY>::getint (int &value) +File_Parser<ENTRY>::getint (ACE_INT32 &value) { char buf[BUFSIZ]; FP_RETURN_TYPE read_result = this->readword(buf); @@ -50,7 +50,7 @@ File_Parser<ENTRY>::getint (int &value) value = ACE_OS::strtol (buf, &ptr, 10); // check if the buf is a decimal or not - if ((value == 0) && (ptr == buf)) + if (value == 0 && ptr == buf) return FP::ERROR; else return FP::SUCCESS; @@ -81,8 +81,8 @@ File_Parser<ENTRY>::readword (char buf[]) buf[wordlength] = '\0'; if (c == EOF) { - // If the EOF is just a dilimeter, don't return EOF so that the - // word gets processed + // If EOF is just a delimiter, don't return EOF so that the word + // gets processed. if (wordlength > 0) { ungetc (c, this->infile_); @@ -94,7 +94,7 @@ File_Parser<ENTRY>::readword (char buf[]) } else if (c == '\n') { - // if the EOLINE is just a dilimeter, don't return EOLINE + // if the EOLINE is just a delimiter, don't return EOLINE // so that the word gets processed if (wordlength > 0) ungetc (c, this->infile_); diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index 80b768aff84..f1de7429db0 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -52,7 +52,7 @@ protected: FP::Return_Type getword (char buf[]); // Read the next ASCII word. - FP::Return_Type getint (int &value); + FP::Return_Type getint (ACE_INT32 &value); // Read the next integer. FP::Return_Type readword (char buf[]); diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 2c963ff3d7f..4ff09aed1b7 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -1,6 +1,8 @@ /* -*- C++ -*- */ // $Id$ +#include "ace/Get_Opt.h" +#include "Config_Files.h" #include "ace/Service_Config.h" #include "Event_Channel.h" #include "Gateway.h" @@ -24,6 +26,12 @@ public: virtual int info (char **, size_t) const; // Return info about this service. + int parse_connection_config_file (void); + // Parse the connection configuration file. + + int parse_consumer_config_file (void); + // Parse the consumer configuration file. + protected: int handle_input (ACE_HANDLE); // Shut down the Gateway when input comes in from the controlling @@ -36,13 +44,21 @@ protected: // Parse gateway configuration arguments obtained from svc.conf // file. - ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; + char connection_config_file_[MAXPATHLEN + 1]; + // Name of the connection configuration file. + + char consumer_config_file_[MAXPATHLEN + 1]; + // Name of the consumer map configuration file. + + ACE_Event_Channel event_channel_; // The Event Channel routes events from Supplier(s) to Consumer(s). -}; -// Convenient shorthands. -// #define IC SUPPLIER_HANDLER -// #define OC CONSUMER_HANDLER + int active_connector_role_; + // Enabled if we are playing the role of the active Connector. + + int debug_; + // Are we debugging? +}; int Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) @@ -70,18 +86,76 @@ Gateway::handle_input (ACE_HANDLE h) return this->handle_signal (h); } +// Parse the "command-line" arguments and set the corresponding flags. + +int +Gateway::parse_args (int argc, char *argv[]) +{ + ACE_OS::strcpy (this->connection_config_file_, "connection_config"); + ACE_OS::strcpy (this->consumer_config_file_, "consumer_config"); + this->active_connector_role_ = 1; + this->debug_ = 0; + + ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0); + + for (int c; (c = get_opt ()) != -1; ) + { + switch (c) + { + case 'b': // Use blocking connection establishment. + this->event_channel_.options ().blocking_semantics_ = 0; + break; + case 'c': + ACE_OS::strncpy (this->connection_config_file_, + get_opt.optarg, + sizeof this->connection_config_file_); + break; + case 'd': + this->debug_ = 1; + break; + case 'p': + // We are not playing the active Connector role. + this->active_connector_role_ = 0; + break; + case 'q': + this->event_channel_.options ().socket_queue_size_ = + ACE_OS::atoi (get_opt.optarg); + break; + case 'r': + ACE_OS::strncpy (this->consumer_config_file_, + get_opt.optarg, + sizeof this->consumer_config_file_); + break; + case 'w': // Time performance for a designated amount of time. + this->event_channel_.options ().performance_window_ = + ACE_OS::atoi (get_opt.optarg); + // Use blocking connection semantics so that we get accurate + // timings (since all connections start at once). + this->event_channel_.options ().blocking_semantics_ = 0; + break; + default: + break; + } + } + return 0; +} + int Gateway::init (int argc, char *argv[]) { - if (this->event_channel_.open (argc, argv) == -1) + // Initialize the Event_Channel. + if (this->event_channel_.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); + // Parse the "command-line" arguments. + this->parse_args (argc, argv); + ACE_Sig_Set sig_set; sig_set.sig_add (SIGINT); sig_set.sig_add (SIGQUIT); - // Register ourselves to receive SIGINT and SIGQUIT - // so we can shut down gracefully via signals. + // Register ourselves to receive SIGINT and SIGQUIT so we can shut + // down gracefully via signals. if (ACE_Service_Config::reactor ()->register_handler (sig_set, this) == -1) @@ -90,6 +164,20 @@ Gateway::init (int argc, char *argv[]) if (ACE_Service_Config::reactor ()->register_handler (0, this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->active_connector_role_) + { + // Parse the connection configuration file. + this->parse_connection_config_file (); + + // Parse the consumer map config file and build the consumer + // map. + this->parse_consumer_config_file (); + + // Initiate connections with the peers. + this->event_channel_.initiate_connections (); + } + return 0; } @@ -120,6 +208,133 @@ Gateway::info (char **strp, size_t length) const return ACE_OS::strlen (buf); } +// Parse and build the connection table. + +int +Gateway::parse_connection_config_file (void) +{ + // File that contains the consumer map configuration information. + Connection_Config_File_Parser connection_file; + Connection_Config_File_Entry entry; + int file_empty = 1; + int line_number = 0; + + if (connection_file.open (this->connection_config_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1); + + // Read config file one line at a time. + while (connection_file.read_entry (entry, line_number) != FP::EOFILE) + { + file_empty = 0; + + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " + "proxy role = %c, max retry timeout = %d, local port = %d\n", + entry.conn_id_, + entry.host_, + entry.remote_port_, + entry.proxy_role_, + entry.max_retry_delay_, + entry.local_port_)); + + Proxy_Handler *proxy_handler = 0; + + // Initialize the entry's peer addressing info. + + ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_); + ACE_INET_Addr local_addr (entry.local_port_); + + // The next few lines of code are dependent on whether we are + // making an Supplier_Proxy or an Consumer_Proxy. + + if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy. + ACE_NEW_RETURN (proxy_handler, + CONSUMER_PROXY (this->event_channel_, remote_addr, + local_addr, entry.conn_id_), + -1); + else // proxy_role == 'S', so configure a Supplier_Proxy. + ACE_NEW_RETURN (proxy_handler, + SUPPLIER_PROXY (this->event_channel_, remote_addr, + local_addr, entry.conn_id_), + -1); + + // The following code is common to both Supplier_Proxys_ and + // Consumer_Proxys. + + // Initialize max timeout. + proxy_handler->max_timeout (entry.max_retry_delay_); + + // Bind the new Proxy_Handler to the connection ID. + this->event_channel_.bind_proxy (proxy_handler); + } + + if (file_empty) + ACE_ERROR ((LM_WARNING, + "warning: connection proxy_handler configuration file was empty\n")); + return 0; +} + +int +Gateway::parse_consumer_config_file (void) +{ + // File that contains the consumer event forwarding information. + Consumer_Config_File_Parser consumer_file; + Consumer_Config_File_Entry entry; + int file_empty = 1; + int line_number = 0; + + if (consumer_file.open (this->consumer_config_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1); + + // Read config file line at a time. + while (consumer_file.read_entry (entry, line_number) != FP::EOFILE) + { + file_empty = 0; + + if (this->debug_) + { + ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " + "number of consumers = %d\n", + entry.conn_id_, + entry.supplier_id_, + entry.type_, + entry.total_consumers_)); + for (int i = 0; i < entry.total_consumers_; i++) + ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", + i, entry.consumers_[i])); + } + + Consumer_Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1); + + Event_Key event_addr (entry.conn_id_, + entry.supplier_id_, + entry.type_); + + // Add the Consumers to the Dispatch_Set. + for (int i = 0; i < entry.total_consumers_; i++) + { + Proxy_Handler *proxy_handler = 0; + + // Lookup destination and add to Consumer_Dispatch_Set set + // if found. + if (this->event_channel_.find_proxy (entry.consumers_[i], + proxy_handler) != -1) + dispatch_set->insert (proxy_handler); + else + ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", + i, entry.consumers_[i])); + } + + this->event_channel_.subscribe (event_addr, dispatch_set); + } + + if (file_empty) + ACE_ERROR ((LM_WARNING, + "warning: consumer map configuration file was empty\n")); + return 0; +} + // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the Gateway. diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index 0f5ddc07eb0..c3ae8dffe4d 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -153,7 +153,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h Gateway.h + Consumer_Dispatch_Set.h Gateway.h .obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ @@ -222,7 +222,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h Event_Channel.h + Consumer_Dispatch_Set.h Event_Channel.h .obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \ Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ @@ -245,9 +245,9 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ - Event.h Dispatch_Set.h \ + Event.h Consumer_Dispatch_Set.h \ $(WRAPPER_ROOT)/ace/Set.h -.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \ +.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Consumer_Dispatch_Set.h \ $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -381,7 +381,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h + Consumer_Dispatch_Set.h .obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \ Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ @@ -446,7 +446,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Task_T.h \ Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Concurrency_Strategies.h Event.h Dispatch_Set.h \ + Concurrency_Strategies.h Event.h Consumer_Dispatch_Set.h \ Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Connector.i diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp index 86e0fff8e41..2f161c171f6 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Proxy_Handler.cpp @@ -1,11 +1,18 @@ // $Id$ -#include "Dispatch_Set.h" -#include "Proxy_Handler_Connector.h" +#include "Event_Channel.h" -// Convenient short-hands. -#define CO CONDITION -#define MU MAP_MUTEX +void +Proxy_Handler::id (ACE_INT32 id) +{ + this->id_ = id; +} + +ACE_INT32 +Proxy_Handler::id (void) +{ + return this->id_; +} // The total number of bytes sent/received on this Proxy. @@ -21,36 +28,35 @@ Proxy_Handler::total_bytes (size_t bytes) this->total_bytes_ += bytes; } -Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), - efd_ (efd), - id_ (-1), +Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : remote_addr_ (remote_addr), + local_addr_ (local_addr), + id_ (conn_id), total_bytes_ (0), state_ (Proxy_Handler::IDLE), - connector_ (ioc), timeout_ (1), - max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), - socket_queue_size_ (socket_queue_size) + max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), + event_channel_ (ec) { } -// Set the direction. +// Set the proxy_role. void -Proxy_Handler::direction (char d) +Proxy_Handler::proxy_role (char d) { - this->direction_ = d; + this->proxy_role_ = d; } -// Get the direction. +// Get the proxy_role. char -Proxy_Handler::direction (void) +Proxy_Handler::proxy_role (void) { - return this->direction_; + return this->proxy_role_; } // Sets the timeout delay. @@ -64,9 +70,9 @@ Proxy_Handler::timeout (int to) this->timeout_ = to; } -// Recalculate the current retry timeout delay using exponential +// Re-calculate the current retry timeout delay using exponential // backoff. Returns the original timeout (i.e., before the -// recalculation). +// re-calculation). int Proxy_Handler::timeout (void) @@ -99,37 +105,16 @@ Proxy_Handler::max_timeout (void) // Restart connection asynchronously when timeout occurs. int -Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *) +Proxy_Handler::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", this->id (), this->timeout_)); - return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); -} - -// Restart connection (blocking_semantics dicates whether we -// restart synchronously or asynchronously). -int -Proxy_Handler::reinitiate_connection (void) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Make sure to close down peer to reclaim descriptor. - this->peer ().close (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Proxy_Handler %d\n", - this->id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; + // Delegate the re-connection attempt to the Event Channel. + return this->event_channel_.initiate_proxy_connection + (this, ACE_Synch_Options::asynch); } // Handle shutdown of the Proxy_Handler object. @@ -141,7 +126,8 @@ Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) "(%t) shutting down Proxy_Handler %d on handle %d\n", this->id (), this->get_handle ())); - return this->reinitiate_connection (); + // Restart the connection, if possible. + return this->event_channel_.reinitiate_proxy_connection (this); } // Set the state of the Proxy. @@ -152,66 +138,29 @@ Proxy_Handler::state (Proxy_Handler::State s) this->state_ = s; } -// Perform the first-time initiation of a connection to the peer. - -int -Proxy_Handler::initialize_connection (void) -{ - this->state_ = Proxy_Handler::ESTABLISHED; - - // Restart the timeout to 1. - this->timeout (1); - - // Action that sends the connection id to the peerd. - - ACE_INT32 id = htonl (this->id ()); - - ssize_t n = this->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "peer has closed down unexpectedly" : "send"), - -1); - return 0; -} - -// Set the size of the socket queue. - -void -Proxy_Handler::socket_queue_size (void) -{ - if (this->socket_queue_size_ > 0) - { - int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; - - if (this->peer ().set_option (SOL_SOCKET, option, - &this->socket_queue_size_, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - } -} - -// Upcall from the ACE_Acceptor::handle_input() that -// delegates control to our application-specific Proxy_Handler. +// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates +// control to our Proxy_Handler. int -Proxy_Handler::open (void *a) +Proxy_Handler::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n", this->peer ().get_handle ())); - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn on non-blocking I/O. if (this->peer ().enable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - // Call down to the base class to activate and register this handler. - if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); - return this->initialize_connection (); + // Register ourselves to receive input events. + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + else + return 0; } // Return the current state of the Proxy. @@ -222,30 +171,6 @@ Proxy_Handler::state (void) return this->state_; } -void -Proxy_Handler::id (ACE_INT32 id) -{ - this->id_ = id; -} - -ACE_INT32 -Proxy_Handler::id (void) -{ - return this->id_; -} - -// Set the peer's address information. -int -Proxy_Handler::bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 id) -{ - this->remote_addr_ = remote_addr; - this->local_addr_ = local_addr; - this->id_ = id; - return 0; -} - ACE_INET_Addr & Proxy_Handler::remote_addr (void) { @@ -258,15 +183,13 @@ Proxy_Handler::local_addr (void) return this->local_addr_; } -// Constructor sets the consumer map pointer. - -Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'C'; + this->proxy_role_ = 'C'; this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); } @@ -311,7 +234,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the // Event_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. + // handle_output()) when it is possible to try again. ssize_t n = this->send (event); @@ -325,7 +248,8 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) } else if (errno == EWOULDBLOCK) // Didn't manage to send everything. { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing activated on handle %d to routing id %d\n", this->get_handle (), this->id ())); // ACE_Queue in *front* of the list to preserve order. @@ -354,11 +278,11 @@ Consumer_Proxy::send (ACE_Message_Block *event) else if (n < len) // Re-adjust pointer to skip over the part we did send. event->rd_ptr (n); - else /* if (n == length) */ + else // if (n == length) { - // The whole event is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - delete event; + // The whole event is sent, we now decrement the reference count + // (which deletes itself with it reaches 0. + event->release (); errno = 0; } this->total_bytes (n); @@ -389,9 +313,9 @@ Consumer_Proxy::handle_output (ACE_HANDLE) break; case -1: - // We are responsible for freeing an ACE_Message_Block if + // We are responsible for releasing an ACE_Message_Block if // failures occur. - delete event; + event->release (); ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); /* FALLTHROUGH */ @@ -436,17 +360,14 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) (event, (ACE_Time_Value *) &ACE_Time_Value::zero); } -// Constructor sets the consumer map pointer and the connector -// pointer. - -Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) +Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) : msg_frag_ (0), - Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) + Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'S'; + this->proxy_role_ = 'S'; this->msg_queue ()->high_water_mark (0); } @@ -490,8 +411,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read)); - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return header_received; } @@ -508,11 +428,34 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) errno = EWOULDBLOCK; return -1; } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > sizeof event->data_) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + "Data payload is too big (%d bytes)\n", + event->header_.len_)); + return -1; + } + } - // At this point there is a complete, valid header in msg_frag_ + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // msg_frag_->wr_ptr() will point to event->data_. Every time we do + // a successful fragment read, we advance wr_ptr(). Therefore, by + // subtracting how much we've already read from the + // event->header_.len_ we complete the data_bytes_left_to_read... + ssize_t data_bytes_left_to_read = - sizeof (Event) - this->msg_frag_->length (); + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); ssize_t data_received = this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); @@ -529,8 +472,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) /* FALLTHROUGH */; case 0: // Premature EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return 0; default: @@ -550,23 +492,22 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) // Allocate an event forwarding header and chain the data // portion onto its continuation field. - forward_addr = new ACE_Message_Block (sizeof (Event_Addr), + forward_addr = new ACE_Message_Block (sizeof (Event_Key), ACE_Message_Block::MB_PROTO, this->msg_frag_); if (forward_addr == 0) { - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); errno = ENOMEM; return -1; } - Event_Addr event_addr (this->id (), + Event_Key event_addr (this->id (), event->header_.supplier_id_, event->header_.type_); - // Copy the forwarding address from the Event_Addr into + // Copy the forwarding address from the Event_Key into // forward_addr. - forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr)); + forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); // Reset the pointer to indicate we've got an entire event. this->msg_frag_ = 0; @@ -579,8 +520,12 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) event->header_.len_, event->data_)); #else ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", - event->header_.supplier_id_, event->header_.len_, this->total_bytes ())); -#endif + event->header_.supplier_id_, event->header_.len_, data_received + header_received)); +#endif /* VERBOSE */ + + // Encode before returning so that we can set things out in + // network byte order. + event->header_.encode (); return data_received + header_received; } } @@ -620,79 +565,17 @@ Supplier_Proxy::handle_input (ACE_HANDLE) } } -// Forward an event to its appropriate Consumer(s). +// Forward an event to its appropriate Consumer(s). This delegates to +// the <ACE_Event_Channel> to do the actual forwarding. int Supplier_Proxy::forward (ACE_Message_Block *forward_addr) { - // We got a valid event, so determine its virtual forwarding - // address, which is stored in the first of the two event blocks, - // which are chained together by this->recv(). - - Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr (); - - // Skip over the address portion and get the data. - const ACE_Message_Block *const data = forward_addr->cont (); - - // <dispatch_set> points to the set of Consumers associated with - // this forwarding address. - Dispatch_Set *dispatch_set = 0; - - if (this->efd_->find (*forwarding_addr, dispatch_set) != -1) - { - // Check to see if there are any destinations. - if (dispatch_set->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active destinations for this event currently\n")); - - else // There are destinations, so forward the event. - { - Dispatch_Set_Iterator dsi (*dispatch_set); - - for (Proxy_Handler **proxy_handler = 0; - dsi.next (proxy_handler) != 0; - dsi.advance ()) - { - // Only process active proxy_handlers. - if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) - { - // Clone the event portion (should be doing - // reference counting here...) - ACE_Message_Block *newmsg = data->clone (); - - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", - (*proxy_handler)->id ())); - - if ((*proxy_handler)->put (newmsg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", - "put", (*proxy_handler)->id ())); - - // We are responsible for freeing a - // ACE_Message_Block if failures occur. - delete newmsg; - } - } - } - // Will become superfluous once we have reference - // counting... - delete forward_addr; - return 0; - } - } - delete forward_addr; - // Failure return. - ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", - forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_)); - return 0; + return this->event_channel_.put (forward_addr); } #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>; +template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h index d91fa3108ff..ffce18d1c71 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.h +++ b/apps/Gateway/Gateway/Proxy_Handler.h @@ -21,11 +21,12 @@ #include "ace/SOCK_Connector.h" #include "ace/Svc_Handler.h" #include "Event_Forwarding_Discriminator.h" -#include "Dispatch_Set.h" +#include "Consumer_Dispatch_Set.h" #include "Event.h" // Forward declaration. class Proxy_Handler_Connector; +class ACE_Event_Channel; class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> // = TITLE @@ -36,20 +37,15 @@ class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> // Channel from Suppliers and forward them to Consumers. { public: - Proxy_Handler (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); + Proxy_Handler (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void * = 0); // Initialize and activate a single-threaded Proxy_Handler (called by // ACE_Connector::handle_output()). - int bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32); - // Set the peer's addressing and routing information. - ACE_INET_Addr &remote_addr (void); // Returns the peer's routing address. @@ -82,12 +78,12 @@ public: void max_timeout (int); int max_timeout (void); - // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer + // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer // (necessary for error checking). - void direction (char); - char direction (void); + void proxy_role (char); + char proxy_role (void); - // = The total number of bytes sent/received on this channel. + // = The total number of bytes sent/received on this proxy. size_t total_bytes (void); void total_bytes (size_t bytes); // Increment count by <bytes>. @@ -101,22 +97,10 @@ protected: MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. }; - int initialize_connection (void); - // Perform the first-time initiation of a connection to the peer. - - int reinitiate_connection (void); - // Reinitiate a connection asynchronously when peers fail. - - void socket_queue_size (void); - // Set the socket queue size. - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform Proxy_Handler termination. - Event_Forwarding_Discriminator *efd_; - // Maps Events to a set of Consumers. - ACE_INET_Addr remote_addr_; // Address of peer. @@ -127,14 +111,10 @@ protected: // The assigned routing ID of this entry. size_t total_bytes_; - // The total number of bytes sent/received on this channel. + // The total number of bytes sent/received on this proxy. State state_; - // The current state of the channel. - - Proxy_Handler_Connector *connector_; - // Back pointer to Proxy_Handler_Connector to reestablish broken - // connections. + // The current state of the proxy. int timeout_; // Amount of time to wait between reconnection attempts. @@ -142,12 +122,13 @@ protected: int max_timeout_; // Maximum amount of time to wait between reconnection attempts. - char direction_; - // Indicates which direction data flows through the channel ('S' == - // Supplier and 'C' == Consumer). + char proxy_role_; + // Indicates which role the proxy plays ('S' == Supplier and 'C' == + // Consumer). - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). + ACE_Event_Channel &event_channel_; + // Reference to the <ACE_Event_Channel> that we use to forward all + // the events from Consumers and Suppliers. }; class Supplier_Proxy : public Proxy_Handler @@ -158,13 +139,15 @@ class Supplier_Proxy : public Proxy_Handler // Performs framing and error checking. { public: - Supplier_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - // Constructor sets the consumer map pointer. + // = Initialization method. + Supplier_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); protected: + // = All the following methods are upcalls, so they can be protected. + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); // Receive and process peer events. @@ -172,7 +155,8 @@ protected: // Receive an event from a Supplier. int forward (ACE_Message_Block *event); - // Forward the Event to a Consumer. + // Forward the <event> to its appropriate Consumer. This delegates + // to the <ACE_Event_Channel> to do the actual forwarding. ACE_Message_Block *msg_frag_; // Keep track of event fragment to handle non-blocking recv's from @@ -184,19 +168,22 @@ class Consumer_Proxy : public Proxy_Handler // Handles transmission of events to Consumers. // // = DESCRIPTION - // Uses a single-threaded approach. + // Performs queueing and error checking. Uses a single-threaded + // Reactive approach to handle flow control. { public: - Consumer_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // = Initialization method. + Consumer_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); + + virtual int put (ACE_Message_Block *event, + ACE_Time_Value * = 0); // Send an event to a Consumer (may be queued if necessary). protected: - // = We'll allow up to 16 megabytes to be queued per-output channel. + // = We'll allow up to 16 megabytes to be queued per-output proxy. enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16}; virtual int handle_output (ACE_HANDLE); diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp index 7ac0a77a2d4..dc18eca8500 100644 --- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp @@ -18,15 +18,15 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) // Locate the ACE_Svc_Handler corresponding to the socket descriptor. if (this->handler_map_.find (sd, stp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate proxy %d in connector map, %p\n", sd, "find"), -1); - Proxy_Handler *channel = stp->svc_handler (); + Proxy_Handler *proxy_handler = stp->svc_handler (); // Schedule a reconnection request at some point in the future - // (note that channel uses an exponential backoff scheme). - if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, - channel->timeout ()) == -1) + // (note that proxy_handler uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer + (proxy_handler, 0, proxy_handler->timeout ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); return 0; @@ -35,36 +35,37 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) // Initiate (or reinitiate) a connection to the Proxy_Handler. int -Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, - ACE_Synch_Options &synch_options) +Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler, + ACE_Synch_Options &synch_options) { - char buf[MAXHOSTNAMELEN]; + char addr_buf[MAXHOSTNAMELEN]; // Mark ourselves as idle so that the various iterators // will ignore us until we are reconnected. - channel->state (Proxy_Handler::IDLE); + proxy_handler->state (Proxy_Handler::IDLE); - if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 - || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + // We check the remote addr second so that it remains in the addr_buf. + if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1 + || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "can't obtain peer's address"), -1); // Try to connect to the Peer. - if (this->connect (channel, channel->remote_addr (), - synch_options, channel->local_addr ()) == -1) + if (this->connect (proxy_handler, proxy_handler->remote_addr (), + synch_options, proxy_handler->local_addr ()) == -1) { if (errno != EWOULDBLOCK) { - channel->state (Proxy_Handler::FAILED); + proxy_handler->state (Proxy_Handler::FAILED); ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", - "connect", buf)); + "connect", addr_buf)); // Reschedule ourselves to try and connect again. if (synch_options[ACE_Synch_Options::USE_REACTOR]) { if (ACE_Service_Config::reactor ()->schedule_timer - (channel, 0, channel->timeout ()) == 0) + (proxy_handler, 0, proxy_handler->timeout ()) == 0) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); } @@ -75,18 +76,18 @@ Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, } else { - channel->state (Proxy_Handler::CONNECTING); + proxy_handler->state (Proxy_Handler::CONNECTING); ACE_DEBUG ((LM_DEBUG, "(%t) in the process of connecting %s to %s\n", synch_options[ACE_Synch_Options::USE_REACTOR] - ? "asynchronously" : "synchronously", buf)); + ? "asynchronously" : "synchronously", addr_buf)); } } else { - channel->state (Proxy_Handler::ESTABLISHED); + proxy_handler->state (Proxy_Handler::ESTABLISHED); ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - buf, channel->get_handle ())); + addr_buf, proxy_handler->get_handle ())); } return 0; } diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp index 98722a96295..f316e4e82bf 100644 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp @@ -1,30 +1,33 @@ -#include "Thr_Proxy_Handler.h" // $Id$ -#include "Proxy_Handler_Connector.h" +#include "Event_Channel.h" +#include "Thr_Proxy_Handler.h" #if defined (ACE_HAS_THREADS) -Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size) +Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Consumer_Proxy (ec, remote_addr, local_addr, conn_id) { } -// This method should be called only when the peer shuts down -// unexpectedly. This method marks the Proxy_Handler as having failed and -// deactivates the ACE_Message_Queue (to wake up the thread blocked on -// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will -// eventually try to reconnect... +// This method should be called only when the Consumer shuts down +// unexpectedly. This method marks the Proxy_Handler as having failed +// and deactivates the ACE_Message_Queue (to wake up the thread +// blocked on <dequeue_head> in svc()). +// Thr_Output_Handler::handle_close () will eventually try to +// reconnect... int Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) { + // Call down to the <Consumer_Proxy> to handle this first. this->Consumer_Proxy::handle_input (h); - ACE_Service_Config::reactor ()->remove_handler (h, - ACE_Event_Handler::RWE_MASK - | ACE_Event_Handler::DONT_CALL); + + ACE_Service_Config::reactor ()->remove_handler + (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. this->msg_queue ()->deactivate (); return 0; @@ -36,31 +39,28 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) int Thr_Consumer_Proxy::open (void *) { - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); + // Register ourselves to receive input events (which indicate that - // the Peer has shut down unexpectedly). - if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + // the Consumer has shut down unexpectedly). + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit - // messages to peers. + // events to Consumers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else @@ -70,87 +70,93 @@ Thr_Consumer_Proxy::open (void *) } } -// ACE_Queue up a message for transmission (must not block since all -// Supplier_Proxys are single-threaded). +// Queue up an event for transmission (must not block since +// Supplier_Proxys may be single-threaded). int Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) { // Perform non-blocking enqueue. - return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); } -// Transmit messages to the peer (note simplification resulting from +// Transmit events to the peer (note simplification resulting from // threads...) int Thr_Consumer_Proxy::svc (void) { + for (;;) { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) connected! Thr_Consumer_Proxy's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread it is OK to block on // output. for (ACE_Message_Block *mb = 0; - this->msg_queue ()->dequeue_head (mb) != -1; ) - if (this->send (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); - - ACE_ASSERT (errno == ESHUTDOWN); - - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - ACE_OS::sleep (tv); - } + this->msg_queue ()->dequeue_head (mb) != -1; + ) + { + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + } + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_.initiate_proxy_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + + ACE_OS::sleep (tv); + } } return 0; } -Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size) +Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Supplier_Proxy (ec, remote_addr, local_addr, conn_id) { } int Thr_Supplier_Proxy::open (void *) { - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit - // messages to peers. + // events to peers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else @@ -160,7 +166,7 @@ Thr_Supplier_Proxy::open (void *) } } -// Receive messages from a Peer in a separate thread (note reuse of +// Receive events from a Peer in a separate thread (note reuse of // existing code!). int @@ -168,20 +174,20 @@ Thr_Supplier_Proxy::svc (void) { for (;;) { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) connected! Thr_Supplier_Proxy's handle = %d\n", this->peer ().get_handle ())); - // Since this method runs in its own thread and processes - // messages for one connection it is OK to block on input and - // output. + // Since this method runs in its own thread and processes events + // for one connection it is OK to call down to the + // <Supplier_Proxy::handle_input> method, which blocks on input. while (this->handle_input () != -1) continue; ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", - this->id (), - this->get_handle ())); + this->id (), this->get_handle ())); this->peer ().close (); @@ -190,11 +196,12 @@ Thr_Supplier_Proxy::svc (void) for (this->timeout (1); // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) + this->event_channel_.initiate_proxy_connection (this) == -1; ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", tv.sec ())); + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); ACE_OS::sleep (tv); } } diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h index 8ecced3805c..275bc87b320 100644 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.h +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h @@ -25,21 +25,22 @@ class Thr_Consumer_Proxy : public Consumer_Proxy // Runs each Output Proxy_Handler in a separate thread. { public: - Thr_Consumer_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); + Thr_Consumer_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void *); // Initialize the threaded Consumer_Proxy object and spawn a new // thread. - virtual int handle_input (ACE_HANDLE); - // Called when Peer shutdown unexpectedly. - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); // Send a message to a peer. +protected: + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + virtual int svc (void); // Transmit peer messages. }; @@ -49,14 +50,15 @@ class Thr_Supplier_Proxy : public Supplier_Proxy // Runs each Input Proxy_Handler in a separate thread. { public: - Thr_Supplier_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); + Thr_Supplier_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void *); // Initialize the object and spawn a new thread. +protected: virtual int svc (void); // Transmit peer messages. }; diff --git a/apps/Gateway/Gateway/gatewayd.cpp b/apps/Gateway/Gateway/gatewayd.cpp index 48b6e9a173d..b0af5f7cace 100644 --- a/apps/Gateway/Gateway/gatewayd.cpp +++ b/apps/Gateway/Gateway/gatewayd.cpp @@ -17,18 +17,17 @@ main (int argc, char *argv[]) ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1)); else // Use static binding. { - static char *l_argv[3] = { "-d" }; ACE_Service_Object *so = ACE_SVC_INVOKE (Gateway); - if (so->init (1, l_argv) == -1) + if (so->init (argc - 1, argv + 1) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1)); } } // Run forever, performing the configured services until we are shut - // down by a signal. + // down by a SIGINT/SIGQUIT signal. - ACE_Service_Config::run_reactor_event_loop (); + daemon.run_reactor_event_loop (); return 0; } |