summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Gateway.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
commitea0d28240863caf437a18071bfd03e7b146c5ade (patch)
tree91b695852b885a5f44f9be8c3a22bbf7f5a96b8d /apps/Gateway/Gateway/Gateway.cpp
parenta6e2ced2f5279e011b712995095a1712a29e22f0 (diff)
downloadATCD-ea0d28240863caf437a18071bfd03e7b146c5ade.tar.gz
foo
Diffstat (limited to 'apps/Gateway/Gateway/Gateway.cpp')
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp231
1 files changed, 223 insertions, 8 deletions
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 2c963ff3d7f..4ff09aed1b7 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -1,6 +1,8 @@
/* -*- C++ -*- */
// $Id$
+#include "ace/Get_Opt.h"
+#include "Config_Files.h"
#include "ace/Service_Config.h"
#include "Event_Channel.h"
#include "Gateway.h"
@@ -24,6 +26,12 @@ public:
virtual int info (char **, size_t) const;
// Return info about this service.
+ int parse_connection_config_file (void);
+ // Parse the connection configuration file.
+
+ int parse_consumer_config_file (void);
+ // Parse the consumer configuration file.
+
protected:
int handle_input (ACE_HANDLE);
// Shut down the Gateway when input comes in from the controlling
@@ -36,13 +44,21 @@ protected:
// Parse gateway configuration arguments obtained from svc.conf
// file.
- ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_;
+ char connection_config_file_[MAXPATHLEN + 1];
+ // Name of the connection configuration file.
+
+ char consumer_config_file_[MAXPATHLEN + 1];
+ // Name of the consumer map configuration file.
+
+ ACE_Event_Channel event_channel_;
// The Event Channel routes events from Supplier(s) to Consumer(s).
-};
-// Convenient shorthands.
-// #define IC SUPPLIER_HANDLER
-// #define OC CONSUMER_HANDLER
+ int active_connector_role_;
+ // Enabled if we are playing the role of the active Connector.
+
+ int debug_;
+ // Are we debugging?
+};
int
Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
@@ -70,18 +86,76 @@ Gateway::handle_input (ACE_HANDLE h)
return this->handle_signal (h);
}
+// Parse the "command-line" arguments and set the corresponding flags.
+
+int
+Gateway::parse_args (int argc, char *argv[])
+{
+ ACE_OS::strcpy (this->connection_config_file_, "connection_config");
+ ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
+ this->active_connector_role_ = 1;
+ this->debug_ = 0;
+
+ ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
+
+ for (int c; (c = get_opt ()) != -1; )
+ {
+ switch (c)
+ {
+ case 'b': // Use blocking connection establishment.
+ this->event_channel_.options ().blocking_semantics_ = 0;
+ break;
+ case 'c':
+ ACE_OS::strncpy (this->connection_config_file_,
+ get_opt.optarg,
+ sizeof this->connection_config_file_);
+ break;
+ case 'd':
+ this->debug_ = 1;
+ break;
+ case 'p':
+ // We are not playing the active Connector role.
+ this->active_connector_role_ = 0;
+ break;
+ case 'q':
+ this->event_channel_.options ().socket_queue_size_ =
+ ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'r':
+ ACE_OS::strncpy (this->consumer_config_file_,
+ get_opt.optarg,
+ sizeof this->consumer_config_file_);
+ break;
+ case 'w': // Time performance for a designated amount of time.
+ this->event_channel_.options ().performance_window_ =
+ ACE_OS::atoi (get_opt.optarg);
+ // Use blocking connection semantics so that we get accurate
+ // timings (since all connections start at once).
+ this->event_channel_.options ().blocking_semantics_ = 0;
+ break;
+ default:
+ break;
+ }
+ }
+ return 0;
+}
+
int
Gateway::init (int argc, char *argv[])
{
- if (this->event_channel_.open (argc, argv) == -1)
+ // Initialize the Event_Channel.
+ if (this->event_channel_.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
+ // Parse the "command-line" arguments.
+ this->parse_args (argc, argv);
+
ACE_Sig_Set sig_set;
sig_set.sig_add (SIGINT);
sig_set.sig_add (SIGQUIT);
- // Register ourselves to receive SIGINT and SIGQUIT
- // so we can shut down gracefully via signals.
+ // Register ourselves to receive SIGINT and SIGQUIT so we can shut
+ // down gracefully via signals.
if (ACE_Service_Config::reactor ()->register_handler
(sig_set, this) == -1)
@@ -90,6 +164,20 @@ Gateway::init (int argc, char *argv[])
if (ACE_Service_Config::reactor ()->register_handler
(0, this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+
+ if (this->active_connector_role_)
+ {
+ // Parse the connection configuration file.
+ this->parse_connection_config_file ();
+
+ // Parse the consumer map config file and build the consumer
+ // map.
+ this->parse_consumer_config_file ();
+
+ // Initiate connections with the peers.
+ this->event_channel_.initiate_connections ();
+ }
+
return 0;
}
@@ -120,6 +208,133 @@ Gateway::info (char **strp, size_t length) const
return ACE_OS::strlen (buf);
}
+// Parse and build the connection table.
+
+int
+Gateway::parse_connection_config_file (void)
+{
+ // File that contains the consumer map configuration information.
+ Connection_Config_File_Parser connection_file;
+ Connection_Config_File_Entry entry;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (connection_file.open (this->connection_config_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
+
+ // Read config file one line at a time.
+ while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
+ {
+ file_empty = 0;
+
+ if (this->debug_)
+ ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
+ "proxy role = %c, max retry timeout = %d, local port = %d\n",
+ entry.conn_id_,
+ entry.host_,
+ entry.remote_port_,
+ entry.proxy_role_,
+ entry.max_retry_delay_,
+ entry.local_port_));
+
+ Proxy_Handler *proxy_handler = 0;
+
+ // Initialize the entry's peer addressing info.
+
+ ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_);
+ ACE_INET_Addr local_addr (entry.local_port_);
+
+ // The next few lines of code are dependent on whether we are
+ // making an Supplier_Proxy or an Consumer_Proxy.
+
+ if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ CONSUMER_PROXY (this->event_channel_, remote_addr,
+ local_addr, entry.conn_id_),
+ -1);
+ else // proxy_role == 'S', so configure a Supplier_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ SUPPLIER_PROXY (this->event_channel_, remote_addr,
+ local_addr, entry.conn_id_),
+ -1);
+
+ // The following code is common to both Supplier_Proxys_ and
+ // Consumer_Proxys.
+
+ // Initialize max timeout.
+ proxy_handler->max_timeout (entry.max_retry_delay_);
+
+ // Bind the new Proxy_Handler to the connection ID.
+ this->event_channel_.bind_proxy (proxy_handler);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ "warning: connection proxy_handler configuration file was empty\n"));
+ return 0;
+}
+
+int
+Gateway::parse_consumer_config_file (void)
+{
+ // File that contains the consumer event forwarding information.
+ Consumer_Config_File_Parser consumer_file;
+ Consumer_Config_File_Entry entry;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (consumer_file.open (this->consumer_config_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
+
+ // Read config file line at a time.
+ while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
+ {
+ file_empty = 0;
+
+ if (this->debug_)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
+ "number of consumers = %d\n",
+ entry.conn_id_,
+ entry.supplier_id_,
+ entry.type_,
+ entry.total_consumers_));
+ for (int i = 0; i < entry.total_consumers_; i++)
+ ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
+ i, entry.consumers_[i]));
+ }
+
+ Consumer_Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
+
+ Event_Key event_addr (entry.conn_id_,
+ entry.supplier_id_,
+ entry.type_);
+
+ // Add the Consumers to the Dispatch_Set.
+ for (int i = 0; i < entry.total_consumers_; i++)
+ {
+ Proxy_Handler *proxy_handler = 0;
+
+ // Lookup destination and add to Consumer_Dispatch_Set set
+ // if found.
+ if (this->event_channel_.find_proxy (entry.consumers_[i],
+ proxy_handler) != -1)
+ dispatch_set->insert (proxy_handler);
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
+ i, entry.consumers_[i]));
+ }
+
+ this->event_channel_.subscribe (event_addr, dispatch_set);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ "warning: consumer map configuration file was empty\n"));
+ return 0;
+}
+
// The following is a "Factory" used by the ACE_Service_Config and
// svc.conf file to dynamically initialize the state of the Gateway.