summaryrefslogtreecommitdiff
path: root/ACE/apps/Gateway/Gateway/Gateway.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/apps/Gateway/Gateway/Gateway.cpp')
-rw-r--r--ACE/apps/Gateway/Gateway/Gateway.cpp338
1 files changed, 338 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Gateway/Gateway.cpp b/ACE/apps/Gateway/Gateway/Gateway.cpp
new file mode 100644
index 00000000000..7f14f27603e
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Gateway.cpp
@@ -0,0 +1,338 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Service_Config.h"
+#include "ace/Signal.h"
+#include "Config_Files.h"
+#include "Event_Channel.h"
+#include "Gateway.h"
+
+class ACE_Svc_Export Gateway : public ACE_Service_Object
+{
+ // = TITLE
+ // Integrates the whole Gateway application.
+ //
+ // = DESCRIPTION
+ // This implementation uses the <Event_Channel> as the basis
+ // for the <Gateway> routing.
+protected:
+ // = Service configurator hooks.
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ // Perform initialization.
+
+ virtual int fini (void);
+ // Perform termination when unlinked dynamically.
+
+ virtual int info (ACE_TCHAR **, size_t) const;
+ // Return info about this service.
+
+ // = Configuration methods.
+ int parse_connection_config_file (void);
+ // Parse the proxy configuration file.
+
+ int parse_consumer_config_file (void);
+ // Parse the consumer configuration file.
+
+ // = Lifecycle management methods.
+ int handle_input (ACE_HANDLE);
+ // Shut down the Gateway when input comes in from the controlling
+ // console.
+
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ // Shut down the Gateway when a signal arrives.
+
+ Event_Channel event_channel_;
+ // The Event Channel routes events from Supplier(s) to Consumer(s)
+ // using <Supplier_Handler> and <Consumer_Handler> objects.
+
+ Connection_Handler_Factory connection_handler_factory_;
+ // Creates the appropriate type of <Connection_Handlers>.
+};
+
+int
+Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ ACE_UNUSED_ARG (signum);
+
+ // Shut down the main event loop.
+ ACE_Reactor::end_event_loop ();
+ return 0;
+}
+
+int
+Gateway::handle_input (ACE_HANDLE h)
+{
+ char buf[BUFSIZ];
+ // Consume the input...
+ ACE_OS::read (h, buf, sizeof (buf));
+
+ // Shut us down.
+ return this->handle_signal ((int) h);
+}
+
+int
+Gateway::init (int argc, ACE_TCHAR *argv[])
+{
+ // Parse the "command-line" arguments.
+ Options::instance ()->parse_args (argc, argv);
+
+ ACE_Sig_Set sig_set;
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register ourselves to receive signals so we can shut down
+ // gracefully.
+
+ if (ACE_Reactor::instance ()->register_handler (sig_set,
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("register_handler")),
+ -1);
+
+ // Register this handler to receive events on stdin. We use this to
+ // shutdown the Gateway gracefully.
+ if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("register_stdin_handler")),
+ -1);
+
+ // If this->performance_window_ > 0 start a timer.
+
+ if (Options::instance ()->performance_window () > 0)
+ {
+ ACE_Time_Value const performance_time (Options::instance ()->performance_window ());
+ if (ACE_Reactor::instance ()->schedule_timer
+ (&this->event_channel_, 0,
+ performance_time) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("schedule_timer")));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("starting timer for %d seconds...\n"),
+ Options::instance ()->performance_window ()));
+ }
+
+ // Are we running as a connector?
+ if (Options::instance ()->enabled
+ (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
+ {
+ // Parse the proxy configuration file.
+ this->parse_connection_config_file ();
+
+ // Parse the consumer config file and build the event forwarding
+ // discriminator.
+ this->parse_consumer_config_file ();
+ }
+
+ // Initialize the Event_Channel.
+ return this->event_channel_.open ();
+}
+
+// This method is automatically called when the Gateway is shutdown.
+
+int
+Gateway::fini (void)
+{
+ // Remove the handler that receive events on stdin. Otherwise, we
+ // will crash on shutdown.
+ ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ());
+
+ // Close down the event channel.
+ this->event_channel_.close ();
+
+ // Need to make sure we cleanup this Singleton.
+ delete Options::instance ();
+ return 0;
+}
+
+// Returns information on the currently active service.
+
+int
+Gateway::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+
+ ACE_OS::strcpy
+ (buf, ACE_TEXT ("Gateway daemon\t # Application-level gateway\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, buf, length);
+ return ACE_OS::strlen (buf);
+}
+
+// Parse and build the proxy table.
+
+int
+Gateway::parse_connection_config_file (void)
+{
+ // File that contains the proxy configuration information.
+ Connection_Config_File_Parser connection_file;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (connection_file.open (Options::instance ()->connection_config_file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ Options::instance ()->connection_config_file ()),
+ -1);
+
+ // Keep track of the previous connection id to make sure the
+ // connection config file isn't corrupted.
+ int previous_connection_id = 0;
+
+ // Read config file one line at a time.
+
+ for (Connection_Config_Info pci;
+ connection_file.read_entry (pci, line_number) != FPRT::RT_EOFILE;
+ )
+ {
+ file_empty = 0;
+
+ // First time in check.
+ if (previous_connection_id == 0)
+ {
+ previous_connection_id = 1;
+
+ if (pci.connection_id_ != 1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) warning, the first connection id should be 1 not %d\n"),
+ pci.connection_id_));
+ }
+ else if (previous_connection_id + 1 != pci.connection_id_)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) warning, connection ids should keep increasing by 1 and %d + 1 != %d\n"),
+ previous_connection_id,
+ pci.connection_id_));
+
+ // Update the last connection id to ensure that we monotonically
+ // increase by 1.
+ previous_connection_id = pci.connection_id_;
+
+ if (Options::instance ()->enabled (Options::DEBUGGING))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) conn id = %d, ")
+ ACE_TEXT ("host = %s, ")
+ ACE_TEXT ("remote port = %d, ")
+ ACE_TEXT ("proxy role = %c, ")
+ ACE_TEXT ("max retry timeout = %d, ")
+ ACE_TEXT ("local port = %d, ")
+ ACE_TEXT ("priority = %d\n"),
+ pci.connection_id_,
+ pci.host_,
+ pci.remote_port_,
+ pci.connection_role_,
+ pci.max_retry_timeout_,
+ pci.local_port_,
+ pci.priority_));
+
+ pci.event_channel_ = &this->event_channel_;
+
+ // Create the appropriate type of Proxy.
+ Connection_Handler *connection_handler;
+
+ ACE_ALLOCATOR_RETURN (connection_handler,
+ this->connection_handler_factory_.make_connection_handler (pci),
+ -1);
+
+ // Bind the new Connection_Handler to the connection ID.
+ this->event_channel_.bind_proxy (connection_handler);
+ }
+
+ // Keep track of the next available connection id, which is
+ // necessary for Peers that connect with us, rather than vice versa.
+ Options::instance ()->connection_id () = previous_connection_id + 1;
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("warning: connection connection_handler configuration file was empty\n")));
+ return 0;
+}
+
+int
+Gateway::parse_consumer_config_file (void)
+{
+ // File that contains the consumer event forwarding information.
+ Consumer_Config_File_Parser consumer_file;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (consumer_file.open (Options::instance ()->consumer_config_file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ Options::instance ()->consumer_config_file ()),
+ -1);
+
+ // Read config file line at a time.
+ for (Consumer_Config_Info cci_entry;
+ consumer_file.read_entry (cci_entry, line_number) != FPRT::RT_EOFILE;
+ )
+ {
+ file_empty = 0;
+
+ if (Options::instance ()->enabled (Options::DEBUGGING))
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) connection id = %d, payload = %d, ")
+ ACE_TEXT ("number of consumers = %d\n"),
+ cci_entry.connection_id_,
+ cci_entry.type_,
+ cci_entry.total_consumers_));
+
+ for (int i = 0; i < cci_entry.total_consumers_; i++)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) destination[%d] = %d\n"),
+ i,
+ cci_entry.consumers_[i]));
+ }
+
+ Consumer_Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set,
+ Consumer_Dispatch_Set,
+ -1);
+
+ Event_Key event_addr (cci_entry.connection_id_,
+ cci_entry.type_);
+
+ // Add the Consumers to the Dispatch_Set.
+ for (int i = 0; i < cci_entry.total_consumers_; i++)
+ {
+ Connection_Handler *connection_handler = 0;
+
+ // Lookup destination and add to Consumer_Dispatch_Set set
+ // if found.
+ if (this->event_channel_.find_proxy (cci_entry.consumers_[i],
+ connection_handler) != -1)
+ dispatch_set->insert (connection_handler);
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) not found: destination[%d] = %d\n"),
+ i,
+ cci_entry.consumers_[i]));
+ }
+
+ this->event_channel_.subscribe (event_addr, dispatch_set);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("warning: consumer map configuration file was empty\n")));
+ return 0;
+}
+
+// The following is a "Factory" used by the ACE_Service_Config and
+// svc.conf file to dynamically initialize the state of the Gateway.
+
+ACE_SVC_FACTORY_DEFINE (Gateway)
+