diff options
28 files changed, 778 insertions, 730 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp index 9e0b6c45fa9..f60a8068db4 100644 --- a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -2,41 +2,41 @@ #define ACE_BUILD_SVC_DLL #include "Event_Channel.h" -#include "Concrete_Proxy_Handlers.h" +#include "Concrete_Connection_Handlers.h" -Consumer_Proxy::Consumer_Proxy (const Proxy_Config_Info &pci) - : Proxy_Handler (pci) +Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci) { - this->proxy_role_ = 'C'; - this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); + this->connection_role_ = 'C'; + this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); } // This method should be called only when the Consumer shuts down -// unexpectedly. This method simply marks the Proxy_Handler as having +// unexpectedly. This method simply marks the Connection_Handler as having // failed so that handle_close () can reconnect. int -Consumer_Proxy::handle_input (ACE_HANDLE) +Consumer_Handler::handle_input (ACE_HANDLE) { char buf[1]; - this->state (Proxy_Handler::FAILED); + this->state (Connection_Handler::FAILED); switch (this->peer ().recv (buf, sizeof buf)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n", + "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n", this->id ()), -1); /* NOTREACHED */ case 0: ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n", + "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n", this->id ()), -1); /* NOTREACHED */ default: ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n", + "(%t) Consumer is erroneously sending input to Consumer_Handler %d\n", this->id ()), -1); /* NOTREACHED */ } @@ -47,7 +47,7 @@ Consumer_Proxy::handle_input (ACE_HANDLE) // Event_List. int -Consumer_Proxy::nonblk_put (ACE_Message_Block *event) +Consumer_Handler::nonblk_put (ACE_Message_Block *event) { // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the @@ -60,7 +60,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) { // Things have gone wrong, let's try to close down and set up a // new reconnection by calling handle_close(). - this->state (Proxy_Handler::FAILED); + this->state (Connection_Handler::FAILED); this->handle_close (); return -1; } @@ -86,7 +86,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) } ssize_t -Consumer_Proxy::send (ACE_Message_Block *event) +Consumer_Handler::send (ACE_Message_Block *event) { ACE_DEBUG ((LM_DEBUG, "(%t) sending %d bytes to Consumer %d\n", event->length (), this->id ())); @@ -114,7 +114,7 @@ Consumer_Proxy::send (ACE_Message_Block *event) // This method is automatically called by the ACE_Reactor. int -Consumer_Proxy::handle_output (ACE_HANDLE) +Consumer_Handler::handle_output (ACE_HANDLE) { ACE_Message_Block *event = 0; @@ -169,7 +169,7 @@ Consumer_Proxy::handle_output (ACE_HANDLE) // Send an event to a Consumer (may queue if necessary). int -Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) +Consumer_Handler::put (ACE_Message_Block *event, ACE_Time_Value *) { if (this->msg_queue ()->is_empty ()) // Try to send the event *without* blocking! @@ -181,11 +181,11 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) (event, (ACE_Time_Value *) &ACE_Time_Value::zero); } -Supplier_Proxy::Supplier_Proxy (const Proxy_Config_Info &pci) - : Proxy_Handler (pci), +Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci), msg_frag_ (0) { - this->proxy_role_ = 'S'; + this->connection_role_ = 'S'; this->msg_queue ()->high_water_mark (0); } @@ -201,7 +201,7 @@ Supplier_Proxy::Supplier_Proxy (const Proxy_Config_Info &pci) // of software from knowledge of the event structure. int -Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) +Supplier_Handler::recv (ACE_Message_Block *&forward_addr) { if (this->msg_frag_ == 0) // No existing fragment... @@ -329,8 +329,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) } Event_Key event_addr (this->id (), - event->header_.supplier_id_, - event->header_.type_); + event->header_.type_); // Copy the forwarding address from the Event_Key into // forward_addr. forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); @@ -340,10 +339,16 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) } this->total_bytes (data_received + header_received); - ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", - event->header_.supplier_id_, event->header_.len_, data_received + header_received)); + ACE_DEBUG ((LM_DEBUG, + "(%t) connection id = %d, cur len = %d, total bytes read = %d\n", + event->header_.connection_id_, + event->header_.len_, + data_received + header_received)); if (Options::instance ()->enabled (Options::VERBOSE)) - ACE_DEBUG ((LM_DEBUG, "data_ = %*s\n", event->header_.len_ - 2, event->data_)); + ACE_DEBUG ((LM_DEBUG, + "data_ = %*s\n", + event->header_.len_ - 2, + event->data_)); // Encode before returning so that we can set things out in // network byte order. @@ -356,7 +361,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) // gatewayd, as well as stdio). int -Supplier_Proxy::handle_input (ACE_HANDLE) +Supplier_Handler::handle_input (ACE_HANDLE) { ACE_Message_Block *forward_addr = 0; @@ -365,9 +370,9 @@ Supplier_Proxy::handle_input (ACE_HANDLE) case 0: // Note that a peer should never initiate a shutdown by closing // the connection. Instead, it should reconnect. - this->state (Proxy_Handler::FAILED); + this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, - "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n", + "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n", this->id ()), -1); /* NOTREACHED */ case -1: @@ -376,8 +381,8 @@ Supplier_Proxy::handle_input (ACE_HANDLE) return 0; else // A weird problem occurred, shut down and start again. { - this->state (Proxy_Handler::FAILED); - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n", + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n", "Peer has failed unexpectedly", this->id ()), -1); } @@ -388,32 +393,32 @@ Supplier_Proxy::handle_input (ACE_HANDLE) } // Forward an event to its appropriate Consumer(s). This delegates to -// the <ACE_Event_Channel> to do the actual forwarding. +// the <Event_Channel> to do the actual forwarding. int -Supplier_Proxy::forward (ACE_Message_Block *forward_addr) +Supplier_Handler::forward (ACE_Message_Block *forward_addr) { return this->event_channel_->put (forward_addr); } #if defined (ACE_HAS_THREADS) -Thr_Consumer_Proxy::Thr_Consumer_Proxy (const Proxy_Config_Info &pci) - : Consumer_Proxy (pci) +Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) + : Consumer_Handler (pci) { } // This method should be called only when the Consumer shuts down -// unexpectedly. This method marks the Proxy_Handler as having failed +// unexpectedly. This method marks the Connection_Handler as having failed // and deactivates the ACE_Message_Queue (to wake up the thread // blocked on <dequeue_head> in svc()). // Thr_Output_Handler::handle_close () will eventually try to // reconnect... int -Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) +Thr_Consumer_Handler::handle_input (ACE_HANDLE h) { - // Call down to the <Consumer_Proxy> to handle this first. - this->Consumer_Proxy::handle_input (h); + // Call down to the <Consumer_Handler> to handle this first. + this->Consumer_Handler::handle_input (h); ACE_Reactor::instance ()->remove_handler (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); @@ -423,19 +428,19 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) return 0; } -// Initialize the threaded Consumer_Proxy object and spawn a new +// Initialize the threaded Consumer_Handler object and spawn a new // thread. int -Thr_Consumer_Proxy::open (void *) +Thr_Consumer_Handler::open (void *) { // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); // Call back to the <Event_Channel> to complete our initialization. - else if (this->event_channel_->complete_proxy_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); // Register ourselves to receive input events (which indicate that // the Consumer has shut down unexpectedly). @@ -461,10 +466,10 @@ Thr_Consumer_Proxy::open (void *) } // Queue up an event for transmission (must not block since -// Supplier_Proxys may be single-threaded). +// Supplier_Handlers may be single-threaded). int -Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) +Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) { // Perform non-blocking enqueue. return this->msg_queue ()->enqueue_tail @@ -475,13 +480,13 @@ Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) // threads...) int -Thr_Consumer_Proxy::svc (void) +Thr_Consumer_Handler::svc (void) { for (;;) { ACE_DEBUG ((LM_DEBUG, - "(%t) Thr_Consumer_Proxy's handle = %d\n", + "(%t) Thr_Consumer_Handler's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread it is OK to block on @@ -496,14 +501,14 @@ Thr_Consumer_Proxy::svc (void) ACE_ASSERT (errno == ESHUTDOWN); ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", + "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", this->id (), this->get_handle ())); this->peer ().close (); for (this->timeout (1); // Default is to reconnect synchronously. - this->event_channel_->initiate_proxy_connection (this) == -1; ) + this->event_channel_->initiate_connection_connection (this) == -1; ) { ACE_Time_Value tv (this->timeout ()); @@ -519,21 +524,21 @@ Thr_Consumer_Proxy::svc (void) return 0; } -Thr_Supplier_Proxy::Thr_Supplier_Proxy (const Proxy_Config_Info &pci) - : Supplier_Proxy (pci) +Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci) + : Supplier_Handler (pci) { } int -Thr_Supplier_Proxy::open (void *) +Thr_Supplier_Handler::open (void *) { // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); // Call back to the <Event_Channel> to complete our initialization. - else if (this->event_channel_->complete_proxy_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue @@ -556,24 +561,25 @@ Thr_Supplier_Proxy::open (void *) // existing code!). int -Thr_Supplier_Proxy::svc (void) +Thr_Supplier_Handler::svc (void) { for (;;) { ACE_DEBUG ((LM_DEBUG, - "(%t) Thr_Supplier_Proxy's handle = %d\n", + "(%t) Thr_Supplier_Handler's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread and processes events // for one connection it is OK to call down to the - // <Supplier_Proxy::handle_input> method, which blocks on input. + // <Supplier_Handler::handle_input> method, which blocks on input. - while (this->Supplier_Proxy::handle_input () != -1) + while (this->Supplier_Handler::handle_input () != -1) continue; ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", - this->id (), this->get_handle ())); + "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", + this->id (), + this->get_handle ())); this->peer ().close (); @@ -582,7 +588,7 @@ Thr_Supplier_Proxy::svc (void) for (this->timeout (1); // Default is to reconnect synchronously. - this->event_channel_->initiate_proxy_connection (this) == -1; ) + this->event_channel_->initiate_connection_connection (this) == -1; ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h index 24d02094dde..a14d97b9d3e 100644 --- a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h @@ -7,10 +7,10 @@ // gateway // // = FILENAME -// Concrete_Proxy_Handlers.h +// Concrete_Connection_Handlers.h // // = DESCRIPTION -// These are all the subclasses of Proxy_Handler that define the +// These are all the subclasses of Connection_Handler that define the // appropriate threaded/reactive Consumer/Supplier behavior. // // = AUTHOR @@ -18,21 +18,23 @@ // // ============================================================================ -#if !defined (_CONCRETE_PROXY_HANDLER) -#define _CONCRETE_PROXY_HANDLER +#if !defined (CONCRETE_CONNECTION_HANDLER) +#define CONCRETE_CONNECTION_HANDLER -#include "Proxy_Handler.h" +#include "Connection_Handler.h" -class Supplier_Proxy : public Proxy_Handler +class Supplier_Handler : public Connection_Handler +{ // = TITLE - // Handles reception of Events from Suppliers + // Handles reception of Events from Suppliers. // // = DESCRIPTION - // Performs framing and error checking. -{ + // Performs framing and error checking on Events. Intended to + // run reactively, i.e., in one thread of control using a + // Reactor for demuxing and dispatching. public: // = Initialization method. - Supplier_Proxy (const Proxy_Config_Info &); + Supplier_Handler (const Connection_Config_Info &); protected: // = All the following methods are upcalls, so they can be protected. @@ -45,33 +47,32 @@ protected: int forward (ACE_Message_Block *event); // Forward the <event> to its appropriate Consumer. This delegates - // to the <ACE_Event_Channel> to do the actual forwarding. + // to the <Event_Channel> to do the actual forwarding. ACE_Message_Block *msg_frag_; // Keep track of event fragment to handle non-blocking recv's from // Suppliers. }; -class Consumer_Proxy : public Proxy_Handler +class Consumer_Handler : public Connection_Handler +{ // = TITLE // Handles transmission of events to Consumers. // // = DESCRIPTION - // Performs queueing and error checking. Uses a single-threaded - // Reactive approach to handle flow control. -{ + // Performs queueing and error checking. Intended to run + // reactively, i.e., in one thread of control using a Reactor + // for demuxing and dispatching. Also uses a Reactor to handle + // flow controlled output connections. public: // = Initialization method. - Consumer_Proxy (const Proxy_Config_Info &); + Consumer_Handler (const Connection_Config_Info &); virtual int put (ACE_Message_Block *event, ACE_Time_Value * = 0); // Send an event to a Consumer (may be queued if necessary). protected: - // = We'll allow up to 16 megabytes to be queued per-output proxy. - enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16}; - virtual int handle_output (ACE_HANDLE); // Finish sending event when flow control conditions abate. @@ -85,15 +86,15 @@ protected: // Receive and process shutdowns from a Consumer. }; -class Thr_Consumer_Proxy : public Consumer_Proxy - // = TITLE - // Runs each Output Proxy_Handler in a separate thread. +class Thr_Consumer_Handler : public Consumer_Handler { + // = TITLE + // Runs each <Consumer_Handler> in a separate thread. public: - Thr_Consumer_Proxy (const Proxy_Config_Info &); + Thr_Consumer_Handler (const Connection_Config_Info &); virtual int open (void *); - // Initialize the threaded Consumer_Proxy object and spawn a new + // Initialize the threaded Consumer_Handler object and spawn a new // thread. virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); @@ -107,12 +108,12 @@ protected: // Transmit peer messages. }; -class Thr_Supplier_Proxy : public Supplier_Proxy - // = TITLE - // Runs each Input Proxy_Handler in a separate thread. +class Thr_Supplier_Handler : public Supplier_Handler { + // = TITLE + // Runs each <Supplier_Handler> in a separate thread. public: - Thr_Supplier_Proxy (const Proxy_Config_Info &pci); + Thr_Supplier_Handler (const Connection_Config_Info &pci); virtual int open (void *); // Initialize the object and spawn a new thread. @@ -122,4 +123,4 @@ protected: // Transmit peer messages. }; -#endif /* _CONCRETE_PROXY_HANDLER */ +#endif /* CONCRETE_CONNECTION_HANDLER */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index f1d784c2bcf..062b16040f2 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -19,18 +19,13 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry, // Ignore comments, check for EOF and EOLINE if this succeeds, we // have our connection id. - while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS) + while ((result = this->getint (entry.connection_id_)) != FP::SUCCESS) if (result == FP::EOFILE) return FP::EOFILE; else if (result == FP::EOLINE || result == FP::COMMENT) line_number++; - // Get the supplier id. - result = this->getint (entry.supplier_id_); - if (result != FP::SUCCESS) - return result; - // Get the payload type. result = this->getint (entry.type_); if (result != FP::SUCCESS) @@ -50,7 +45,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry, } FP_RETURN_TYPE -Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, +Connection_Config_File_Parser::read_entry (Connection_Config_Info &entry, int &line_number) { char buf[BUFSIZ]; @@ -62,7 +57,7 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, // Ignore comments, check for EOF and EOLINE if this succeeds, we // have our connection id - while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS) + while ((result = this->getint (entry.connection_id_)) != FP::SUCCESS) if (result == FP::EOFILE) return FP::EOFILE; else if (result == FP::EOLINE @@ -85,11 +80,11 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, if (result != FP::SUCCESS) return result; else - entry.proxy_role_ = buf[0]; + entry.connection_role_ = buf[0]; - if (entry.proxy_role_ == 'C') + if (entry.connection_role_ == 'C') entry.remote_port_ = Options::instance ()->consumer_connector_port (); - else if (entry.proxy_role_ == 'S') + else if (entry.connection_role_ == 'S') entry.remote_port_ = Options::instance ()->supplier_connector_port (); else // Yikes, this is a *weird* error! @@ -106,7 +101,7 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, if (result != FP::SUCCESS) return result; else - entry.proxy_role_ = buf[0]; + entry.connection_role_ = buf[0]; } // Get the max retry delay. @@ -118,7 +113,9 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, // Get the local port number. result = this->getint (port); - if (result != FP::SUCCESS) + if (result == FP::DEFAULT) + entry.local_port_ = 0; // @@ Should make this an option. + else if (result != FP::SUCCESS) return result; else entry.local_port_ = u_short (port); @@ -136,70 +133,87 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, } #if defined (DEBUGGING) -int main (int argc, char *argv[]) +int +main (int argc, char *argv[]) { - if (argc != 4) - { - // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1); - cerr << argv[0] << " CCfilename filename Mapfilename.\n"; - exit (1); - } FP_RETURN_TYPE result; - Proxy_Config_Info entry; - Proxy_Config_File_Parser CCfile; - - CCfile.open (argv[1]); - int line_number = 0; - printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n"); - - // Read config file line at a time. - while ((result = CCfile.read_entry (entry, line_number)) != EOF) - { - if (result != FP::SUCCESS) - // ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number)); - cerr << "Error at line " << line_number << endl; - else - printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n", - entry.proxy_id_, entry.host_, entry.remote_port_, entry.proxy_role_, - entry.max_retry_timeout_, entry.transform_, entry.local_port_); - } - CCfile.close(); - - Consumer_Config_Info entry; - Consumer_Config_File_Parser file; + { + Connection_Config_Info entry; + Connection_Config_File_Parser connection_config_file; - file.open (argv[2]); + connection_config_file.open (argc > 1 ? argv[1] : "connection_config"); - line_number = 0; + int line_number = 0; - printf ("\nConnID\tLogic\tPayload\tDestinations\n"); + ACE_DEBUG ((LM_DEBUG, + "ConnID\tHost\t\tRPort\tRole\tRetry\tLPort\tPriority\n")); - // Read config file line at a time. - while ((result = file.read_entry (entry, line_number)) != EOF) - { - if (result != FP::SUCCESS) - cerr << "Error at line " << line_number << endl; + // Read config file line at a time. + while ((result = connection_config_file.read_entry (entry, line_number)) != FP::EOFILE) + if (result == FP::PARSE_ERROR) + ACE_DEBUG ((LM_DEBUG, + "Error line %d.\n", + line_number)); + else + ACE_DEBUG ((LM_DEBUG, + "%d\t%s\t%d\t%c\t%d\t%d\t%d\n", + entry.connection_id_, + entry.host_, + entry.remote_port_, + entry.connection_role_, + entry.max_retry_timeout_, + entry.local_port_, + entry.priority_)); + + connection_config_file.close (); + } + + { + Consumer_Config_Info entry; + Consumer_Config_File_Parser consumer_config_file; + + consumer_config_file.open (argc > 2 ? argv[2] : "consumer_config"); + + line_number = 0; + + ACE_DEBUG ((LM_DEBUG, + "\nConnID\tLogic\tPayload\tDestinations\n")); + + // Read config file line at a time. + while ((result = consumer_config_file.read_entry (entry, line_number)) != FP::EOFILE) + if (result == FP::PARSE_ERROR) + ACE_DEBUG ((LM_DEBUG, + "Error line %d.\n", + line_number)); else { - printf ("%d\t%d\t%d\t%d\t", - entry.proxy_id_, entry.supplier_id_, entry.type_); + ACE_DEBUG ((LM_DEBUG, + "%d\t%d\t%d\t%d\t", + entry.connection_id_, + entry.supplier_id_, + entry.type_)); + while (--entry.total_consumers_ >= 0) - printf ("%d,", entry.consumers_[entry.total_consumers_]); - printf ("\n"); + ACE_DEBUG ((LM_DEBUG, + "%d,", + entry.consumers_[entry.total_consumers_])); + ACE_DEBUG ((LM_DEBUG, + "\n")); } - } - file.close(); + + consumer_config_file.close (); + } return 0; } #endif /* DEBUGGING */ #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class File_Parser<Proxy_Config_Info>; +template class File_Parser<Connection_Config_Info>; template class File_Parser<Consumer_Config_Info>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate File_Parser<Proxy_Config_Info> +#pragma instantiate File_Parser<Connection_Config_Info> #pragma instantiate File_Parser<Consumer_Config_Info> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 2f1f1280bc2..1199c615833 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -17,19 +17,19 @@ #if !defined (_CONFIG_FILES) #define _CONFIG_FILES -#include "ace/OS.h" #include "File_Parser.h" +#include "Event.h" // Forward declaration. -class ACE_Event_Channel; +class Event_Channel; -class Proxy_Config_Info +class Connection_Config_Info // = TITLE // Stores connection configuration information. { public: - ACE_INT32 proxy_id_; - // Connection id for this Proxy_Handler. + ACE_INT32 connection_id_; + // Connection id for this Connection_Handler. char host_[BUFSIZ]; // Host to connect with. @@ -37,7 +37,7 @@ public: u_short remote_port_; // Port to connect with. - char proxy_role_; + char connection_role_; // 'S' (supplier) or 'C' (consumer). ACE_INT32 max_retry_timeout_; @@ -50,19 +50,19 @@ public: // Priority by which different Consumers and Suppliers should be // serviced. - ACE_Event_Channel *event_channel_; + Event_Channel *event_channel_; // We just need a place to store this until we can pass it along - // when creating a Proxy_Handler. + // when creating a Connection_Handler. }; -class Proxy_Config_File_Parser : public File_Parser<Proxy_Config_Info> +class Connection_Config_File_Parser : public File_Parser<Connection_Config_Info> // = TITLE - // Parser for the Proxy_Handler Connection file. + // Parser for the Connection_Handler Connection file. { public: - virtual FP::Return_Type read_entry (Proxy_Config_Info &entry, + virtual FP::Return_Type read_entry (Connection_Config_Info &entry, int &line_number); - // Read in a <Proxy_Config_Info> entry. + // Read in a <Connection_Config_Info> entry. }; @@ -77,12 +77,9 @@ public: // Total number of multicast consumers. }; - ACE_INT32 proxy_id_; + ACE_INT32 connection_id_; // Connection id for this proxy. - ACE_INT32 supplier_id_; - // Logical supplier id for this proxy. - ACE_INT32 type_; // Message type. diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Connection_Handler.cpp index af12f5b6bff..3b9e8909dbc 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Connection_Handler.cpp @@ -2,16 +2,16 @@ #define ACE_BUILD_SVC_DLL #include "Event_Channel.h" -#include "Concrete_Proxy_Handlers.h" +#include "Concrete_Connection_Handlers.h" void -Proxy_Handler::id (ACE_INT32 id) +Connection_Handler::id (ACE_INT32 id) { this->id_ = id; } ACE_INT32 -Proxy_Handler::id (void) +Connection_Handler::id (void) { return this->id_; } @@ -19,27 +19,27 @@ Proxy_Handler::id (void) // The total number of bytes sent/received on this Proxy. size_t -Proxy_Handler::total_bytes (void) +Connection_Handler::total_bytes (void) { return this->total_bytes_; } void -Proxy_Handler::total_bytes (size_t bytes) +Connection_Handler::total_bytes (size_t bytes) { this->total_bytes_ += bytes; } -Proxy_Handler::Proxy_Handler (void) +Connection_Handler::Connection_Handler (void) { } -Proxy_Handler::Proxy_Handler (const Proxy_Config_Info &pci) +Connection_Handler::Connection_Handler (const Connection_Config_Info &pci) : remote_addr_ (pci.remote_port_, pci.host_), local_addr_ (pci.local_port_), - id_ (pci.proxy_id_), + id_ (pci.connection_id_), total_bytes_ (0), - state_ (Proxy_Handler::IDLE), + state_ (Connection_Handler::IDLE), timeout_ (1), max_timeout_ (pci.max_retry_timeout_), event_channel_ (pci.event_channel_) @@ -48,26 +48,26 @@ Proxy_Handler::Proxy_Handler (const Proxy_Config_Info &pci) this->priority (int (pci.priority_)); } -// Set the proxy_role. +// Set the connection_role. void -Proxy_Handler::proxy_role (char d) +Connection_Handler::connection_role (char d) { - this->proxy_role_ = d; + this->connection_role_ = d; } -// Get the proxy_role. +// Get the connection_role. char -Proxy_Handler::proxy_role (void) +Connection_Handler::connection_role (void) { - return this->proxy_role_; + return this->connection_role_; } // Sets the timeout delay. void -Proxy_Handler::timeout (int to) +Connection_Handler::timeout (int to) { if (to > this->max_timeout_) to = this->max_timeout_; @@ -80,7 +80,7 @@ Proxy_Handler::timeout (int to) // re-calculation). int -Proxy_Handler::timeout (void) +Connection_Handler::timeout (void) { int old_timeout = this->timeout_; this->timeout_ *= 2; @@ -94,7 +94,7 @@ Proxy_Handler::timeout (void) // Sets the max timeout delay. void -Proxy_Handler::max_timeout (int mto) +Connection_Handler::max_timeout (int mto) { this->max_timeout_ = mto; } @@ -102,7 +102,7 @@ Proxy_Handler::max_timeout (int mto) // Gets the max timeout delay. int -Proxy_Handler::max_timeout (void) +Connection_Handler::max_timeout (void) { return this->max_timeout_; } @@ -110,54 +110,54 @@ Proxy_Handler::max_timeout (void) // Restart connection asynchronously when timeout occurs. int -Proxy_Handler::handle_timeout (const ACE_Time_Value &, +Connection_Handler::handle_timeout (const ACE_Time_Value &, const void *) { ACE_DEBUG ((LM_DEBUG, - "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", + "(%t) attempting to reconnect Connection_Handler %d with timeout = %d\n", this->id (), this->timeout_)); // Delegate the re-connection attempt to the Event Channel. - this->event_channel_->initiate_proxy_connection (this); + this->event_channel_->initiate_connection_connection (this); return 0; } -// Handle shutdown of the Proxy_Handler object. +// Handle shutdown of the Connection_Handler object. int -Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +Connection_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { ACE_DEBUG ((LM_DEBUG, - "(%t) shutting down %s Proxy_Handler %d on handle %d\n", - this->proxy_role () == 'C' ? "Consumer" : "Supplier", + "(%t) shutting down %s Connection_Handler %d on handle %d\n", + this->connection_role () == 'C' ? "Consumer" : "Supplier", this->id (), this->get_handle ())); // Restart the connection, if possible. - return this->event_channel_->reinitiate_proxy_connection (this); + return this->event_channel_->reinitiate_connection_connection (this); } // Set the state of the Proxy. void -Proxy_Handler::state (Proxy_Handler::State s) +Connection_Handler::state (Connection_Handler::State s) { this->state_ = s; } // Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates -// control to our Proxy_Handler. +// control to our Connection_Handler. int -Proxy_Handler::open (void *) +Connection_Handler::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) %s Proxy_Handler's handle = %d\n", - this->proxy_role () == 'C' ? "Consumer" : "Supplier", + ACE_DEBUG ((LM_DEBUG, "(%t) %s Connection_Handler's handle = %d\n", + this->connection_role () == 'C' ? "Consumer" : "Supplier", this->peer ().get_handle ())); // Call back to the <Event_Channel> to complete our initialization. - if (this->event_channel_->complete_proxy_connection (this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); + if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); // Turn on non-blocking I/O. else if (this->peer ().enable (ACE_NONBLOCK) == -1) @@ -173,72 +173,72 @@ Proxy_Handler::open (void *) // Return the current state of the Proxy. -Proxy_Handler::State -Proxy_Handler::state (void) +Connection_Handler::State +Connection_Handler::state (void) { return this->state_; } ACE_INET_Addr & -Proxy_Handler::remote_addr (void) +Connection_Handler::remote_addr (void) { return this->remote_addr_; } ACE_INET_Addr & -Proxy_Handler::local_addr (void) +Connection_Handler::local_addr (void) { return this->local_addr_; } -// Make the appropriate type of <Proxy_Handler> (i.e., -// <Consumer_Proxy>, <Supplier_Proxy>, <Thr_Consumer_Proxy>, or -// <Thr_Supplier_Proxy>). +// Make the appropriate type of <Connection_Handler> (i.e., +// <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or +// <Thr_Supplier_Handler>). -Proxy_Handler * -Proxy_Handler_Factory::make_proxy_handler (const Proxy_Config_Info &pci) +Connection_Handler * +Connection_Handler_Factory::make_connection_handler (const Connection_Config_Info &pci) { - Proxy_Handler *proxy_handler = 0; + Connection_Handler *connection_handler = 0; // The next few lines of code are dependent on whether we are making - // a threaded/reactive Supplier_Proxy/Consumer_Proxy. + // a threaded/reactive Supplier_Handler/Consumer_Handler. - if (pci.proxy_role_ == 'C') // Configure a Consumer_Proxy. + if (pci.connection_role_ == 'C') // Configure a Consumer_Handler. { #if defined (ACE_HAS_THREADS) - // Create a threaded Consumer_Proxy. + // Create a threaded Consumer_Handler. if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (), Options::OUTPUT_MT)) - ACE_NEW_RETURN (proxy_handler, - Thr_Consumer_Proxy (pci), + ACE_NEW_RETURN (connection_handler, + Thr_Consumer_Handler (pci), 0); - // Create a reactive Consumer_Proxy. + // Create a reactive Consumer_Handler. else #endif /* ACE_HAS_THREADS */ - ACE_NEW_RETURN (proxy_handler, - Consumer_Proxy (pci), + ACE_NEW_RETURN (connection_handler, + Consumer_Handler (pci), 0); } - else // proxy_role == 'S', so configure a Supplier_Proxy. + else // connection_role == 'S', so configure a Supplier_Handler. { #if defined (ACE_HAS_THREADS) - // Create a threaded Supplier_Proxy. + // Create a threaded Supplier_Handler. if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (), Options::INPUT_MT)) - ACE_NEW_RETURN (proxy_handler, - Thr_Supplier_Proxy (pci), + ACE_NEW_RETURN (connection_handler, + Thr_Supplier_Handler (pci), 0); - // Create a reactive Supplier_Proxy. + // Create a reactive Supplier_Handler. else #endif /* ACE_HAS_THREAD */ - ACE_NEW_RETURN (proxy_handler, - Supplier_Proxy (pci), + ACE_NEW_RETURN (connection_handler, + Supplier_Handler (pci), 0); } - return proxy_handler; + return connection_handler; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Connection_Handler.h index 76bd4024478..665c1635a97 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.h +++ b/apps/Gateway/Gateway/Connection_Handler.h @@ -7,15 +7,15 @@ // gateway // // = FILENAME -// Proxy_Handler.h +// Connection_Handler.h // // = AUTHOR // Doug Schmidt // // ============================================================================ -#if !defined (_PROXY_HANDLER) -#define _PROXY_HANDLER +#if !defined (_CONNECTION_HANDLER) +#define _CONNECTION_HANDLER #include "ace/Service_Config.h" #include "ace/SOCK_Connector.h" @@ -25,25 +25,26 @@ #include "Event.h" // Forward declaration. -class ACE_Event_Channel; +class Event_Channel; -class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +class Connection_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +{ // = TITLE - // Proxy_Handler contains info about connection state and addressing. + // Connection_Handler contains info about connection state and + // addressing. // // = DESCRIPTION - // The Proxy_Handler classes process events sent to the Event - // Channel from Suppliers and forward them to Consumers. -{ + // The Connection_Handler classes process events sent to the + // Event Channel from Suppliers and forward them to Consumers. public: - Proxy_Handler (void); + Connection_Handler (void); // Default constructor (needed to make <ACE_Connector> happy). - Proxy_Handler (const Proxy_Config_Info &); + Connection_Handler (const Connection_Config_Info &); // Real constructor. virtual int open (void * = 0); - // Initialize and activate a single-threaded Proxy_Handler (called by + // Initialize and activate a single-threaded Connection_Handler (called by // ACE_Connector::handle_output()). ACE_INET_Addr &remote_addr (void); @@ -56,14 +57,14 @@ public: ACE_INT32 id (void); void id (ACE_INT32); - // = The current state of the Proxy_Handler. + // = The current state of the Connection_Handler. enum State { IDLE = 1, // Prior to initialization. CONNECTING, // During connection establishment. - ESTABLISHED, // Proxy_Handler is established and active. - DISCONNECTING, // Proxy_Handler is in the process of connecting. - FAILED // Proxy_Handler has failed. + ESTABLISHED, // Connection_Handler is established and active. + DISCONNECTING, // Connection_Handler is in the process of connecting. + FAILED // Connection_Handler has failed. }; // = Set/get the current state. @@ -80,8 +81,8 @@ public: // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer // (necessary for error checking). - void proxy_role (char); - char proxy_role (void); + void connection_role (char); + char connection_role (void); // = The total number of bytes sent/received on this proxy. size_t total_bytes (void); @@ -89,11 +90,11 @@ public: // Increment count by <bytes>. virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based Proxy_Handler reconnection. + // Perform timer-based Connection_Handler reconnection. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); - // Perform Proxy_Handler termination. + // Perform Connection_Handler termination. protected: enum @@ -122,27 +123,27 @@ protected: int max_timeout_; // Maximum amount of time to wait between reconnection attempts. - char proxy_role_; + char connection_role_; // Indicates which role the proxy plays ('S' == Supplier and 'C' == // Consumer). - ACE_Event_Channel *event_channel_; - // Reference to the <ACE_Event_Channel> that we use to forward all + Event_Channel *event_channel_; + // Reference to the <Event_Channel> that we use to forward all // the events from Consumers and Suppliers. }; -class Proxy_Handler_Factory : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +class Connection_Handler_Factory +{ // = TITLE - // Creates the appropriate type of <Proxy_Handler> + // Creates the appropriate type of <Connection_Handler>. // // = DESCRIPTION - // <Proxy_Handler>s can include <Consumer_Proxy>, - // <Supplier_Proxy>, <Thr_Consumer_Proxy>, or - // <Thr_Supplier_Proxy>). -{ + // <Connection_Handler>s can include <Consumer_Handler>, + // <Supplier_Handler>, <Thr_Consumer_Handler>, or + // <Thr_Supplier_Handler>). public: - Proxy_Handler *make_proxy_handler (const Proxy_Config_Info &); - // Make the appropriate type of <Proxy_Handler>. + Connection_Handler *make_connection_handler (const Connection_Config_Info &); + // Make the appropriate type of <Connection_Handler>. }; -#endif /* _PROXY_HANDLER */ +#endif /* _CONNECTION_HANDLER */ diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp new file mode 100644 index 00000000000..f38aa21a23a --- /dev/null +++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp @@ -0,0 +1,33 @@ +// $Id$ + +#include "Event_Channel.h" +#include "Connection_Handler_Acceptor.h" + +int +Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ph) +{ + ACE_ALLOCATOR_RETURN (ph, + this->connection_handler_factory_.make_connection_handler (this->connection_config_info_), + -1); + return 0; +} + +Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec, + char connection_role) + : event_channel_ (ec) +{ + this->connection_config_info_.connection_id_ = 0; + this->connection_config_info_.host_[0] = '\0'; + this->connection_config_info_.remote_port_ = 0; + this->connection_config_info_.connection_role_ = connection_role; + this->connection_config_info_.max_retry_timeout_ = Options::instance ()->max_timeout (); + this->connection_config_info_.local_port_ = 0; + this->connection_config_info_.priority_ = 1; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h new file mode 100644 index 00000000000..fc54a363595 --- /dev/null +++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h @@ -0,0 +1,51 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Connection_Handler_acceptor.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONNECTION_HANDLER_ACCEPTOR) +#define _CONNECTION_HANDLER_ACCEPTOR + +#include "ace/Acceptor.h" +#include "ace/SOCK_Acceptor.h" +#include "Connection_Handler.h" + +// Forward declaration +class Event_Channel; + +class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR> +{ + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Connection_Handler object to do the dirty + // work... +public: + Connection_Handler_Acceptor (Event_Channel &, + char connection_role); + + virtual int make_svc_handler (Connection_Handler *&ph); + // Hook method for creating an appropriate <Connection_Handler>. + +protected: + Event_Channel &event_channel_; + // Reference to the event channel. + + Connection_Config_Info connection_config_info_; + // Keeps track of what type of proxy we need to create. + + Connection_Handler_Factory connection_handler_factory_; + // Make the appropriate type of <Connection_Handler>. +}; + +#endif /* _CONNECTION_HANDLER_ACCEPTOR */ diff --git a/apps/Gateway/Gateway/Connection_Handler_Connector.cpp b/apps/Gateway/Gateway/Connection_Handler_Connector.cpp new file mode 100644 index 00000000000..56ac92d9c2f --- /dev/null +++ b/apps/Gateway/Gateway/Connection_Handler_Connector.cpp @@ -0,0 +1,73 @@ +// $Id$ + +#include "Connection_Handler_Connector.h" + +Connection_Handler_Connector::Connection_Handler_Connector (void) +{ +} + +// Initiate (or reinitiate) a connection to the Connection_Handler. + +int +Connection_Handler_Connector::initiate_connection (Connection_Handler *connection_handler, + ACE_Synch_Options &synch_options) +{ + char addr_buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators + // will ignore us until we are reconnected. + connection_handler->state (Connection_Handler::IDLE); + + // We check the remote addr second so that it remains in the addr_buf. + if (connection_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1 + || connection_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "can't obtain peer's address"), -1); + + // Try to connect to the Peer. + + if (this->connect (connection_handler, connection_handler->remote_addr (), + synch_options, connection_handler->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + connection_handler->state (Connection_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", + "connect", addr_buf)); + + return -1; + } + else + { + connection_handler->state (Connection_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting to %s\n", + addr_buf)); + } + } + else + { + connection_handler->state (Connection_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", + addr_buf, connection_handler->get_handle ())); + } + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR>; +template class ACE_Svc_Tuple<Connection_Handler>; +template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>; +template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR> +#pragma instantiate ACE_Svc_Tuple<Connection_Handler> +#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX> +#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Connection_Handler_Connector.h index 4e5cc79640e..e81d66d9694 100644 --- a/apps/Gateway/Gateway/Proxy_Handler_Connector.h +++ b/apps/Gateway/Gateway/Connection_Handler_Connector.h @@ -7,7 +7,7 @@ // gateway // // = FILENAME -// Proxy_Handler_Connector.h +// Connection_Handler_Connector.h // // = AUTHOR // Doug Schmidt @@ -19,18 +19,19 @@ #include "ace/Connector.h" #include "ace/SOCK_Connector.h" -#include "Proxy_Handler.h" +#include "Connection_Handler.h" -class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR> - // = TITLE - // A concrete factory class that setups connections to peerds - // and produces a new Proxy_Handler object to do the dirty work... +class Connection_Handler_Connector : public ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR> { + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Connection_Handler object to do the dirty + // work... public: - Proxy_Handler_Connector (void); + Connection_Handler_Connector (void); - // Initiate (or reinitiate) a connection on the Proxy_Handler. - int initiate_connection (Proxy_Handler *, + // Initiate (or reinitiate) a connection on the Connection_Handler. + int initiate_connection (Connection_Handler *, ACE_Synch_Options & = ACE_Synch_Options::synch); }; diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h index 5ff672679c2..2f89143460d 100644 --- a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h +++ b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h @@ -14,15 +14,15 @@ // // ============================================================================ -#if !defined (_DISPATCH_SET) -#define _DISPATCH_SET +#if !defined (CONSUMER_DISPATCH_SET) +#define CONSUMER_DISPATCH_SET #include "ace/Containers.h" // Forward reference. -class Proxy_Handler; +class Connection_Handler; -typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set; -typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator; +typedef ACE_Unbounded_Set<Connection_Handler *> Consumer_Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Connection_Handler *> Consumer_Dispatch_Set_Iterator; -#endif /* _DISPATCH_SET */ +#endif /* CONSUMER_DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index f88b1770d2f..58ef1f0a97b 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -19,6 +19,11 @@ #include "ace/OS.h" +// = The following #defines should really be in a separate include +// file that is shared with the ../Peer/ directory. For now, we'll +// keep them here to simplify the sharing between the two directories. +// BTW, this is also the reason why all the methods are inlined... + // Used by Peers to create Consumers in a Gateway. #if !defined (DEFAULT_GATEWAY_CONSUMER_PORT) #define DEFAULT_GATEWAY_CONSUMER_PORT 10009 @@ -39,11 +44,12 @@ #define DEFAULT_PEER_SUPPLIER_PORT 10012 #endif /* DEFAULT_PEER_SUPPLIER_PORT */ -// This is the unique connection identifier that denotes a particular -// Proxy_Handler in the Gateway. -typedef ACE_INT32 ACE_INT32; +// This is the unique supplier identifier that denotes a particular +// <Connection_Handler> in the Gateway. +typedef ACE_INT32 CONNECTION_ID; class Event_Key +{ // = TITLE // Address used to identify the source/destination of an event. // @@ -51,45 +57,35 @@ class Event_Key // This is really a "virtual forwarding address" thatis used to // decouple the filtering and forwarding logic of the Event // Channel from the format of the data. -{ public: - Event_Key (ACE_INT32 cid = -1, - u_char sid = 0, - u_char type = 0) - : proxy_id_ (cid), - supplier_id_ (sid), + Event_Key (CONNECTION_ID cid = -1, + u_char type = 0) + : connection_id_ (cid), type_ (type) {} int operator== (const Event_Key &event_addr) const { - return this->proxy_id_ == event_addr.proxy_id_ - && this->supplier_id_ == event_addr.supplier_id_ + return this->connection_id_ == event_addr.connection_id_ && this->type_ == event_addr.type_; } - ACE_INT32 proxy_id_; + CONNECTION_ID connection_id_; // Unique connection identifier that denotes a particular - // Proxy_Handler. - - ACE_INT32 supplier_id_; - // Logical ID. + // Connection_Handler. ACE_INT32 type_; // Event type. }; class Event_Header +{ // = TITLE // Fixed sized header. // // = DESCRIPTION // This is designed to have a sizeof (16) to avoid alignment // problems on most platforms. -{ public: - typedef ACE_INT32 SUPPLIER_ID; - // Type used to forward events from gatewayd. - enum { INVALID_ID = -1 // No peer can validly use this number. @@ -98,7 +94,6 @@ public: void decode (void) { this->len_ = ntohl (this->len_); - this->supplier_id_ = ntohl (this->supplier_id_); this->type_ = ntohl (this->type_); this->priority_ = ntohl (this->priority_); } @@ -107,7 +102,6 @@ public: void encode (void) { this->len_ = htonl (this->len_); - this->supplier_id_ = htonl (this->supplier_id_); this->type_ = htonl (this->type_); this->priority_ = htonl (this->priority_); } @@ -116,8 +110,9 @@ public: size_t len_; // Length of the data_ payload, in bytes. - SUPPLIER_ID supplier_id_; - // Source ID. + CONNECTION_ID connection_id_; + // Unique connection identifier that denotes a particular + // Connection_Handler. ACE_INT32 type_; // Event type. @@ -127,10 +122,10 @@ public: }; class Event +{ // = TITLE // Variable-sized event (data_ may be variable-sized between // 0 and MAX_PAYLOAD_SIZE). -{ public: enum { MAX_PAYLOAD_SIZE = 1024 }; // The maximum size of an Event. diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index ac09c0e239c..c430324b42e 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -2,24 +2,24 @@ // $Id$ #define ACE_BUILD_SVC_DLL -#include "Proxy_Handler_Connector.h" +#include "Connection_Handler_Connector.h" #include "Event_Channel.h" -ACE_Event_Channel::~ACE_Event_Channel (void) +Event_Channel::~Event_Channel (void) { } -ACE_Event_Channel::ACE_Event_Channel (void) - : supplier_acceptor_ (*this), - consumer_acceptor_ (*this) +Event_Channel::Event_Channel (void) + : supplier_acceptor_ (*this, 'S'), + consumer_acceptor_ (*this, 'C') { } int -ACE_Event_Channel::compute_performance_statistics (void) +Event_Channel::compute_performance_statistics (void) { ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - PROXY_MAP_ITERATOR cmi (this->proxy_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // If we've got a ACE_Thread Manager then use it to suspend all the // threads. This will enable us to get an accurate count. @@ -38,44 +38,32 @@ ACE_Event_Channel::compute_performance_statistics (void) // Iterate through the connection map summing up the number of bytes // sent/received. - for (PROXY_MAP_ENTRY *me = 0; + for (CONNECTION_MAP_ENTRY *me = 0; cmi.next (me) != 0; cmi.advance ()) { - Proxy_Handler *proxy_handler = me->int_id_; + Connection_Handler *connection_handler = me->int_id_; - if (proxy_handler->proxy_role () == 'C') - total_bytes_out += proxy_handler->total_bytes (); - else // proxy_handler->proxy_role () == 'S' - total_bytes_in += proxy_handler->total_bytes (); + if (connection_handler->connection_role () == 'C') + total_bytes_out += connection_handler->total_bytes (); + else // connection_handler->connection_role () == 'S' + total_bytes_in += connection_handler->total_bytes (); } -#if defined (ACE_NLOGGING) - ACE_OS::fprintf (stderr, - "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - performance_window_, - total_bytes_in, - total_bytes_out); - - ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))); - - ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); -#else ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", Options::instance ()->performance_window (), total_bytes_in, total_bytes_out)); + ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", (float) (total_bytes_in * 8 / (float) (1024 * 1024 * Options::instance ()->performance_window ())))); + ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", (float) (total_bytes_out * 8 / (float) (1024 * 1024 * Options::instance ()->performance_window ())))); -#endif /* ACE_NLOGGING */ // Resume all the threads again. @@ -91,7 +79,7 @@ ACE_Event_Channel::compute_performance_statistics (void) } int -ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, +Event_Channel::handle_timeout (const ACE_Time_Value &, const void *) { return this->compute_performance_statistics (); @@ -101,7 +89,7 @@ ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, // to receive it. int -ACE_Event_Channel::put (ACE_Message_Block *event, +Event_Channel::put (ACE_Message_Block *event, ACE_Time_Value *) { // We got a valid event, so determine its virtual forwarding @@ -120,9 +108,8 @@ ACE_Event_Channel::put (ACE_Message_Block *event, if (this->efd_.find (*forwarding_addr, dispatch_set) == -1) // Failure. ACE_ERROR ((LM_DEBUG, - "(%t) find failed on conn id = %d, supplier id = %d, type = %d\n", - forwarding_addr->proxy_id_, - forwarding_addr->supplier_id_, + "(%t) find failed on connection id = %d, type = %d\n", + forwarding_addr->connection_id_, forwarding_addr->type_)); else { @@ -140,28 +127,28 @@ ACE_Event_Channel::put (ACE_Message_Block *event, // multi-threaded configuration. // data->locking_strategy (MB_Locking_Strategy::instance ()); - for (Proxy_Handler **proxy_handler = 0; - dsi.next (proxy_handler) != 0; + for (Connection_Handler **connection_handler = 0; + dsi.next (connection_handler) != 0; dsi.advance ()) { - // Only process active proxy_handlers. - if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + // Only process active connection_handlers. + if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED) { // Duplicate the event portion via reference // counting. ACE_Message_Block *dup_msg = data->duplicate (); ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n", - (*proxy_handler)->id ())); + (*connection_handler)->id ())); - if ((*proxy_handler)->put (dup_msg) == -1) + if ((*connection_handler)->put (dup_msg) == -1) { if (errno == EWOULDBLOCK) // The queue has filled up! ACE_ERROR ((LM_ERROR, "(%t) %p\n", "gateway is flow controlled, so we're dropping events")); else ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", - "put", (*proxy_handler)->id ())); + "put", (*connection_handler)->id ())); // We are responsible for releasing an // ACE_Message_Block if failures occur. @@ -178,13 +165,13 @@ ACE_Event_Channel::put (ACE_Message_Block *event, } int -ACE_Event_Channel::svc (void) +Event_Channel::svc (void) { return 0; } int -ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler) +Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler) { ACE_Synch_Options synch_options; @@ -193,36 +180,36 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler) else synch_options = ACE_Synch_Options::synch; - return this->connector_.initiate_connection (proxy_handler, + return this->connector_.initiate_connection (connection_handler, synch_options); } int -ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) +Event_Channel::complete_connection_connection (Connection_Handler *connection_handler) { - int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; + int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; int socket_queue_size = Options::instance ()->socket_queue_size (); if (socket_queue_size > 0) - if (proxy_handler->peer ().set_option (SOL_SOCKET, + if (connection_handler->peer ().set_option (SOL_SOCKET, option, &socket_queue_size, sizeof (int)) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - proxy_handler->thr_mgr (ACE_Thread_Manager::instance ()); + connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); // Our state is now "established." - proxy_handler->state (Proxy_Handler::ESTABLISHED); + connection_handler->state (Connection_Handler::ESTABLISHED); // Restart the timeout to 1. - proxy_handler->timeout (1); + connection_handler->timeout (1); - ACE_INT32 id = htonl (proxy_handler->id ()); + ACE_INT32 id = htonl (connection_handler->id ()); // Send the connection id to the peerd. - ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id); + ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id); if (n != sizeof id) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", @@ -235,24 +222,24 @@ ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) // synchronously or asynchronously). int -ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) +Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler) { // Skip over proxies with deactivated handles. - if (proxy_handler->get_handle () != ACE_INVALID_HANDLE) + if (connection_handler->get_handle () != ACE_INVALID_HANDLE) { // Make sure to close down peer to reclaim descriptor. - proxy_handler->peer ().close (); + connection_handler->peer ().close (); } - if (proxy_handler->state () != Proxy_Handler::DISCONNECTING) + if (connection_handler->state () != Connection_Handler::DISCONNECTING) { ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Proxy_Handler %d\n", - proxy_handler->id ())); + "(%t) scheduling reinitiation of Connection_Handler %d\n", + connection_handler->id ())); // Reschedule ourselves to try and connect again. if (ACE_Reactor::instance ()->schedule_timer - (proxy_handler, 0, proxy_handler->timeout ()) == -1) + (connection_handler, 0, connection_handler->timeout ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); } @@ -262,22 +249,22 @@ ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) // Initiate active connections with the Consumer and Supplier Peers. void -ACE_Event_Channel::initiate_connector (void) +Event_Channel::initiate_connector (void) { if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) { - PROXY_MAP_ITERATOR cmi (this->proxy_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate through the Consumer Map connecting all the - // Proxy_Handlers. + // Connection_Handlers. - for (PROXY_MAP_ENTRY *me = 0; + for (CONNECTION_MAP_ENTRY *me = 0; cmi.next (me) != 0; cmi.advance ()) { - Proxy_Handler *proxy_handler = me->int_id_; + Connection_Handler *connection_handler = me->int_id_; - if (this->initiate_proxy_connection (proxy_handler) == -1) + if (this->initiate_connection_connection (connection_handler) == -1) continue; // Failures are handled elsewhere... } } @@ -287,7 +274,7 @@ ACE_Event_Channel::initiate_connector (void) // to accept. void -ACE_Event_Channel::initiate_acceptors (void) +Event_Channel::initiate_acceptors (void) { if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) && this->consumer_acceptor_.open @@ -307,10 +294,10 @@ ACE_Event_Channel::initiate_acceptors (void) } // This method gracefully shuts down all the Handlers in the -// Proxy_Handler Connection Map. +// Connection_Handler Connection Map. int -ACE_Event_Channel::close (u_long) +Event_Channel::close (u_long) { if (Options::instance ()->threading_strategy () != Options::REACTIVE) @@ -322,22 +309,22 @@ ACE_Event_Channel::close (u_long) // First tell everyone that the spaceship is here... { - PROXY_MAP_ITERATOR cmi (this->proxy_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate over all the handlers and shut them down. - for (PROXY_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me; cmi.next (me) != 0; cmi.advance ()) { - Proxy_Handler *proxy_handler = me->int_id_; + Connection_Handler *connection_handler = me->int_id_; ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n", - proxy_handler->id ())); + connection_handler->id ())); - // Mark Proxy_Handler as DISCONNECTING so we don't try to + // Mark Connection_Handler as DISCONNECTING so we don't try to // reconnect... - proxy_handler->state (Proxy_Handler::DISCONNECTING); + connection_handler->state (Connection_Handler::DISCONNECTING); } } @@ -352,16 +339,16 @@ ACE_Event_Channel::close (u_long) // Now tell everyone that it is now time to commit suicide. { - PROXY_MAP_ITERATOR cmi (this->proxy_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); - for (PROXY_MAP_ENTRY *me; + for (CONNECTION_MAP_ENTRY *me; cmi.next (me) != 0; cmi.advance ()) { - Proxy_Handler *proxy_handler = me->int_id_; + Connection_Handler *connection_handler = me->int_id_; - // Deallocate Proxy_Handler resources. - proxy_handler->destroy (); // Will trigger a delete. + // Deallocate Connection_Handler resources. + connection_handler->destroy (); // Will trigger a delete. } } @@ -369,28 +356,28 @@ ACE_Event_Channel::close (u_long) } int -ACE_Event_Channel::find_proxy (ACE_INT32 proxy_id, - Proxy_Handler *&proxy_handler) +Event_Channel::find_proxy (ACE_INT32 connection_id, + Connection_Handler *&connection_handler) { - return this->proxy_map_.find (proxy_id, proxy_handler); + return this->connection_map_.find (connection_id, connection_handler); } int -ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) +Event_Channel::bind_proxy (Connection_Handler *connection_handler) { - int result = this->proxy_map_.bind (proxy_handler->id (), proxy_handler); + int result = this->connection_map_.bind (connection_handler->id (), connection_handler); switch (result) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - proxy_handler->id ()), -1); + connection_handler->id ()), -1); /* NOTREACHED */ case 1: // Oops, found a duplicate! ACE_ERROR_RETURN ((LM_ERROR, "(%t) duplicate connection %d, already bound\n", - proxy_handler->id ()), -1); + connection_handler->id ()), -1); /* NOTREACHED */ case 0: // Success. @@ -406,7 +393,7 @@ ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) } int -ACE_Event_Channel::subscribe (const Event_Key &event_addr, +Event_Channel::subscribe (const Event_Key &event_addr, Consumer_Dispatch_Set *cds) { int result = this->efd_.bind (event_addr, cds); @@ -417,12 +404,12 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr, case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - event_addr.proxy_id_), -1); + event_addr.connection_id_), -1); /* NOTREACHED */ case 1: // Oops, found a duplicate! ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " - "already bound\n", event_addr.proxy_id_), -1); + "already bound\n", event_addr.connection_id_), -1); /* NOTREACHED */ case 0: // Success. @@ -437,9 +424,9 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr, } int -ACE_Event_Channel::open (void *) +Event_Channel::open (void *) { - // Ignore SIPPIPE so each Consumer_Proxy can handle it. + // Ignore SIPPIPE so each Consumer_Handler can handle it. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); ACE_UNUSED_ARG (sig); @@ -469,18 +456,18 @@ ACE_Event_Channel::open (void *) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>; -template class ACE_Map_Entry<ACE_INT32, Proxy_Handler *>; -template class ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>; -template class ACE_Map_Reverse_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>; -template class ACE_Map_Iterator_Base<ACE_INT32, Proxy_Handler *, MAP_MUTEX>; -template class ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX>; -template class ACE_Unbounded_Set_Iterator<Proxy_Handler *>; +template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>; +template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>; +template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>; +template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>; +template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>; +template class ACE_Unbounded_Set_Iterator<Connection_Handler *>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX> -#pragma instantiate ACE_Map_Entry<ACE_INT32, Proxy_Handler *> -#pragma instantiate ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Proxy_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> -#pragma instantiate ACE_Unbounded_Set_Iterator<Proxy_Handler *> +#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *> +#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX> +#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX> +#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX> +#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX> +#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 600c63060f9..4735c7b9d44 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -17,23 +17,25 @@ #if !defined (ACE_EVENT_CHANNEL) #define ACE_EVENT_CHANNEL -#include "Proxy_Handler_Connector.h" -#include "Proxy_Handler_Acceptor.h" +#include "Connection_Handler_Connector.h" +#include "Connection_Handler_Acceptor.h" #include "Consumer_Dispatch_Set.h" #include "Event_Forwarding_Discriminator.h" typedef ACE_Null_Mutex MAP_MUTEX; -class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<ACE_SYNCH> +class ACE_Svc_Export Event_Channel : public ACE_Event_Handler +{ // = TITLE // Define a generic Event_Channel. // // = DESCRIPTION -{ + // We inherit from <ACE_Event_Handler> so that we can be + // registered with an <ACE_Reactor> to handle timeouts. public: // = Initialization and termination methods. - ACE_Event_Channel (void); - ~ACE_Event_Channel (void); + Event_Channel (void); + ~Event_Channel (void); virtual int open (void * = 0); // Open the channel. @@ -42,22 +44,22 @@ public: // Close down the Channel. // = Proxy management methods. - int initiate_proxy_connection (Proxy_Handler *); - // Initiate the connection of the <Proxy_Handler> to its peer. + int initiate_connection_connection (Connection_Handler *); + // Initiate the connection of the <Connection_Handler> to its peer. - int complete_proxy_connection (Proxy_Handler *); - // Complete the initialization of the <Proxy_Handler> once it's + int complete_connection_connection (Connection_Handler *); + // Complete the initialization of the <Connection_Handler> once it's // connected to its Peer. - int reinitiate_proxy_connection (Proxy_Handler *); + int reinitiate_connection_connection (Connection_Handler *); // Reinitiate a connection asynchronously when the Peer fails. - int bind_proxy (Proxy_Handler *); - // Bind the <Proxy_Handler> to the <proxy_map_>. + int bind_proxy (Connection_Handler *); + // Bind the <Connection_Handler> to the <connection_map_>. - int find_proxy (ACE_INT32 proxy_id, - Proxy_Handler *&); - // Locate the <Proxy_Handler> with <proxy_id>. + int find_proxy (ACE_INT32 connection_id, + Connection_Handler *&); + // Locate the <Connection_Handler> with <connection_id>. int subscribe (const Event_Key &event_addr, Consumer_Dispatch_Set *cds); @@ -91,28 +93,28 @@ private: // Periodically callback to perform timer-based performance // profiling. - Proxy_Handler_Connector connector_; + Connection_Handler_Connector connector_; // Used to establish the connections actively. - Proxy_Handler_Acceptor supplier_acceptor_; + Connection_Handler_Acceptor supplier_acceptor_; // Used to establish connections passively and create Suppliers. - Proxy_Handler_Acceptor consumer_acceptor_; + Connection_Handler_Acceptor consumer_acceptor_; // Used to establish connections passively and create Consumers. // = Make life easier by defining typedefs. - typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> - PROXY_MAP; - typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> - PROXY_MAP_ITERATOR; - typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> - PROXY_MAP_ENTRY; + typedef ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX> + CONNECTION_MAP; + typedef ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX> + CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<ACE_INT32, Connection_Handler *> + CONNECTION_MAP_ENTRY; - PROXY_MAP proxy_map_; - // Table that maps Connection IDs to Proxy_Handler *'s. + CONNECTION_MAP connection_map_; + // Table that maps Connection IDs to Connection_Handler *'s. Event_Forwarding_Discriminator efd_; - // Map that associates an event to a set of Consumer_Proxy *'s. + // Map that associates an event to a set of Consumer_Handler *'s. }; #endif /* ACE_EVENT_CHANNEL */ diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index 64b4d49db59..22490c6329b 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -20,9 +20,9 @@ #include "ace/OS.h" class FP - // = TITLE - // This class serves as a namespace for the Return_Type { + // = TITLE + // This class serves as a namespace for the <Return_Type>. public: enum Return_Type { @@ -37,10 +37,10 @@ public: template <class ENTRY> class File_Parser - // = TITLE - // Class used to parse the configuration file for the Consumer - // Map. { + // = TITLE + // Class used to parse the configuration file for the + // <Consumer_Map>. public: // = Open and Close the file specified int open (const char filename[]); diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index d7df1458299..d86afab90a4 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -12,7 +12,7 @@ class ACE_Svc_Export Gateway : public ACE_Service_Object // Integrates the whole Gateway application. // // = DESCRIPTION - // This implementation uses the <ACE_Event_Channel> as the basis + // This implementation uses the <Event_Channel> as the basis // for the <Gateway> routing. { protected: @@ -27,7 +27,7 @@ protected: // Return info about this service. // = Configuration methods. - int parse_proxy_config_file (void); + int parse_connection_config_file (void); // Parse the proxy configuration file. int parse_consumer_config_file (void); @@ -41,12 +41,12 @@ protected: int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); // Shut down the Gateway when a signal arrives. - ACE_Event_Channel event_channel_; + Event_Channel event_channel_; // The Event Channel routes events from Supplier(s) to Consumer(s) - // using <Supplier_Proxy> and <Consumer_Proxy> objects. + // using <Supplier_Handler> and <Consumer_Handler> objects. - Proxy_Handler_Factory proxy_handler_factory_; - // Creates the appropriate type of <Proxy_Handlers>. + Connection_Handler_Factory connection_handler_factory_; + // Creates the appropriate type of <Connection_Handlers>. int debug_; // Are we debugging? @@ -120,10 +120,11 @@ Gateway::init (int argc, char *argv[]) Options::instance ()->performance_window ())); } - if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) + if (Options::instance ()->enabled + (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) { // Parse the proxy configuration file. - this->parse_proxy_config_file (); + this->parse_connection_config_file (); // Parse the consumer config file and build the event forwarding // discriminator. @@ -172,22 +173,22 @@ Gateway::info (char **strp, size_t length) const // Parse and build the proxy table. int -Gateway::parse_proxy_config_file (void) +Gateway::parse_connection_config_file (void) { // File that contains the proxy configuration information. - Proxy_Config_File_Parser proxy_file; + Connection_Config_File_Parser connection_file; int file_empty = 1; int line_number = 0; - if (proxy_file.open (Options::instance ()->proxy_config_file ()) == -1) + if (connection_file.open (Options::instance ()->connection_config_file ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - Options::instance ()->proxy_config_file ()), + Options::instance ()->connection_config_file ()), -1); // Read config file one line at a time. - for (Proxy_Config_Info pci; - proxy_file.read_entry (pci, line_number) != FP::EOFILE; + for (Connection_Config_Info pci; + connection_file.read_entry (pci, line_number) != FP::EOFILE; ) { file_empty = 0; @@ -196,10 +197,10 @@ Gateway::parse_proxy_config_file (void) ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, proxy role = %c, " "max retry timeout = %d, local port = %d, priority = %d\n", - pci.proxy_id_, + pci.connection_id_, pci.host_, pci.remote_port_, - pci.proxy_role_, + pci.connection_role_, pci.max_retry_timeout_, pci.local_port_, pci.priority_)); @@ -207,22 +208,22 @@ Gateway::parse_proxy_config_file (void) pci.event_channel_ = &this->event_channel_; // Create the appropriate type of Proxy. - Proxy_Handler *proxy_handler = - this->proxy_handler_factory_.make_proxy_handler (pci); + Connection_Handler *connection_handler = + this->connection_handler_factory_.make_connection_handler (pci); - if (proxy_handler == 0) + if (connection_handler == 0) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", - "make_proxy_handler"), + "make_connection_handler"), -1); - // Bind the new Proxy_Handler to the connection ID. - this->event_channel_.bind_proxy (proxy_handler); + // Bind the new Connection_Handler to the connection ID. + this->event_channel_.bind_proxy (connection_handler); } if (file_empty) ACE_ERROR ((LM_WARNING, - "warning: connection proxy_handler configuration file was empty\n")); + "warning: connection connection_handler configuration file was empty\n")); return 0; } @@ -250,10 +251,9 @@ Gateway::parse_consumer_config_file (void) if (Options::instance ()->enabled (Options::DEBUG)) { ACE_DEBUG ((LM_DEBUG, - "(%t) conn id = %d, supplier id = %d, payload = %d, " + "(%t) connection id = %d, payload = %d, " "number of consumers = %d\n", - cci.proxy_id_, - cci.supplier_id_, + cci.connection_id_, cci.type_, cci.total_consumers_)); @@ -267,20 +267,19 @@ Gateway::parse_consumer_config_file (void) Consumer_Dispatch_Set *dispatch_set; ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1); - Event_Key event_addr (cci.proxy_id_, - cci.supplier_id_, + Event_Key event_addr (cci.connection_id_, cci.type_); // Add the Consumers to the Dispatch_Set. for (int i = 0; i < cci.total_consumers_; i++) { - Proxy_Handler *proxy_handler = 0; + Connection_Handler *connection_handler = 0; // Lookup destination and add to Consumer_Dispatch_Set set // if found. if (this->event_channel_.find_proxy (cci.consumers_[i], - proxy_handler) != -1) - dispatch_set->insert (proxy_handler); + connection_handler) != -1) + dispatch_set->insert (connection_handler); else ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", i, cci.consumers_[i])); @@ -301,10 +300,10 @@ Gateway::parse_consumer_config_file (void) ACE_SVC_FACTORY_DEFINE (Gateway) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Node<Proxy_Handler *>; -template class ACE_Unbounded_Set<Proxy_Handler *>; +template class ACE_Node<Connection_Handler *>; +template class ACE_Unbounded_Set<Connection_Handler *>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Node<Proxy_Handler *> -#pragma instantiate ACE_Unbounded_Set<Proxy_Handler *> +#pragma instantiate ACE_Node<Connection_Handler *> +#pragma instantiate ACE_Unbounded_Set<Connection_Handler *> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index 31d524bdbfc..9004ef6e63c 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -12,16 +12,16 @@ BIN = gatewayd LIB = libGateway.a SHLIB = libGateway.$(SOEXT) -FILES = Concrete_Proxy_Handlers \ +FILES = Concrete_Connection_Handlers \ Config_Files \ File_Parser \ Gateway \ Event_Channel \ Event_Forwarding_Discriminator \ Options \ - Proxy_Handler \ - Proxy_Handler_Acceptor \ - Proxy_Handler_Connector + Connection_Handler \ + Connection_Handler_Acceptor \ + Connection_Handler_Connector LSRC = $(addsuffix .cpp,$(FILES)) LDLIBS = -lGateway @@ -52,8 +52,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Concrete_Proxy_Handlers.o .shobj/Concrete_Proxy_Handlers.so: Concrete_Proxy_Handlers.cpp Event_Channel.h \ - Proxy_Handler_Connector.h \ +.obj/Concrete_Connection_Handlers.o .shobj/Concrete_Connection_Handlers.so: Concrete_Connection_Handlers.cpp Event_Channel.h \ + Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Service_Config.h \ $(ACE_ROOT)/ace/Service_Object.h \ @@ -152,14 +152,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Config_Files.h File_Parser.h Event.h \ - Proxy_Handler_Acceptor.h \ + Connection_Handler.h Config_Files.h File_Parser.h Event.h \ + Connection_Handler_Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.i \ $(ACE_ROOT)/ace/SOCK_Acceptor.h \ $(ACE_ROOT)/ace/SOCK_Acceptor.i \ Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h \ - Concrete_Proxy_Handlers.h + Concrete_Connection_Handlers.h .obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/config.h \ @@ -242,7 +242,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Reactor.i \ $(ACE_ROOT)/ace/Reactor_Impl.h \ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ - Event_Channel.h Proxy_Handler_Connector.h \ + Event_Channel.h Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Map_Manager.h \ $(ACE_ROOT)/ace/Map_Manager.i \ @@ -291,13 +291,13 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Event.h Proxy_Handler_Acceptor.h \ + Connection_Handler.h Event.h Connection_Handler_Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.i \ $(ACE_ROOT)/ace/SOCK_Acceptor.h \ $(ACE_ROOT)/ace/SOCK_Acceptor.i \ Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h Gateway.h -.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp Proxy_Handler_Connector.h \ +.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Service_Config.h \ $(ACE_ROOT)/ace/Service_Object.h \ @@ -396,8 +396,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Config_Files.h File_Parser.h Event.h Event_Channel.h \ - Proxy_Handler_Acceptor.h \ + Connection_Handler.h Config_Files.h File_Parser.h Event.h Event_Channel.h \ + Connection_Handler_Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.i \ $(ACE_ROOT)/ace/SOCK_Acceptor.h \ @@ -435,8 +435,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU Consumer_Dispatch_Set.h \ $(ACE_ROOT)/ace/Containers.h \ $(ACE_ROOT)/ace/Containers.i -.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Event_Channel.h \ - Proxy_Handler_Connector.h \ +.obj/Connection_Handler.o .shobj/Connection_Handler.so: Connection_Handler.cpp Event_Channel.h \ + Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Service_Config.h \ $(ACE_ROOT)/ace/Service_Object.h \ @@ -535,16 +535,16 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Config_Files.h File_Parser.h Event.h \ - Proxy_Handler_Acceptor.h \ + Connection_Handler.h Config_Files.h File_Parser.h Event.h \ + Connection_Handler_Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.i \ $(ACE_ROOT)/ace/SOCK_Acceptor.h \ $(ACE_ROOT)/ace/SOCK_Acceptor.i \ Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h \ - Concrete_Proxy_Handlers.h -.obj/Proxy_Handler_Acceptor.o .shobj/Proxy_Handler_Acceptor.so: Proxy_Handler_Acceptor.cpp Event_Channel.h \ - Proxy_Handler_Connector.h \ + Concrete_Connection_Handlers.h +.obj/Connection_Handler_Acceptor.o .shobj/Connection_Handler_Acceptor.so: Connection_Handler_Acceptor.cpp Event_Channel.h \ + Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Service_Config.h \ $(ACE_ROOT)/ace/Service_Object.h \ @@ -643,15 +643,15 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Config_Files.h File_Parser.h Event.h \ - Proxy_Handler_Acceptor.h \ + Connection_Handler.h Config_Files.h File_Parser.h Event.h \ + Connection_Handler_Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.h \ $(ACE_ROOT)/ace/Acceptor.i \ $(ACE_ROOT)/ace/SOCK_Acceptor.h \ $(ACE_ROOT)/ace/SOCK_Acceptor.i \ Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h -.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \ - Proxy_Handler_Connector.h \ +.obj/Connection_Handler_Connector.o .shobj/Connection_Handler_Connector.so: Connection_Handler_Connector.cpp \ + Connection_Handler_Connector.h \ $(ACE_ROOT)/ace/Connector.h \ $(ACE_ROOT)/ace/Service_Config.h \ $(ACE_ROOT)/ace/Service_Object.h \ @@ -750,6 +750,6 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/SOCK_Stream.i \ $(ACE_ROOT)/ace/Time_Value.h \ $(ACE_ROOT)/ace/SOCK_Connector.i \ - Proxy_Handler.h Config_Files.h File_Parser.h Event.h + Connection_Handler.h Config_Files.h File_Parser.h Event.h # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp index 48f4834bfe9..073eea17494 100644 --- a/apps/Gateway/Gateway/Options.cpp +++ b/apps/Gateway/Gateway/Options.cpp @@ -29,9 +29,10 @@ Options::Options (void) supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT), supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT), consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT), - max_timeout_ (MAX_TIMEOUT) + max_timeout_ (MAX_TIMEOUT), + max_queue_size_ (MAX_QUEUE_SIZE) { - ACE_OS::strcpy (this->proxy_config_file_, "proxy_config"); + ACE_OS::strcpy (this->connection_config_file_, "connection_config"); ACE_OS::strcpy (this->consumer_config_file_, "consumer_config"); } @@ -89,9 +90,9 @@ Options::threading_strategy (void) const } const char * -Options::proxy_config_file (void) const +Options::connection_config_file (void) const { - return this->proxy_config_file_; + return this->connection_config_file_; } const char * @@ -118,6 +119,12 @@ Options::consumer_connector_port (void) const return this->consumer_connector_port_; } +long +Options::max_queue_size (void) const +{ + return this->max_queue_size_; +} + u_short Options::supplier_connector_port (void) const { @@ -132,7 +139,7 @@ Options::parse_args (int argc, char *argv[]) // Assign defaults. ACE_Get_Opt get_opt (argc, argv, - "a:bC:c:dP:p:q:r:t:vw:", + "a:bC:c:dm:P:p:q:r:t:vw:", 0); for (int c; (c = get_opt ()) != EOF; ) @@ -200,9 +207,9 @@ Options::parse_args (int argc, char *argv[]) Options::DEBUG); break; case 'P': // Use a different consumer config filename. - ACE_OS::strncpy (this->proxy_config_file_, + ACE_OS::strncpy (this->connection_config_file_, get_opt.optarg, - sizeof this->proxy_config_file_); + sizeof this->connection_config_file_); break; case 'q': // Use a different socket queue size. this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h index 704d706edd7..7279e61f839 100644 --- a/apps/Gateway/Gateway/Options.h +++ b/apps/Gateway/Gateway/Options.h @@ -20,9 +20,9 @@ #include "ace/Synch.h" class ACE_Svc_Export Options +{ // = TITLE // Options Singleton for a gatewayd. -{ public: // = Options that can be enabled/disabled. enum @@ -38,10 +38,7 @@ public: SUPPLIER_ACCEPTOR = 04, CONSUMER_ACCEPTOR = 010, SUPPLIER_CONNECTOR = 020, - CONSUMER_CONNECTOR = 040, - - MAX_TIMEOUT = 32 - // The maximum timeout for trying to re-establish connections. + CONSUMER_CONNECTOR = 040 }; static Options *instance (void); @@ -102,7 +99,7 @@ public: // Our connector port host, i.e., the host running the gatewayd // process. - const char *proxy_config_file (void) const; + const char *connection_config_file (void) const; // Name of the connection configuration file. const char *consumer_config_file (void) const; @@ -111,7 +108,19 @@ public: long max_timeout (void) const; // The maximum retry timeout delay. + long max_queue_size (void) const; + // The maximum size of the queue. + private: + enum + { + MAX_QUEUE_SIZE = 1024 * 1024 * 16, + // We'll allow up to 16 megabytes to be queued per-output proxy. + + MAX_TIMEOUT = 32 + // The maximum timeout for trying to re-establish connections. + }; + Options (void); // Initialization. @@ -161,7 +170,10 @@ private: long max_timeout_; // The maximum retry timeout delay. - char proxy_config_file_[MAXPATHLEN + 1]; + long max_queue_size_; + // The maximum size of the queue. + + char connection_config_file_[MAXPATHLEN + 1]; // Name of the connection configuration file. char consumer_config_file_[MAXPATHLEN + 1]; diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp deleted file mode 100644 index 487c9e78f9c..00000000000 --- a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp +++ /dev/null @@ -1,16 +0,0 @@ -// $Id$ - -#include "Event_Channel.h" -#include "Proxy_Handler_Acceptor.h" - -Proxy_Handler_Acceptor::Proxy_Handler_Acceptor (ACE_Event_Channel &ec) - : event_channel_ (ec) -{ -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h deleted file mode 100644 index df1aaa64e86..00000000000 --- a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h +++ /dev/null @@ -1,40 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// gateway -// -// = FILENAME -// Proxy_Handler_acceptor.h -// -// = AUTHOR -// Doug Schmidt -// -// ============================================================================ - -#if !defined (_PROXY_HANDLER_ACCEPTOR) -#define _PROXY_HANDLER_ACCEPTOR - -#include "ace/Acceptor.h" -#include "ace/SOCK_Acceptor.h" -#include "Proxy_Handler.h" - -// Forward declaration -class ACE_Event_Channel; - -class Proxy_Handler_Acceptor : public ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR> - // = TITLE - // A concrete factory class that setups connections to peerds - // and produces a new Proxy_Handler object to do the dirty work... -{ -public: - Proxy_Handler_Acceptor (ACE_Event_Channel &); - -protected: - ACE_Event_Channel &event_channel_; - // Reference to the event channel. -}; - -#endif /* _PROXY_HANDLER_ACCEPTOR */ diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp deleted file mode 100644 index 4799fbacbd4..00000000000 --- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp +++ /dev/null @@ -1,73 +0,0 @@ -// $Id$ - -#include "Proxy_Handler_Connector.h" - -Proxy_Handler_Connector::Proxy_Handler_Connector (void) -{ -} - -// Initiate (or reinitiate) a connection to the Proxy_Handler. - -int -Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler, - ACE_Synch_Options &synch_options) -{ - char addr_buf[MAXHOSTNAMELEN]; - - // Mark ourselves as idle so that the various iterators - // will ignore us until we are reconnected. - proxy_handler->state (Proxy_Handler::IDLE); - - // We check the remote addr second so that it remains in the addr_buf. - if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1 - || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "can't obtain peer's address"), -1); - - // Try to connect to the Peer. - - if (this->connect (proxy_handler, proxy_handler->remote_addr (), - synch_options, proxy_handler->local_addr ()) == -1) - { - if (errno != EWOULDBLOCK) - { - proxy_handler->state (Proxy_Handler::FAILED); - ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", - "connect", addr_buf)); - - return -1; - } - else - { - proxy_handler->state (Proxy_Handler::CONNECTING); - ACE_DEBUG ((LM_DEBUG, - "(%t) in the process of connecting to %s\n", - addr_buf)); - } - } - else - { - proxy_handler->state (Proxy_Handler::ESTABLISHED); - ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - addr_buf, proxy_handler->get_handle ())); - } - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>; -template class ACE_Svc_Tuple<Proxy_Handler>; -template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>; -template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR> -#pragma instantiate ACE_Svc_Tuple<Proxy_Handler> -#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX> -#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/Gateway/Gateway/connection_config b/apps/Gateway/Gateway/connection_config new file mode 100644 index 00000000000..10813942dd0 --- /dev/null +++ b/apps/Gateway/Gateway/connection_config @@ -0,0 +1,52 @@ +# Configuration file that the gatewayd process uses to determine +# connection information about proxies. +# +# The following provides an explanation for the fields in this file, +# and how they relate to fields in the corresponding "consumer_config" +# file. +# +# 1. Connection ID -- Each Connection Handler is given a unique ID +# that is used in the "consumer_config" file to specify to which +# Consumers the Event Channel will forward incoming events from +# Suppliers using that connection. The Connection ID field is the +# "key" that is used to match up connections in this file with the +# Consumer subscription requests in the "consumer_config" file. +# +# 2. Host -- The host name where the Supplier/Consumer peerd +# process is running. +# +# 3. Remote Port -- The port number where the remote +# Supplier/Consumer peerd process is listening on. +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. +# +# 4. Handler Role -- i.e., Consumer ('C') or Supplier ('S') +# +# 5. Max Retry Timeout -- The maximum amount of time that we'll +# wait between retry attempts (these start at 1 second and +# double until they reach the Max Retry Timeout). +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. +# +# 6. Local Port -- The port number that we want to use for +# our local Proxy connection. If this is the value 0 or the '*' +# character, then we'll let the socket implementation pick this +# value for us. +# +# 7. Priority -- Each Consumer/Supplier can be given a priority +# that will determine its importance relative to other +# Consumers/Suppliers (this feature isn't implemented yet). +# +# Connection Host Remote Handler Max Retry Local Priority +# ID Port Role Timeout Port +# ---------- -------- ------ ------ ---------- ----- -------- + 1 flamenco * S * * 1 + 2 mambo * C * * 1 +# 3 mambo.cs * C * * 1 +# 4 lambada.cs * C * * 1 +# 5 lambada.cs * C * * 1 +# 6 tango.cs * C * * 1 +# 7 tango.cs * S * * 1 +# 8 tango.cs * C * * 1 diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config index 7d20a50a579..1aaa3fc4028 100644 --- a/apps/Gateway/Gateway/consumer_config +++ b/apps/Gateway/Gateway/consumer_config @@ -5,34 +5,31 @@ # Consumers to subscribe to particular types of events, as well. # # The following provides an explanation for the fields in this file, -# and how they relate to fields in the corresponding "proxy_config" +# and how they relate to fields in the corresponding "connection_config" # file. # -# 1. Proxy ID -- Each Proxy is given a unique ID that is used -# in the "consumer_config" file to specify to which Consumers -# the Event Channel will forward incoming events from Suppliers. -# The Proxy ID field is the "key" that is used to match up -# Consumer subscription requests in this file with Proxy -# connections in the "proxy_config" file. +# 1. Connection ID -- Each Connection Handler is given a unique ID +# that is used in the "consumer_config" file to specify to which +# Consumers the Event Channel will forward incoming events from +# Suppliers. The Connection ID field is the "key" that is used to +# match up Consumer subscription requests in this file with +# connections in the "connection_config" file. # -# 2. Supplier ID -- Currently, this has the same meaning as the -# Proxy ID, though a more sophisticated implementation might change -# this... +# 2. Event Type -- Indicates the type of the event. Consumers +# can use this to only subscribe to certain types of events. +# This feature is currently not implemented. # -# 3. Type -- Indicates the type of the event. Consumers -# can use this to only subscribe to certain types of events. This -# feature is currently not implemented. -# -# 4. Consumers -- Indicates which Consumers will receive events sent +# 3. Consumers -- Indicates which Consumers will receive events sent # from this Proxy/Supplier ID, i.e., Consumers can subscribe to # receive events from particular Suppliers. Note that more than # one Consumer can subscribe to the same Supplier event, i.e., # we support logical "multicast" (which is currently implemented # using multi-point unicast via TCP/IP). # -# Proxy ID Supplier ID Type Consumers -# -------- ----------- ------- ------------ - 1 1 0 2 -# 2 2 0 3,4 -# 3 3 0 4 -# 4 4 0 5 +# Connection Event Consumers +# ID Type +# ---------- ---- --------- + 1 0 2 +# 2 0 3,4 +# 3 0 4 +# 4 0 5 diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config deleted file mode 100644 index cdc7d07ffa3..00000000000 --- a/apps/Gateway/Gateway/proxy_config +++ /dev/null @@ -1,51 +0,0 @@ -# Configuration file that the gatewayd process uses to determine -# connection information about proxies. -# -# The following provides an explanation for the fields in this file, -# and how they relate to fields in the corresponding "consumer_config" -# file. -# -# 1. Proxy ID -- Each Proxy is given a unique ID that is used -# in the "consumer_config" file to specify to which Consumers -# the Event Channel will forward incoming events from Suppliers. -# The Proxy ID field is the "key" that is used to match up Proxy -# connections in this file with the Consumer subscription requests -# in the "consumer_config" file. -# -# 2. Host -- The host name where the Supplier/Consumer peerd -# process is running. -# -# 3. Remote Port -- The port number where the remote -# Supplier/Consumer peerd process is listening on. -# If this is a '*' character it is an indication to the -# Gateway to use the "default value," e.g., which can be provided -# on the command-line, etc. -# -# 4. Proxy Role -- i.e., Consumer ('C') or Supplier ('S') -# -# 5. Max Retry Timeout -- The maximum amount of time that we'll -# wait between retry attempts (these start at 1 second and -# double until they reach the Max Retry Timeout). -# If this is a '*' character it is an indication to the -# Gateway to use the "default value," e.g., which can be provided -# on the command-line, etc. -# -# 6. Local Port -- The port number that we want to use for -# our local Proxy connection. If this is the value 0, then -# we'll let the socket implementation pick this value for us. -# -# 7. Priority -- Each Consumer/Supplier can be given a priority -# that will determine its importance relative to other -# Consumers/Suppliers (this feature isn't implemented yet). -# -# Proxy Host Remote Proxy Max Retry Local Priority -# ID Port Role Timeout Port -# ---- -------- ------ ------ ---------- ----- -------- - 1 flamenco 10004 S 32 0 1 - 2 tango 10004 C 32 0 1 -# 3 mambo.cs 10002 C 32 0 1 -# 4 lambada.cs 10002 C 32 0 1 -# 5 lambada.cs 10002 C 32 0 1 -# 6 tango.cs 10002 C 32 0 1 -# 7 tango.cs 5001 S 32 0 1 -# 8 tango.cs 5002 C 32 0 1 diff --git a/apps/Gateway/Peer/Options.cpp b/apps/Gateway/Peer/Options.cpp index fb6daccce91..8931e548f08 100644 --- a/apps/Gateway/Peer/Options.cpp +++ b/apps/Gateway/Peer/Options.cpp @@ -88,7 +88,7 @@ Options::enabled (int option) const void Options::parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opt (argc, argv, "a:c:h:q:t:v", 0); + ACE_Get_Opt get_opt (argc, argv, "a:c:h:m:t:v", 0); for (int c; (c = get_opt ()) != -1; ) { @@ -147,7 +147,7 @@ Options::parse_args (int argc, char *argv[]) this->connector_host_ = get_opt.optarg; break; /* NOTREACHED */ - case 'q': + case 'm': // max queue size. this->max_queue_size_ = ACE_OS::atoi (get_opt.optarg); break; diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp index b67867c9be3..bc1d7a24c02 100644 --- a/apps/Gateway/Peer/Peer.cpp +++ b/apps/Gateway/Peer/Peer.cpp @@ -6,7 +6,7 @@ #include "Options.h" Peer_Handler::Peer_Handler (void) - : proxy_id_ (0), + : connection_id_ (0), msg_frag_ (0), total_bytes_ (0) { @@ -53,8 +53,8 @@ Peer_Handler::open (void *a) "schedule_wakeup"), -1); - // First action is to wait to be notified of our supplier id. - this->do_action_ = &Peer_Handler::await_supplier_id; + // First action is to wait to be notified of our connection id. + this->do_action_ = &Peer_Handler::await_connection_id; return 0; } @@ -63,7 +63,7 @@ Peer_Handler::open (void *a) int Peer_Handler::xmit_stdin (void) { - if (this->proxy_id_ != -1) + if (this->connection_id_ != -1) { ACE_Message_Block *mb; @@ -94,9 +94,7 @@ Peer_Handler::xmit_stdin (void) ACE_ERROR ((LM_ERROR, "%p\n", "read")); break; default: - // For simplicity, we'll use our proxy id as the supplier id - // (which we must store in network byte order, of course). - event->header_.supplier_id_ = this->proxy_id_; + event->header_.connection_id_ = this->connection_id_; event->header_.len_ = n; event->header_.priority_ = 0; event->header_.type_ = 0; @@ -148,7 +146,8 @@ Peer_Handler::nonblk_put (ACE_Message_Block *mb) // We didn't manage to send everything, so requeue. ACE_DEBUG ((LM_DEBUG, "queueing activated on handle %d to supplier id %d\n", - this->get_handle (), this->proxy_id_)); + this->get_handle (), + this->connection_id_)); // Re-queue in *front* of the list to preserve order. if (this->msg_queue ()->enqueue_head @@ -211,7 +210,7 @@ Peer_Handler::handle_output (ACE_HANDLE) ACE_DEBUG ((LM_DEBUG, "queue now empty on handle %d to supplier id %d\n", this->get_handle (), - this->proxy_id_)); + this->connection_id_)); if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) @@ -224,7 +223,10 @@ Peer_Handler::handle_output (ACE_HANDLE) } else // If the list is empty there's a bug! - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "dequeue_head"), 0); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "dequeue_head"), + 0); } // Send an event to a peer (may block if necessary). @@ -394,7 +396,7 @@ Peer_Handler::recv (ACE_Message_Block *&mb) ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", - event->header_.supplier_id_, + event->header_.connection_id_, event->header_.len_, data_received + header_received)); if (Options::instance ()->enabled (Options::VERBOSE)) @@ -424,12 +426,12 @@ Peer_Handler::handle_input (ACE_HANDLE sd) // Action that receives our supplier id from the Gateway. int -Peer_Handler::await_supplier_id (void) +Peer_Handler::await_connection_id (void) { - ssize_t n = this->peer ().recv (&this->proxy_id_, - sizeof this->proxy_id_); + ssize_t n = this->peer ().recv (&this->connection_id_, + sizeof this->connection_id_); - if (n != sizeof this->proxy_id_) + if (n != sizeof this->connection_id_) { if (n == 0) ACE_ERROR_RETURN ((LM_ERROR, @@ -445,10 +447,10 @@ Peer_Handler::await_supplier_id (void) } else { - this->proxy_id_ = ntohl (this->proxy_id_); + this->connection_id_ = ntohl (this->connection_id_); ACE_DEBUG ((LM_DEBUG, - "assigned proxy id %d\n", - this->proxy_id_)); + "assigned connection id %d\n", + this->connection_id_)); } // Transition to the action that waits for Peer events. @@ -502,7 +504,7 @@ Peer_Handler::await_events (void) ACE_DEBUG ((LM_DEBUG, "route id = %d, cur len = %d, total len = %d\n", - event->header_.supplier_id_, + event->header_.connection_id_, event->header_.len_, this->total_bytes_)); if (Options::instance ()->enabled (Options::VERBOSE)) diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h index fa78f786fde..084c2b6d3b8 100644 --- a/apps/Gateway/Peer/Peer.h +++ b/apps/Gateway/Peer/Peer.h @@ -117,10 +117,8 @@ protected: // Pointer-to-member-function for the current action to run in this // state. This points to one of the preceding 3 methods. - ACE_INT32 proxy_id_; - // Proxy ID of the peer, which is obtained from the gatewayd. For - // simplicity, in this implementation we also use the Proxy ID as - // the Supplier ID. This might change in future releases. + CONNECTION_ID connection_id_; + // Connection ID of the peer, which is obtained from the gatewayd. ACE_Message_Block *msg_frag_; // Keep track of event fragments that arrive in non-blocking recv's |