diff options
Diffstat (limited to 'TAO/orbsvcs/LWFT_Service')
-rw-r--r-- | TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc | 50 | ||||
-rw-r--r-- | TAO/orbsvcs/LWFT_Service/RMOptions.cpp | 110 | ||||
-rw-r--r-- | TAO/orbsvcs/LWFT_Service/RMOptions.h | 56 | ||||
-rw-r--r-- | TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp | 242 | ||||
-rw-r--r-- | TAO/orbsvcs/LWFT_Service/host_monitor.cpp | 77 |
5 files changed, 535 insertions, 0 deletions
diff --git a/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc b/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc new file mode 100644 index 00000000000..c3489a2cfe8 --- /dev/null +++ b/TAO/orbsvcs/LWFT_Service/LWFT_Service.mpc @@ -0,0 +1,50 @@ +// -*- MPC -*- +// $Id$ + +project(ReplicationManager) : taoserver, naming, lwft_server { + after += LWFT_ReplicationManagerImpl + libs += LWFT_ReplicationManagerImpl + exename = ReplicationManager + + IDL_Files { + } + + Source_Files { + ReplicationManager_process.cpp + RMOptions.cpp + } + + Header_Files { + RMOptions.h + } + + Inline_Files { + } + + Template_Files { + } +} + +project(HostMonitor) : taoserver, portableserver, lwft_client { + after += LWFT_HostMonitor + libs += LWFT_Server \ + LWFT_HostMonitor + exename = HostMonitor + + IDL_Files { + } + + Source_Files { + host_monitor.cpp + } + + Header_Files { + } + + Inline_Files { + } + + Template_Files { + } +} + diff --git a/TAO/orbsvcs/LWFT_Service/RMOptions.cpp b/TAO/orbsvcs/LWFT_Service/RMOptions.cpp new file mode 100644 index 00000000000..0b95d4ef144 --- /dev/null +++ b/TAO/orbsvcs/LWFT_Service/RMOptions.cpp @@ -0,0 +1,110 @@ +// -*- C++ -*- +// $Id$ + +#include <sstream> + +#include "ace/Global_Macros.h" +#include "ace/Guard_T.h" +#include "ace/Log_Msg.h" +#include "ace/Arg_Shifter.h" + +#include "RMOptions.h" + +/// Initialize the static data member. +RMOptions * volatile RMOptions::instance_ = 0; +ACE_Auto_Ptr<RMOptions> RMOptions::deleter_; +ACE_Thread_Mutex RMOptions::lock_; + +RMOptions::RMOptions (void) + : hertz_ (0.2), + proactive_ (true), + static_mode_ (false), + use_naming_service_ (false) +{ +} + +RMOptions * +RMOptions::instance (void) +{ + if (instance_ == 0) + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, 0); + + if (instance_ == 0) + { + instance_ = new RMOptions; + deleter_.reset (instance_); + } + } + + return instance_; +} + +bool +RMOptions::parse_args (int &argc, char **argv) +{ + ACE_Arg_Shifter as (argc, argv); + + while (as.is_anything_left ()) + { + const ACE_TCHAR *arg = 0; + + if (0 != (arg = as.get_the_parameter (ACE_TEXT ("-hertz")))) + { + std::istringstream istr (arg); + + if (!(istr >> hertz_)) + { + return false; + } + + as.consume_arg (); + } + else if (as.cur_arg_strncasecmp (ACE_TEXT ("-proactive")) == 0) + { + proactive_ = true; + as.consume_arg (); + } + else if (as.cur_arg_strncasecmp (ACE_TEXT ("-static")) == 0) + { + static_mode_ = true; + as.consume_arg (); + } + else if (as.cur_arg_strncasecmp (ACE_TEXT ("-use_ns")) == 0) + { + use_naming_service_ = true; + as.consume_arg (); + } + else + { + as.ignore_arg (); + } + } + + return true; +}; + + +bool +RMOptions::proactive (void) const +{ + return proactive_; +} + +double +RMOptions::hertz (void) const +{ + return hertz_; +} + +bool +RMOptions::static_mode (void) const +{ + return static_mode_; +} + +bool +RMOptions::use_naming_service (void) const +{ + return use_naming_service_; +} diff --git a/TAO/orbsvcs/LWFT_Service/RMOptions.h b/TAO/orbsvcs/LWFT_Service/RMOptions.h new file mode 100644 index 00000000000..f5ed455bcc2 --- /dev/null +++ b/TAO/orbsvcs/LWFT_Service/RMOptions.h @@ -0,0 +1,56 @@ +// -*- C++ -*- +// $Id$ + +/** + * @file RMOptions.h + * + * @brief Declared the RMOptions class interface. + * + */ + +#ifndef _RMOPTIONS_H +#define _RMOPTIONS_H + +#include <string> + +#include "ace/Thread_Mutex.h" +#include "ace/Auto_Ptr.h" + +/** + * @class RMOptions + * + * @brief Declares RMOptions singleton to hold the command line options. + */ + +class RMOptions +/// TITLE +/// Singleton class for the program options. +{ +public: + /// Singleton access method. + static RMOptions *instance (void); + + bool parse_args (int &argc, char **argv); + + /// Member accessors. + bool proactive (void) const; + double hertz (void) const; + bool static_mode (void) const; + bool use_naming_service (void) const; + +protected: + /// Constructor is protected to ensure Singleton access. + RMOptions (void); + + double hertz_; + bool proactive_; + bool static_mode_; + bool use_naming_service_; + + /// Singleton-related stuff. + static RMOptions * volatile instance_; + static ACE_Auto_Ptr<RMOptions> deleter_; + static ACE_Thread_Mutex lock_; +}; + +#endif /* _RMOPTIONS_H */ diff --git a/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp b/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp new file mode 100644 index 00000000000..e9c046eee69 --- /dev/null +++ b/TAO/orbsvcs/LWFT_Service/ReplicationManager_process.cpp @@ -0,0 +1,242 @@ +// -*- C++ -*- +// $Id$ + +#include "ace/Get_Opt.h" + +#include "orbsvcs/orbsvcs/Naming/Naming_Client.h" + +#include "orbsvcs/orbsvcs/LWFT/ForwardingAgent.h" +#include "orbsvcs/orbsvcs/LWFT/ReplicationManager.h" +#include "orbsvcs/orbsvcs/LWFT/LWFT_Server_Init.h" +#include "orbsvcs/orbsvcs/LWFT/AppOptions.h" +#include "orbsvcs/orbsvcs/LWFT/StateSyncAgentTask.h" + +#include "RMOptions.h" + +char *ior_output_file = "rm.ior"; + +class RegistrationTask : public ACE_Task_Base +{ +public: + RegistrationTask (ReplicationManager_ptr primary_rm, + ReplicationManager_ptr local_rm, + CORBA::ORB_ptr orb) + : primary_rm_ (ReplicationManager::_duplicate (primary_rm)), + local_rm_ (ReplicationManager::_duplicate (local_rm)), + orb_ (CORBA::ORB::_duplicate (orb)) + { + } + + int svc (void) + { + try + { + // register ReplicationManager with itself + primary_rm_->register_application ( + local_rm_->object_id (), + 0.0, + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + AppOptions::instance ()->role (), + local_rm_.in ()); + } + catch (CORBA::Exception &) + { + ACE_ERROR ((LM_ERROR, "RM: Could not register reference to " + " the ReplicationManager with the primary " + "ReplicationManager.\n")); + return 1; + } + + return 0; + } + +private: + ReplicationManager_var primary_rm_; + ReplicationManager_var local_rm_; + CORBA::ORB_var orb_; +}; + +int +main (int argc, char *argv[]) +{ + try + { + CORBA::ORB_var orb = CORBA::ORB_init (argc, argv); + + AppOptions::instance ()->parse_args (argc, argv); + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA"); + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (object.in ()); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (); + + CORBA::PolicyList policies; + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_lifespan_policy (PortableServer::PERSISTENT); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_id_assignment_policy (PortableServer::USER_ID); + + PortableServer::POA_var poa = + root_poa->create_POA ("Servant POA", + poa_manager.in (), + policies); + + poa_manager->activate (); + + // std::cout << "Before StateSyncAgent." << std::endl; + + StateSynchronizationAgent_i* ssa_servant = + new StateSynchronizationAgent_i ( + orb.in (), + AppOptions::instance ()->host_id (), + AppOptions::instance ()->process_id (), + !(AppOptions::instance ()->use_dds ())); + + PortableServer::ServantBase_var owner_xfer_ssa (ssa_servant); + + // create task for state synchronization agent + StateSyncAgentTask sync_agent_thread (orb.in (), + ssa_servant); + + int result = sync_agent_thread.activate (); + + if (result != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "StateSyncAgentTask::activate () " + "returned %d, errno = %d\n", + result, + errno), + -1); + } + + if (! RMOptions::instance ()->parse_args (argc, argv)) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Replication Manager options " + "are incorrect.\n"), + -1); + } + + ReplicationManager_i *rm_i = 0; + ACE_NEW_RETURN (rm_i, + ReplicationManager_i (orb.in (), + RMOptions::instance()->hertz(), + RMOptions::instance()->proactive(), + RMOptions::instance()->static_mode ()), + 1); + + PortableServer::ServantBase_var owner_transfer (rm_i); + + PortableServer::ObjectId_var oid = + PortableServer::string_to_ObjectId (rm_i->object_id ()); + + poa->activate_object_with_id (oid.in (), rm_i); + + CORBA::Object_var rm_object = poa->id_to_reference (oid.in ()); + ReplicationManager_var rm = ReplicationManager::_narrow (rm_object.in ()); + + CORBA::String_var ior = + orb->object_to_string (rm.in ()); + +// ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); + + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + + if (output_file == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file " + "for writing IOR: %s", + ior_output_file), + 1); + } + + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + ReplicationManager_var primary_rm; + + if (RMOptions::instance ()->use_naming_service ()) + { + TAO_Naming_Client naming_client; + naming_client.init (orb.in ()); + + CosNaming::Name rm_name; + rm_name.length (1UL); + rm_name[0UL].id = "ReplicationManager"; + + if (AppOptions::instance ()->role () == PRIMARY) + { + try + { + naming_client->bind (rm_name, rm.in ()); + } + catch (CosNaming::NamingContext::AlreadyBound &) + { + naming_client->rebind (rm_name, rm.in ()); + } + + primary_rm = ReplicationManager::_duplicate (rm.in ()); + + // ACE_DEBUG ((LM_INFO, + // "ReplicationManager registered with Naming Service\n")); + } + else + { + CORBA::Object_var rm_obj = naming_client->resolve (rm_name); + + primary_rm = ReplicationManager::_narrow (rm_obj.in ()); + } + } + else + { + primary_rm = ReplicationManager::_duplicate (rm.in ()); + } + + // add reference for state synchronization of the RM itself + rm_i->agent (sync_agent_thread.agent_ref ()); + + // register its own StateSynchronizationAgent + rm_i->register_state_synchronization_agent ( + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + sync_agent_thread.agent_ref ()); + + sync_agent_thread.agent_ref ()->register_application (rm_i->object_id (), + rm.in ()); + + RegistrationTask registration_task ( + primary_rm.in (), + rm.in (), + orb.in ()); + + if (registration_task.activate () == 0) + { + orb->run (); + } + + // ACE_DEBUG ((LM_DEBUG, "(%P|%t) RM - event loop finished\n")); + + poa->destroy (true, true); + orb->destroy (); + } + catch (CORBA::Exception &ex) + { + ex._tao_print_exception ("RM Process: caught exception:"); + return -1; + } + + return 0; +} diff --git a/TAO/orbsvcs/LWFT_Service/host_monitor.cpp b/TAO/orbsvcs/LWFT_Service/host_monitor.cpp new file mode 100644 index 00000000000..66cfd22ad29 --- /dev/null +++ b/TAO/orbsvcs/LWFT_Service/host_monitor.cpp @@ -0,0 +1,77 @@ +// -*- C++ -*- +// $Id$ + +#include <iostream> +#include <fstream> +#include <string> +#include <iterator> +#include <algorithm> + +#include "orbsvcs/orbsvcs/LWFT/HostMonitorImpl.h" +#include "orbsvcs/orbsvcs/LWFT/Monitor_Thread.h" +#include "orbsvcs/orbsvcs/LWFT/HMOptions.h" +#include "orbsvcs/orbsvcs/LWFT/LWFT_Client_Init.h" + +int main (int argc, char* argv[]) +{ + try + { + /// First initialize the ORB, that will remove some arguments... + LWFT_Client_Init ci; + CORBA::ORB_var orb = ci.init (argc, argv); + + /// Initilize RootPOA. + CORBA::Object_var poa_object = + orb->resolve_initial_references ("RootPOA"); + + /// Create the POA object reference to type POA. + PortableServer::POA_var poa = + PortableServer::POA::_narrow (poa_object.in ()); + + /// Activate the POA manager. + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (); + poa_manager->activate (); + + HMOptions *opts = HMOptions::instance (); + + if (! opts->parse_args (argc, argv)) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Host Monitor options " + "are incorrect.\n"), + -1); + } + + Monitor_Thread monitor_thread; + + /// Initilize the timedate object on heap. + HostMonitorImpl * host_monitor = + new HostMonitorImpl (orb, &monitor_thread); + PortableServer::ServantBase_var safe_host (host_monitor); + ACE_UNUSED_ARG (safe_host); + + HostMonitor_var hmvar = host_monitor->_this (); + CORBA::String_var hmstr = orb->object_to_string (hmvar.in ()); + + /// Copy the IOR in the IORFILE. + std::string ior_file (opts->HM_ior_file ()); + std::ofstream outfile (ior_file.c_str ()); + outfile << hmstr; + outfile.close (); + + monitor_thread.activate (); + orb->run (); + + /// Destroy the POA, waiting until the destruction terminates + poa->destroy (true, true); + orb->destroy (); + } + catch (CORBA::Exception &ex) + { + ACE_PRINT_EXCEPTION (ex, "A CORBA exception was raised:"); + return -1; + } + + return 0; +} |