summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/LWFT_Service
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/LWFT_Service')
-rw-r--r--TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc50
-rw-r--r--TAO/orbsvcs/LWFT_Service/RMOptions.cpp110
-rw-r--r--TAO/orbsvcs/LWFT_Service/RMOptions.h56
-rw-r--r--TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp242
-rw-r--r--TAO/orbsvcs/LWFT_Service/host_monitor.cpp77
5 files changed, 535 insertions, 0 deletions
diff --git a/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc b/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc
new file mode 100644
index 00000000000..c3489a2cfe8
--- /dev/null
+++ b/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc
@@ -0,0 +1,50 @@
+// -*- MPC -*-
+// $Id$
+
+project(ReplicationManager) : taoserver, naming, lwft_server {
+ after += LWFT_ReplicationManagerImpl
+ libs += LWFT_ReplicationManagerImpl
+ exename = ReplicationManager
+
+ IDL_Files {
+ }
+
+ Source_Files {
+ ReplicationManager_process.cpp
+ RMOptions.cpp
+ }
+
+ Header_Files {
+ RMOptions.h
+ }
+
+ Inline_Files {
+ }
+
+ Template_Files {
+ }
+}
+
+project(HostMonitor) : taoserver, portableserver, lwft_client {
+ after += LWFT_HostMonitor
+ libs += LWFT_Server \
+ LWFT_HostMonitor
+ exename = HostMonitor
+
+ IDL_Files {
+ }
+
+ Source_Files {
+ host_monitor.cpp
+ }
+
+ Header_Files {
+ }
+
+ Inline_Files {
+ }
+
+ Template_Files {
+ }
+}
+
diff --git a/TAO/orbsvcs/LWFT_Service/RMOptions.cpp b/TAO/orbsvcs/LWFT_Service/RMOptions.cpp
new file mode 100644
index 00000000000..0b95d4ef144
--- /dev/null
+++ b/TAO/orbsvcs/LWFT_Service/RMOptions.cpp
@@ -0,0 +1,110 @@
+// -*- C++ -*-
+// $Id$
+
+#include <sstream>
+
+#include "ace/Global_Macros.h"
+#include "ace/Guard_T.h"
+#include "ace/Log_Msg.h"
+#include "ace/Arg_Shifter.h"
+
+#include "RMOptions.h"
+
+/// Initialize the static data member.
+RMOptions * volatile RMOptions::instance_ = 0;
+ACE_Auto_Ptr<RMOptions> RMOptions::deleter_;
+ACE_Thread_Mutex RMOptions::lock_;
+
+RMOptions::RMOptions (void)
+ : hertz_ (0.2),
+ proactive_ (true),
+ static_mode_ (false),
+ use_naming_service_ (false)
+{
+}
+
+RMOptions *
+RMOptions::instance (void)
+{
+ if (instance_ == 0)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, 0);
+
+ if (instance_ == 0)
+ {
+ instance_ = new RMOptions;
+ deleter_.reset (instance_);
+ }
+ }
+
+ return instance_;
+}
+
+bool
+RMOptions::parse_args (int &argc, char **argv)
+{
+ ACE_Arg_Shifter as (argc, argv);
+
+ while (as.is_anything_left ())
+ {
+ const ACE_TCHAR *arg = 0;
+
+ if (0 != (arg = as.get_the_parameter (ACE_TEXT ("-hertz"))))
+ {
+ std::istringstream istr (arg);
+
+ if (!(istr >> hertz_))
+ {
+ return false;
+ }
+
+ as.consume_arg ();
+ }
+ else if (as.cur_arg_strncasecmp (ACE_TEXT ("-proactive")) == 0)
+ {
+ proactive_ = true;
+ as.consume_arg ();
+ }
+ else if (as.cur_arg_strncasecmp (ACE_TEXT ("-static")) == 0)
+ {
+ static_mode_ = true;
+ as.consume_arg ();
+ }
+ else if (as.cur_arg_strncasecmp (ACE_TEXT ("-use_ns")) == 0)
+ {
+ use_naming_service_ = true;
+ as.consume_arg ();
+ }
+ else
+ {
+ as.ignore_arg ();
+ }
+ }
+
+ return true;
+};
+
+
+bool
+RMOptions::proactive (void) const
+{
+ return proactive_;
+}
+
+double
+RMOptions::hertz (void) const
+{
+ return hertz_;
+}
+
+bool
+RMOptions::static_mode (void) const
+{
+ return static_mode_;
+}
+
+bool
+RMOptions::use_naming_service (void) const
+{
+ return use_naming_service_;
+}
diff --git a/TAO/orbsvcs/LWFT_Service/RMOptions.h b/TAO/orbsvcs/LWFT_Service/RMOptions.h
new file mode 100644
index 00000000000..f5ed455bcc2
--- /dev/null
+++ b/TAO/orbsvcs/LWFT_Service/RMOptions.h
@@ -0,0 +1,56 @@
+// -*- C++ -*-
+// $Id$
+
+/**
+ * @file RMOptions.h
+ *
+ * @brief Declared the RMOptions class interface.
+ *
+ */
+
+#ifndef _RMOPTIONS_H
+#define _RMOPTIONS_H
+
+#include <string>
+
+#include "ace/Thread_Mutex.h"
+#include "ace/Auto_Ptr.h"
+
+/**
+ * @class RMOptions
+ *
+ * @brief Declares RMOptions singleton to hold the command line options.
+ */
+
+class RMOptions
+/// TITLE
+/// Singleton class for the program options.
+{
+public:
+ /// Singleton access method.
+ static RMOptions *instance (void);
+
+ bool parse_args (int &argc, char **argv);
+
+ /// Member accessors.
+ bool proactive (void) const;
+ double hertz (void) const;
+ bool static_mode (void) const;
+ bool use_naming_service (void) const;
+
+protected:
+ /// Constructor is protected to ensure Singleton access.
+ RMOptions (void);
+
+ double hertz_;
+ bool proactive_;
+ bool static_mode_;
+ bool use_naming_service_;
+
+ /// Singleton-related stuff.
+ static RMOptions * volatile instance_;
+ static ACE_Auto_Ptr<RMOptions> deleter_;
+ static ACE_Thread_Mutex lock_;
+};
+
+#endif /* _RMOPTIONS_H */
diff --git a/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp b/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp
new file mode 100644
index 00000000000..e9c046eee69
--- /dev/null
+++ b/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp
@@ -0,0 +1,242 @@
+// -*- C++ -*-
+// $Id$
+
+#include "ace/Get_Opt.h"
+
+#include "orbsvcs/orbsvcs/Naming/Naming_Client.h"
+
+#include "orbsvcs/orbsvcs/LWFT/ForwardingAgent.h"
+#include "orbsvcs/orbsvcs/LWFT/ReplicationManager.h"
+#include "orbsvcs/orbsvcs/LWFT/LWFT_Server_Init.h"
+#include "orbsvcs/orbsvcs/LWFT/AppOptions.h"
+#include "orbsvcs/orbsvcs/LWFT/StateSyncAgentTask.h"
+
+#include "RMOptions.h"
+
+char *ior_output_file = "rm.ior";
+
+class RegistrationTask : public ACE_Task_Base
+{
+public:
+ RegistrationTask (ReplicationManager_ptr primary_rm,
+ ReplicationManager_ptr local_rm,
+ CORBA::ORB_ptr orb)
+ : primary_rm_ (ReplicationManager::_duplicate (primary_rm)),
+ local_rm_ (ReplicationManager::_duplicate (local_rm)),
+ orb_ (CORBA::ORB::_duplicate (orb))
+ {
+ }
+
+ int svc (void)
+ {
+ try
+ {
+ // register ReplicationManager with itself
+ primary_rm_->register_application (
+ local_rm_->object_id (),
+ 0.0,
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ AppOptions::instance ()->role (),
+ local_rm_.in ());
+ }
+ catch (CORBA::Exception &)
+ {
+ ACE_ERROR ((LM_ERROR, "RM: Could not register reference to "
+ " the ReplicationManager with the primary "
+ "ReplicationManager.\n"));
+ return 1;
+ }
+
+ return 0;
+ }
+
+private:
+ ReplicationManager_var primary_rm_;
+ ReplicationManager_var local_rm_;
+ CORBA::ORB_var orb_;
+};
+
+int
+main (int argc, char *argv[])
+{
+ try
+ {
+ CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
+
+ AppOptions::instance ()->parse_args (argc, argv);
+
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (object.in ());
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager ();
+
+ CORBA::PolicyList policies;
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_lifespan_policy (PortableServer::PERSISTENT);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_id_assignment_policy (PortableServer::USER_ID);
+
+ PortableServer::POA_var poa =
+ root_poa->create_POA ("Servant POA",
+ poa_manager.in (),
+ policies);
+
+ poa_manager->activate ();
+
+ // std::cout << "Before StateSyncAgent." << std::endl;
+
+ StateSynchronizationAgent_i* ssa_servant =
+ new StateSynchronizationAgent_i (
+ orb.in (),
+ AppOptions::instance ()->host_id (),
+ AppOptions::instance ()->process_id (),
+ !(AppOptions::instance ()->use_dds ()));
+
+ PortableServer::ServantBase_var owner_xfer_ssa (ssa_servant);
+
+ // create task for state synchronization agent
+ StateSyncAgentTask sync_agent_thread (orb.in (),
+ ssa_servant);
+
+ int result = sync_agent_thread.activate ();
+
+ if (result != 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "StateSyncAgentTask::activate () "
+ "returned %d, errno = %d\n",
+ result,
+ errno),
+ -1);
+ }
+
+ if (! RMOptions::instance ()->parse_args (argc, argv))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Replication Manager options "
+ "are incorrect.\n"),
+ -1);
+ }
+
+ ReplicationManager_i *rm_i = 0;
+ ACE_NEW_RETURN (rm_i,
+ ReplicationManager_i (orb.in (),
+ RMOptions::instance()->hertz(),
+ RMOptions::instance()->proactive(),
+ RMOptions::instance()->static_mode ()),
+ 1);
+
+ PortableServer::ServantBase_var owner_transfer (rm_i);
+
+ PortableServer::ObjectId_var oid =
+ PortableServer::string_to_ObjectId (rm_i->object_id ());
+
+ poa->activate_object_with_id (oid.in (), rm_i);
+
+ CORBA::Object_var rm_object = poa->id_to_reference (oid.in ());
+ ReplicationManager_var rm = ReplicationManager::_narrow (rm_object.in ());
+
+ CORBA::String_var ior =
+ orb->object_to_string (rm.in ());
+
+// ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ()));
+
+ if (ior_output_file != 0)
+ {
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+
+ if (output_file == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file "
+ "for writing IOR: %s",
+ ior_output_file),
+ 1);
+ }
+
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+ }
+
+ ReplicationManager_var primary_rm;
+
+ if (RMOptions::instance ()->use_naming_service ())
+ {
+ TAO_Naming_Client naming_client;
+ naming_client.init (orb.in ());
+
+ CosNaming::Name rm_name;
+ rm_name.length (1UL);
+ rm_name[0UL].id = "ReplicationManager";
+
+ if (AppOptions::instance ()->role () == PRIMARY)
+ {
+ try
+ {
+ naming_client->bind (rm_name, rm.in ());
+ }
+ catch (CosNaming::NamingContext::AlreadyBound &)
+ {
+ naming_client->rebind (rm_name, rm.in ());
+ }
+
+ primary_rm = ReplicationManager::_duplicate (rm.in ());
+
+ // ACE_DEBUG ((LM_INFO,
+ // "ReplicationManager registered with Naming Service\n"));
+ }
+ else
+ {
+ CORBA::Object_var rm_obj = naming_client->resolve (rm_name);
+
+ primary_rm = ReplicationManager::_narrow (rm_obj.in ());
+ }
+ }
+ else
+ {
+ primary_rm = ReplicationManager::_duplicate (rm.in ());
+ }
+
+ // add reference for state synchronization of the RM itself
+ rm_i->agent (sync_agent_thread.agent_ref ());
+
+ // register its own StateSynchronizationAgent
+ rm_i->register_state_synchronization_agent (
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ sync_agent_thread.agent_ref ());
+
+ sync_agent_thread.agent_ref ()->register_application (rm_i->object_id (),
+ rm.in ());
+
+ RegistrationTask registration_task (
+ primary_rm.in (),
+ rm.in (),
+ orb.in ());
+
+ if (registration_task.activate () == 0)
+ {
+ orb->run ();
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "(%P|%t) RM - event loop finished\n"));
+
+ poa->destroy (true, true);
+ orb->destroy ();
+ }
+ catch (CORBA::Exception &ex)
+ {
+ ex._tao_print_exception ("RM Process: caught exception:");
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/LWFT_Service/host_monitor.cpp b/TAO/orbsvcs/LWFT_Service/host_monitor.cpp
new file mode 100644
index 00000000000..66cfd22ad29
--- /dev/null
+++ b/TAO/orbsvcs/LWFT_Service/host_monitor.cpp
@@ -0,0 +1,77 @@
+// -*- C++ -*-
+// $Id$
+
+#include <iostream>
+#include <fstream>
+#include <string>
+#include <iterator>
+#include <algorithm>
+
+#include "orbsvcs/orbsvcs/LWFT/HostMonitorImpl.h"
+#include "orbsvcs/orbsvcs/LWFT/Monitor_Thread.h"
+#include "orbsvcs/orbsvcs/LWFT/HMOptions.h"
+#include "orbsvcs/orbsvcs/LWFT/LWFT_Client_Init.h"
+
+int main (int argc, char* argv[])
+{
+ try
+ {
+ /// First initialize the ORB, that will remove some arguments...
+ LWFT_Client_Init ci;
+ CORBA::ORB_var orb = ci.init (argc, argv);
+
+ /// Initilize RootPOA.
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references ("RootPOA");
+
+ /// Create the POA object reference to type POA.
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (poa_object.in ());
+
+ /// Activate the POA manager.
+ PortableServer::POAManager_var poa_manager =
+ poa->the_POAManager ();
+ poa_manager->activate ();
+
+ HMOptions *opts = HMOptions::instance ();
+
+ if (! opts->parse_args (argc, argv))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Host Monitor options "
+ "are incorrect.\n"),
+ -1);
+ }
+
+ Monitor_Thread monitor_thread;
+
+ /// Initilize the timedate object on heap.
+ HostMonitorImpl * host_monitor =
+ new HostMonitorImpl (orb, &monitor_thread);
+ PortableServer::ServantBase_var safe_host (host_monitor);
+ ACE_UNUSED_ARG (safe_host);
+
+ HostMonitor_var hmvar = host_monitor->_this ();
+ CORBA::String_var hmstr = orb->object_to_string (hmvar.in ());
+
+ /// Copy the IOR in the IORFILE.
+ std::string ior_file (opts->HM_ior_file ());
+ std::ofstream outfile (ior_file.c_str ());
+ outfile << hmstr;
+ outfile.close ();
+
+ monitor_thread.activate ();
+ orb->run ();
+
+ /// Destroy the POA, waiting until the destruction terminates
+ poa->destroy (true, true);
+ orb->destroy ();
+ }
+ catch (CORBA::Exception &ex)
+ {
+ ACE_PRINT_EXCEPTION (ex, "A CORBA exception was raised:");
+ return -1;
+ }
+
+ return 0;
+}