diff options
author | jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-05-26 16:24:27 +0000 |
---|---|---|
committer | jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-05-26 16:24:27 +0000 |
commit | 0cbddee90ca2d09a0c1b730bf3a8395637fd370c (patch) | |
tree | 8e8a0ac7e6b45d8a28f4926b30447f217167aa73 | |
parent | 0862d92f43a7ad92b18c014a37d75c32662c4f41 (diff) | |
download | ATCD-0cbddee90ca2d09a0c1b730bf3a8395637fd370c.tar.gz |
added FLARe directory
12 files changed, 1912 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp new file mode 100644 index 00000000000..a9702009de5 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp @@ -0,0 +1,117 @@ +#include <unistd.h> +#include <stdlib.h> +#include <iostream> +#include <sstream> + +#include "AppOptions.h" +#include "ace/Global_Macros.h" +#include "ace/Guard_T.h" +#include "ace/Log_Msg.h" +#include "ace/Get_Opt.h" + +/// Initialize the static data member. +AppOptions * volatile AppOptions::instance_ = 0; +std::auto_ptr <AppOptions> AppOptions::deleter_; +ACE_Thread_Mutex AppOptions::lock_; + +AppOptions::AppOptions (void) + : host_monitor_ior_ ("file://HostMonitor.ior"), + port_ (5000), + arg_pair_ (0,0) +{ + char hostname [100]; + gethostname (hostname, sizeof (hostname)); + host_id_ = hostname; + ACE_DEBUG((LM_DEBUG,"Hostname is %s.\n",hostname)); +} + +AppOptions *AppOptions::instance (void) +{ + if (! instance_) + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, 0); + if (! instance_) + { + instance_ = new AppOptions (); + deleter_.reset (instance_); + } + } + return instance_; +} + +bool +AppOptions::parse_args (int argc, char **argv) +{ + bool retval = true; + this->arg_pair_ = ArgPair (argc, argv); + + ACE_Get_Opt get_opts (argc, argv, "-k:z:i:p:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'z': + { + std::istringstream istr (get_opts.opt_arg ()); + if (!(istr >> port_)) + return false; + break; + } + case 'k': + { + host_monitor_ior_ = std::string (get_opts.opt_arg ()); + break; + } +/* case 'd': + { + host_id_ = std::string (get_opts.opt_arg ()); + break; + } +*/ case 'i': + { + object_info_file_ = std::string (get_opts.opt_arg ()); + break; + } + case 'p': + { + process_id_ = std::string (get_opts.opt_arg ()); + break; + } + } + return retval; +} + +ArgPair AppOptions::arg_pair () const +{ + return this->arg_pair_; +} +std::string AppOptions::ior_output_file () const +{ + return ior_output_file_; +} + +std::string AppOptions::object_info_file () const +{ + return object_info_file_; +} + +std::string AppOptions::process_id () const +{ + return process_id_; +} + +std::string AppOptions::host_monitor_ior () const +{ + return host_monitor_ior_; +} + +size_t AppOptions::get_port () const +{ + return port_; +} + +std::string AppOptions::host_id () const +{ + return host_id_; +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp new file mode 100644 index 00000000000..104d3361819 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp @@ -0,0 +1,41 @@ +/** + * @file C++ Implementation: AppSideMonitor_Handler + * + * @brief Defines implementation of AppSideMonitor_Handler. + * + */ + +#include "AppSideMonitor_Handler.h" +#include <iostream> + +AppSideMonitor_Handler::AppSideMonitor_Handler () +: acceptor_ (0) +{ + int *i = 0; + std::cout << *i; + std::cerr << "AppSideMonitor_Handler::AppSideMonitor_Handler\n"; +} + +int AppSideMonitor_Handler::handle_input (ACE_HANDLE) +{ + char ch; + + if (this->peer().recv (&ch, sizeof (ch) <= 0)) + { + ACE_DEBUG((LM_DEBUG,"It looks like the monitor failed!\n")); +// acceptor_->open (); + } + else + ACE_DEBUG((LM_DEBUG,"It looks like the monitor is misbehaving!\n")); + + return -1; +} + +int AppSideMonitor_Handler::open (void *factory) +{ + ACE_DEBUG((LM_DEBUG,"AppSideMonitor_Handler::open\n")); + acceptor_ = static_cast <FactoryAcceptor *> (factory); +// acceptor_->close(); + return super::open (factory); +} + diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp new file mode 100644 index 00000000000..df4059ab5ae --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp @@ -0,0 +1,48 @@ +/** + * @file C++ Implementation: AppSideMonitor_Thread + * + * @brief Defines implementation of AppSideMonitor_Thread. + * + */ + +#include "AppSideMonitor_Thread.h" +#include "AppOptions.h" + +#include "ace/TP_Reactor.h" + +AppSideMonitor_Thread::AppSideMonitor_Thread (ACE_Barrier *thread_barrier) +: port_ (AppOptions::instance()->get_port()), + reactor_ (new ACE_TP_Reactor), + acceptor_ (serv_addr_, &reactor_), + synchronizer_ (thread_barrier) +{ +} + +int AppSideMonitor_Thread::svc (void) +{ + if (serv_addr_.set (this->port_, INADDR_ANY) == -1) + { + ACE_DEBUG ((LM_ERROR, "Can't set port.\n")); + return EXIT_FAILURE; + } + if (acceptor_.open (serv_addr_) == -1) + { + ACE_DEBUG ((LM_DEBUG, "The Acceptor can't open the socket.\n")); + return EXIT_FAILURE; + } + + this->synchronizer_->wait (); + this->synchronizer_ = 0; + + //ACE_DEBUG ((LM_DEBUG, "Entering reactor event loop.\n")); + if (reactor_.run_reactor_event_loop() == -1) + ACE_ERROR_RETURN ((LM_ERROR,"run_reactor_event_loop failed\n"), -1); + + return 0; +} + +void AppSideMonitor_Thread::stop () +{ + if (reactor_.end_reactor_event_loop() == -1) + ACE_DEBUG((LM_ERROR,"end_reactor_event_loop failed\n")); +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp new file mode 100644 index 00000000000..d2f9bc3fca4 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp @@ -0,0 +1,121 @@ +/** + * @file C++ Implementation: AppSideReg + * + * @brief Defines implementation of AppSideReg. + * + */ + +#include "AppSideReg.h" +#include "AppOptions.h" +#include "monitorC.h" +#include "ArgPair.h" +#include "ace/Barrier.h" + +#include <sstream> +#include <stdexcept> +#include <algorithm> + +AppSideReg::AppSideReg(ACE_Barrier *ext_barrier, CORBA::ORB_ptr orb) + : HM_ior_ (AppOptions::instance ()->host_monitor_ior()), + orb_ (CORBA::ORB::_duplicate (orb)), + external_barrier_ (ext_barrier) +{ +} + + +AppSideReg::~AppSideReg() +{ + monitor_->stop (); + orb_->destroy (); +} + +void AppSideReg::unregister_process (void) +{ + hmvar_->unregister_process (AppOptions::instance()->process_id().c_str()); +} + +int AppSideReg::svc(void) +{ + try + { + //ACE_DEBUG ((LM_DEBUG, "Entering svc ()\n")); + //ArgPair arg_pair = AppOptions::instance()->arg_pair (); + //std::copy (arg_pair.argv, arg_pair.argv + arg_pair.argc, + // std::ostream_iterator<std::string> (std::cout,"\n")); + //this->orb_ = CORBA::ORB_init (arg_pair.argc, arg_pair.argv, "ORB"); + //std::copy (arg_pair.argv, arg_pair.argv + arg_pair.argc, + // std::ostream_iterator<std::string> (std::cout,"\n")); + CORBA::Object_var obj = orb_->string_to_object (HM_ior_.c_str()); + + //ACE_DEBUG ((LM_DEBUG, "Obtained HM IOR\n")); + + if (CORBA::is_nil (obj)) + { + ACE_DEBUG((LM_ERROR, "Nil Reference\n")); + return 1; + } + + /// Downcast the object reference to a reference of type HostMonitor. + this->hmvar_ = HostMonitor::_narrow (obj); + if (CORBA::is_nil (hmvar_)) + { + ACE_DEBUG((LM_ERROR, "Argument is not a HostMonitor reference.\n")); + return 1; + } + + //ACE_DEBUG ((LM_DEBUG, "Creating the monitor\n")); + + ACE_Barrier internal_thread_barrier (2); + monitor_ = std::auto_ptr <AppSideMonitor_Thread> + (new AppSideMonitor_Thread (&internal_thread_barrier)); + monitor_->activate (); + + //ACE_DEBUG ((LM_DEBUG, "Monitor activated\n")); + + internal_thread_barrier.wait (); + /// Waiting for the AppSideMonitor_Thread to finish its socket stuff. + try { + hmvar_->dump (); + } catch (CORBA::Exception & ex) { + ACE_DEBUG((LM_DEBUG,"exception from dump.\n")); + throw; + } + + + //ACE_DEBUG ((LM_DEBUG, "Registering process\n")); + try { + if (hmvar_->register_process ( + AppOptions::instance()->process_id().c_str(), + AppOptions::instance()->host_id().c_str(), + AppOptions::instance()->get_port())) + { + ACE_DEBUG((LM_DEBUG, "Registered successfully %s with host monitor.\n", + AppOptions::instance()->process_id().c_str())); + } + else + { + ACE_DEBUG((LM_ERROR, "Registeration with the monitor failed.\n")); + } + } catch (CORBA::Exception & ex) { + ACE_DEBUG((LM_DEBUG,"exception from register_process.\n")); + throw; + } + + + //ACE_DEBUG ((LM_DEBUG, "Registering process\n")); + } + catch (CORBA::Exception &ex) + { + ACE_PRINT_EXCEPTION (ex, "A CORBA exception was raised:"); + return -1; + } + catch (...) + { + ACE_DEBUG((LM_ERROR, "Unknown exception raised!")); + return -1; + } + + external_barrier_->wait (); + return 0; +} + diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp new file mode 100644 index 00000000000..3645ca79367 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp @@ -0,0 +1,39 @@ +#include <algorithm> + +#include "ArgPair.h" + +ArgPair::ArgPair (int c, char **v) + : argc (c), + argv (new char *[c]) +{ + std::copy (v, v + c, this->argv); +} + +ArgPair::ArgPair (const ArgPair &ap) + : argc (ap.argc), + argv (new char *[ap.argc]) +{ + std::copy (ap.argv, ap.argv + ap.argc, this->argv); +} + +ArgPair & ArgPair::operator = (const ArgPair &ap) +{ + if (this != &ap) + { + ArgPair temp (ap); + this->swap (temp); + } + return *this; +} + +void ArgPair::swap (ArgPair &ap) +{ + std::swap (this->argc, ap.argc); + std::swap (this->argv, ap.argv); +} + +ArgPair::~ArgPair() +{ + delete [] argv; +} + diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp new file mode 100644 index 00000000000..a8d58dcce8c --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp @@ -0,0 +1,24 @@ +// +// $Id$ +// +#include "Hello.h" + +ACE_RCSID(Hello, Hello, "$Id$") + +Hello_i::Hello_i (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +char * +Hello_i::get_string (void) +{ + ACE_DEBUG ((LM_DEBUG, "returning string\n")); + return CORBA::string_dup ("Hello there!"); +} + +void +Hello_i::shutdown (void) +{ + this->orb_->shutdown (0); +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp new file mode 100644 index 00000000000..6ee65169257 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp @@ -0,0 +1,63 @@ +#include "IOR_Interceptor.h" +#include "ObjectReferenceFactory.h" +#include "tao/ORB_Constants.h" + +ACE_RCSID (Hello, + IOR_Interceptor, + "$Id$") + +IOR_Interceptor::IOR_Interceptor (void) +{ +} + +char * +IOR_Interceptor::name (void) +{ + return CORBA::string_dup ("IOR_Interceptor"); +} + +void +IOR_Interceptor::destroy (void) +{ +} + +void +IOR_Interceptor::establish_components ( + PortableInterceptor::IORInfo_ptr /* info */) +{ +} + +void +IOR_Interceptor::components_established ( + PortableInterceptor::IORInfo_ptr info) +{ + PortableInterceptor::ObjectReferenceFactory_var old_orf = + info->current_factory (); + + PortableInterceptor::ObjectReferenceFactory * tmp; + ACE_NEW_THROW_EX (tmp, + ObjectReferenceFactory (old_orf.in ()), + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO::VMCID, + ENOMEM), + CORBA::COMPLETED_NO)); + + PortableInterceptor::ObjectReferenceFactory_var orf = tmp; + + info->current_factory (orf.in ()); +} + +void +IOR_Interceptor::adapter_manager_state_changed ( + const char *, + PortableInterceptor::AdapterState) +{ +} + +void +IOR_Interceptor:: adapter_state_changed ( + const PortableInterceptor::ObjectReferenceTemplateSeq &, + PortableInterceptor::AdapterState) +{ +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc b/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc new file mode 100644 index 00000000000..92de9417cdb --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc @@ -0,0 +1,131 @@ +// -*- MPC -*- +// $Id$ + +project(*idl): taoidldefaults { + IDL_Files { + LWFT.idl + monitor.idl + } + IDL_Files { + idlflags += -SS + ObjectReferenceFactory.idl + } + custom_only = 1 +} + +project(*server2): rt_server, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, objreftemplate, pi_server, interceptors, naming, pi, iorinterceptor { + after += *idl + exename = server-2 + Source_Files { + server-2.cpp + Hello.cpp + ObjectReferenceFactory.cpp + ServerORBInitializer.cpp + IOR_Interceptor.cpp + AppSideMonitor_Thread.cpp + AppSideMonitor_Handler.cpp + AppSideReg.cpp + AppOptions.cpp + ArgPair.cpp + LWFTC.cpp + LWFTS.cpp + } + Source_Files { + LWFTC.cpp + LWFTS.cpp + ObjectReferenceFactoryC.cpp + monitorC.cpp + monitorS.cpp + } +} + +project(*server1): rt_server, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, objreftemplate, pi_server, interceptors, naming, pi, iorinterceptor { + after += *idl + exename = server-1 + Source_Files { + server-1.cpp + Hello.cpp + ObjectReferenceFactory.cpp + ServerORBInitializer.cpp + IOR_Interceptor.cpp + AppSideMonitor_Thread.cpp + AppSideMonitor_Handler.cpp + AppSideReg.cpp + AppOptions.cpp + ArgPair.cpp + LWFTC.cpp + LWFTS.cpp + } + Source_Files { + LWFTC.cpp + LWFTS.cpp + ObjectReferenceFactoryC.cpp + monitorC.cpp + monitorS.cpp + } +} + +project(*client): rt_client, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, pi, pi_server, interceptors, naming { + after += *idl + exename = client + Source_Files { + LWFTC.cpp + LWFTS.cpp + Client_ORBInitializer.cpp + Client_Request_Interceptor.cpp + Agent.cpp + client.cpp + } + Source_Files { + LWFTC.cpp + LWFTS.cpp + } + IDL_Files { + } +} + + +project(*HostMonitor): taoserver, portableserver, naming { + after += *idl + exename = host_monitor + Source_Files { + host_monitor.cpp + HostMonitorImpl.cpp + LinuxCPULoadCalculator.cpp + Failure_Handler.cpp + Monitor_Thread.cpp + RM_Proxy.cpp + Utilization_Monitor.cpp + Timer.cpp + HMOptions.cpp + ArgPair.cpp + LWFTC.cpp + LWFTS.cpp + } + Source_Files { + LWFTC.cpp + LWFTS.cpp + monitorC.cpp + monitorS.cpp + } +} + +project(*ReplicationManager): taoclient, taoserver, naming { + after += *idl + exename = ReplicationManager + Source_Files { + ReplicationManager.cpp + Timer.cpp + ReplicationManager_process.cpp + RMOptions.cpp + ArgPair.cpp + } + Source_Files { + LWFTC.cpp + LWFTS.cpp + RMOptions.h + ArgPair.h + } + IDL_Files { + } +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp new file mode 100644 index 00000000000..b3b5e80c70b --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp @@ -0,0 +1,55 @@ +#include "ObjectReferenceFactory.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/Stub.h" +#include "tao/Profile.h" +#include "tao/debug.h" + +ACE_RCSID (Hello, + ObjectReferenceFactory, + "$Id$") + +ObjectReferenceFactory::ObjectReferenceFactory ( + PortableInterceptor::ObjectReferenceFactory * old_orf) + : old_orf_ (old_orf) +{ + CORBA::add_ref (old_orf); +} + +ObjectReferenceFactory::~ObjectReferenceFactory (void) +{ +} + +CORBA::Object_ptr +ObjectReferenceFactory::make_object ( + const char *repository_id, + const PortableInterceptor::ObjectId & id) +{ + ACE_ASSERT (repository_id != 0); + + CORBA::String_var s = PortableServer::ObjectId_to_string (id); + + CORBA::Object_var ref = this->old_orf_->make_object (repository_id, id); + + TAO_MProfile &mp = ref->_stubobj ()->base_profiles (); + + IOP::TaggedComponent mytag; + const char* tag = s.in (); + CORBA::ULong tag_id = 9654; + size_t tag_length = ACE_OS::strlen (tag); + mytag.tag = tag_id; + mytag.component_data.length (tag_length + 1); + + CORBA::Octet *buf = mytag.component_data.get_buffer (); + ACE_OS::memcpy (buf, tag, tag_length + 1); + buf[tag_length] = '\0'; + + const CORBA::ULong profile_count = mp.profile_count (); + + for (CORBA::ULong i = 0; i < profile_count; ++i) + { + TAO_Profile *profile = mp.get_profile (i); + profile->add_tagged_component (mytag); + } + + return ref._retn (); +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp new file mode 100644 index 00000000000..0d8a3e735be --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp @@ -0,0 +1,47 @@ +#include "ServerORBInitializer.h" +#include "IOR_Interceptor.h" +#include "tao/ORB_Constants.h" +#include "tao/PortableServer/PortableServer.h" + +ACE_RCSID (Hello, + ServerORBInitializer, + "$Id$") + +void +ServerORBInitializer::pre_init ( + PortableInterceptor::ORBInitInfo_ptr /* info */) +{ +} + +void +ServerORBInitializer::post_init ( + PortableInterceptor::ORBInitInfo_ptr info) +{ + + CORBA::Object_var obj = + info->resolve_initial_references ("POACurrent"); + + PortableServer::Current_var poa_current = + PortableServer::Current::_narrow (obj.in ()); + + ACE_ASSERT (!CORBA::is_nil (poa_current.in ())); + + + CORBA::String_var orb_id = info->orb_id (); + + // Create and register the test's IORInterceptor + + PortableInterceptor::IORInterceptor_ptr ior_intercept; + ACE_NEW_THROW_EX (ior_intercept, + IOR_Interceptor, + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO::VMCID, + ENOMEM), + CORBA::COMPLETED_NO)); + + PortableInterceptor::IORInterceptor_var ior_interceptor = + ior_intercept; + + info->add_ior_interceptor (ior_interceptor.in ()); +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp new file mode 100644 index 00000000000..263637757a3 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp @@ -0,0 +1,613 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Stats.h" +#include "ace/Task.h" +#include "ace/Sample_History.h" +#include "ace/Throughput_Stats.h" +#include "tao/ORB_Core.h" +#include "tao/debug.h" +#include "tao/RTPortableServer/RTPortableServer.h" +#include "LWFTS.h" +#include "tests/RTCORBA/common_args.cpp" +#include "tests/RTCORBA/check_supported_priorities.cpp" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" +#include "ace/Sample_History.h" +#include "AppSideReg.h" +#include "AppOptions.h" +#include "ace/Barrier.h" +#include "ServerORBInitializer.h" +#include "tao/ORBInitializer_Registry.h" +#include "ace/OS_NS_stdio.h" +#include <fstream> +#include <sstream> + +ACE_RCSID(Thread_Pools, server, "$Id$") + +static int stop = 0; + +class test_i : + public POA_test +{ +public: + test_i (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa); + + void method (CORBA::ULong start, CORBA::ULong end, CORBA::ULong work, + CORBA::ULong prime_number, CORBA::ULong kill); + + void shutdown (void); + + PortableServer::POA_ptr _default_POA (void); + + void dump (void); + +private: + CORBA::ORB_var orb_; + PortableServer::POA_var poa_; + ACE_Sample_History history_; + ACE_hrtime_t start_; + ACE_hrtime_t end_; +}; + +test_i::test_i (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa) + : orb_ (CORBA::ORB::_duplicate (orb)), + poa_ (PortableServer::POA::_duplicate (poa)), + history_ (50) +{ +} + +void +test_i::method (CORBA::ULong test_start, CORBA::ULong test_end, + CORBA::ULong work, + CORBA::ULong prime_number, + CORBA::ULong kill) +{ + /* + CORBA::Object_var obj = + this->orb_->resolve_initial_references ("RTCurrent"); + + RTCORBA::Current_var current = + RTCORBA::Current::_narrow (obj.in ()); + + if (CORBA::is_nil (obj.in ())) + throw CORBA::INTERNAL (); + + CORBA::Short servant_thread_priority = + current->the_priority (); + + ACE_DEBUG ((LM_DEBUG,"Servant thread priority: %d\n", + servant_thread_priority)); + */ + + static int i = 0; + ACE_DEBUG ((LM_DEBUG, "%d\n",i++)); + + if (kill && stop) + ACE_OS::exit (1); + if (test_start == 1) + { + this->start_ = ACE_OS::gethrtime (); + } + ACE_hrtime_t start = ACE_OS::gethrtime (); + + for (; work != 0; work--) + ACE::is_prime (prime_number, + 2, + prime_number / 2); + ACE_hrtime_t end = ACE_OS::gethrtime (); + + // ACE_DEBUG ((LM_DEBUG, "Time taken = %d\n", end - start)); + this->history_.sample (end - start); + if (test_end == 1) + { + this->end_ = ACE_OS::gethrtime (); + } +} + +void +test_i::dump (void) +{ + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_Basic_Stats stats; + this->history_.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + this->end_ - this->start_, + stats.samples_count ()); +} + +PortableServer::POA_ptr +test_i::_default_POA (void) +{ + return PortableServer::POA::_duplicate (this->poa_.in ()); +} + +void +test_i::shutdown (void) +{ + this->orb_->shutdown (0); +} +/* +static const char *ior_output_file_1 = "s1.ior"; +static const char *ior_output_file_2 = "s2.ior"; +static const char *ior_output_file_3 = "s3.ior"; +static const char *ior_output_file_4 = "s4.ior"; +*/ +static CORBA::ULong static_threads = 1; +static CORBA::ULong dynamic_threads = 0; +static CORBA::ULong number_of_lanes = 0; +static RTCORBA::Priority default_thread_priority = 0; +static RTCORBA::Priority pool_priority = ACE_INT16_MIN; + +static const char *bands_file = "empty-file"; +static const char *lanes_file = "empty-file"; +std::string first_object_id; +//std::string second_object_id; +size_t first_object_role; +double first_object_load; +//size_t second_object_role; +const char *rm_ior_file = "file://rm.ior"; + +void +read_object_info (std::string file_name) +{ + std::ifstream input_file (file_name.c_str ()); + input_file >> first_object_id; + input_file >> first_object_role; + input_file >> first_object_load; +// input_file >> second_object_id; +// input_file >> second_object_role; +} + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, + "b:f:hl:n:s:a:" // server options + "c:e:g:hi:j:k:m:p:q:r:t:u:v:w:x:y:z:" // client options + ); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'b': + bands_file = get_opts.opt_arg (); + break; + + case 'f': + pool_priority = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'l': + lanes_file = get_opts.opt_arg (); + break; + + case 'n': + number_of_lanes = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 's': + static_threads = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'a': + stop = ACE_OS::atoi (get_opts.opt_arg()); + break; + + + case 'c': + case 'd': + case 'e': + case 'g': + case 'i': + case 'j': + case 'k': + case 'm': + case 'p': + case 'q': + case 'r': + case 't': + case 'u': + case 'v': + case 'w': + case 'x': + case 'y': + case 'z': + // client options: ignored. + break; +/* + case 'h': + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s\n" + "\t-b <bands file> (defaults to %s)\n" + "\t-f <pool priority> (defaults to %d)\n" + "\t-h <help: shows options menu>\n" + "\t-l <lanes file> (defaults to %s)\n" + "\t-n <number of lanes> (defaults to %d)\n" + "\t-o <ior file> (defaults to %s)\n" + "\t-s <static threads> (defaults to %d)\n" + "\n", + argv [0], + bands_file, + default_thread_priority, + lanes_file, + number_of_lanes, + ior_output_file_1, + static_threads), + -1); +*/ + } + + return 0; +} + +int +write_ior_to_file (const char *ior_file, + CORBA::ORB_ptr orb, + CORBA::Object_ptr object) +{ + CORBA::String_var ior = + orb->object_to_string (object); + + FILE *output_file = + ACE_OS::fopen (ior_file, + "w"); + + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_file), + -1); + + ACE_OS::fprintf (output_file, + "%s", + ior.in ()); + + ACE_OS::fclose (output_file); + + return 0; +} + +class Task : public ACE_Task_Base +{ +public: + + Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb); + + int svc (void); + + CORBA::ORB_var orb_; + +}; + +Task::Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb) + : ACE_Task_Base (&thread_manager), + orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +int +Task::svc (void) +{ + try + { + CORBA::Object_var object = + this->orb_->resolve_initial_references ("RootPOA"); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (object.in ()); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (); + + object = + this->orb_->resolve_initial_references ("RTORB"); + + RTCORBA::RTORB_var rt_orb = + RTCORBA::RTORB::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("RTCurrent"); + + RTCORBA::Current_var current = + RTCORBA::Current::_narrow (object.in ()); + + default_thread_priority = + current->the_priority (); + + int result = 0; + CORBA::ULong stacksize = 0; + CORBA::Boolean allow_request_buffering = 0; + CORBA::ULong max_buffered_requests = 0; + CORBA::ULong max_request_buffer_size = 0; + + CORBA::PolicyList policies; + + CORBA::Boolean allow_borrowing = 0; + if (number_of_lanes != 0) + { + get_auto_priority_lanes_and_bands (number_of_lanes, + rt_orb.in (), + stacksize, + static_threads, + dynamic_threads, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size, + allow_borrowing, + policies, + 1); + } + else if (ACE_OS::strcmp (lanes_file, "empty-file") != 0) + { + result = + get_priority_lanes ("server", + lanes_file, + rt_orb.in (), + stacksize, + static_threads, + dynamic_threads, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size, + allow_borrowing, + policies, + 1); + + if (result != 0) + return result; + + result = + get_priority_bands ("server", + bands_file, + rt_orb.in (), + policies, + 1); + + if (result != 0) + return result; + } + else + { + if (pool_priority == ACE_INT16_MIN) + pool_priority = + default_thread_priority; + + RTCORBA::ThreadpoolId threadpool_id = + rt_orb->create_threadpool (stacksize, + static_threads, + dynamic_threads, + pool_priority, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + rt_orb->create_threadpool_policy (threadpool_id); + + if (ACE_OS::strcmp (bands_file, "empty-file") != 0) + { + result = + get_priority_bands ("server", + bands_file, + rt_orb.in (), + policies, + 1); + + if (result != 0) + return result; + } + } + + /* + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_implicit_activation_policy + (PortableServer::IMPLICIT_ACTIVATION); + */ + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED, + default_thread_priority); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_lifespan_policy(PortableServer::PERSISTENT); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_id_assignment_policy (PortableServer::USER_ID); + + PortableServer::POA_var poa = + root_poa->create_POA ("RT POA", + poa_manager.in (), + policies); + + read_object_info (AppOptions::instance ()->object_info_file ()); + + ACE_DEBUG ((LM_DEBUG, "Getting RM\n")); + + CORBA::Object_var tmp = this->orb_->string_to_object (rm_ior_file); + ReplicationManager_var rm = + ReplicationManager::_narrow (tmp.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // *************************************************** + // first servant activated + + test_i *first_servant = + new test_i (this->orb_.in (), + poa.in ()); + + PortableServer::ServantBase_var first_safe_servant + (first_servant); + ACE_UNUSED_ARG (first_safe_servant); + + /* + test_var first_test = + first_servant->_this (); + */ + + PortableServer::ObjectId_var id_1 = + PortableServer::string_to_ObjectId (first_object_id.c_str ()); + poa->activate_object_with_id (id_1.in (), first_servant); + CORBA::Object_var first_servant_object = + poa->id_to_reference (id_1.in ()); + test_var first_test = test::_narrow (first_servant_object.in ()); + + std::ostringstream ostr; + ostr << first_object_id << first_object_role << ".ior"; + result = + write_ior_to_file (ostr.str().c_str(), + this->orb_.in (), + first_test.in ()); + + rm->register_application (first_object_id.c_str (), first_object_load, + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + first_object_role, + first_test.in ()); + + if (result != 0) + return result; + + // *************************************************** + + // *************************************************** + // second servant activated +/* + ACE_DEBUG ((LM_DEBUG, "activating second object\n")); + + test_i *second_servant = + new test_i (this->orb_.in (), + poa.in ()); + + PortableServer::ServantBase_var second_safe_servant + (second_servant); + ACE_UNUSED_ARG (second_safe_servant); + + + test_var second_test = + second_servant->_this (); + + + PortableServer::ObjectId_var id_2 = + PortableServer::string_to_ObjectId (second_object_id.c_str ()); + poa->activate_object_with_id (id_2.in (), second_servant); + CORBA::Object_var second_servant_object = + poa->id_to_reference (id_2.in ()); + test_var second_test = test::_narrow (second_servant_object.in ()); + + result = + write_ior_to_file (ior_output_file_2, + this->orb_.in (), + second_test.in ()); + + rm->register_application (second_object_id.c_str (), + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + second_object_role, + second_test.in ()); + + if (result != 0) + return result; +*/ + // *************************************************** + + poa_manager->activate (); + + this->orb_->run (); + + this->orb_->destroy (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return 1; + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + try + { + AppOptions::instance ()->parse_args (argc, argv); + + PortableInterceptor::ORBInitializer_ptr tmp; + + ACE_NEW_RETURN (tmp, + ServerORBInitializer, + -1); // No CORBA exceptions yet! + + PortableInterceptor::ORBInitializer_var orb_initializer = tmp; + + PortableInterceptor::register_orb_initializer (orb_initializer.in ()); + + CORBA::ORB_var orb = + CORBA::ORB_init (argc, + argv, + ""); + + ACE_Barrier thread_barrier (2); + AppSideReg proc_reg (&thread_barrier, orb.in()); + proc_reg.activate (); + thread_barrier.wait(); + + ACE_DEBUG ((LM_DEBUG, "After initialization of AppSide\n")); + + + int result = + parse_args (argc, argv); + if (result != 0) + return result; + + // Make sure we can support multiple priorities that are required + // for this test. + check_supported_priorities (orb.in ()); + + // Thread Manager for managing task. + ACE_Thread_Manager thread_manager; + + // Create task. + Task task (thread_manager, + orb.in ()); + + // Task activation flags. + long flags = + THR_NEW_LWP | + THR_JOINABLE | + orb->orb_core ()->orb_params ()->thread_creation_flags (); + + // Activate task. + result = + task.activate (flags); + ACE_ASSERT (result != -1); + ACE_UNUSED_ARG (result); + + // Wait for task to exit. + result = + thread_manager.wait (); + ACE_ASSERT (result != -1); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp new file mode 100644 index 00000000000..2a3fe1bee90 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp @@ -0,0 +1,613 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Throughput_Stats.h" +#include "ace/Stats.h" +#include "ace/Task.h" +#include "ace/Sample_History.h" +#include "tao/ORB_Core.h" +#include "tao/debug.h" +#include "tao/RTPortableServer/RTPortableServer.h" +#include "LWFTS.h" +#include "tests/RTCORBA/common_args.cpp" +#include "tests/RTCORBA/check_supported_priorities.cpp" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" +#include "ace/Sample_History.h" +#include "AppSideReg.h" +#include "AppOptions.h" +#include "ace/Barrier.h" +#include "ServerORBInitializer.h" +#include "tao/ORBInitializer_Registry.h" +#include "ace/OS_NS_stdio.h" +#include <fstream> +#include <sstream> + +ACE_RCSID(Thread_Pools, server, "$Id$") + +static int stop = 0; + +class test_i : + public POA_test +{ +public: + test_i (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa); + + void method (CORBA::ULong start, CORBA::ULong end, CORBA::ULong work, + CORBA::ULong prime_number, CORBA::ULong kill); + + void shutdown (void); + + PortableServer::POA_ptr _default_POA (void); + + void dump (void); + +private: + CORBA::ORB_var orb_; + PortableServer::POA_var poa_; + ACE_Sample_History history_; + ACE_hrtime_t start_; + ACE_hrtime_t end_; +}; + +test_i::test_i (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa) + : orb_ (CORBA::ORB::_duplicate (orb)), + poa_ (PortableServer::POA::_duplicate (poa)), + history_ (50) +{ +} + +void +test_i::method (CORBA::ULong test_start, CORBA::ULong test_end, + CORBA::ULong work, + CORBA::ULong prime_number, + CORBA::ULong kill) +{ + /* + CORBA::Object_var obj = + this->orb_->resolve_initial_references ("RTCurrent"); + + RTCORBA::Current_var current = + RTCORBA::Current::_narrow (obj.in ()); + + if (CORBA::is_nil (obj.in ())) + throw CORBA::INTERNAL (); + + CORBA::Short servant_thread_priority = + current->the_priority (); + + ACE_DEBUG ((LM_DEBUG,"Servant thread priority: %d\n", + servant_thread_priority)); + */ + + //ACE_DEBUG ((LM_DEBUG, "Working on the prime number\n")); + + if (kill && stop) + ACE_OS::exit(1); + if (test_start == 1) + { + this->start_ = ACE_OS::gethrtime (); + } + ACE_hrtime_t start = ACE_OS::gethrtime (); + + for (; work != 0; work--) + ACE::is_prime (prime_number, + 2, + prime_number / 2); + ACE_hrtime_t end = ACE_OS::gethrtime (); + + // ACE_DEBUG ((LM_DEBUG, "Time taken = %d\n", end - start)); + this->history_.sample (end - start); + if (test_end == 1) + { + this->end_ = ACE_OS::gethrtime (); + } +} + +void +test_i::dump (void) +{ + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_Basic_Stats stats; + this->history_.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + this->end_ - this->start_, + stats.samples_count ()); +} + +PortableServer::POA_ptr +test_i::_default_POA (void) +{ + return PortableServer::POA::_duplicate (this->poa_.in ()); +} + +void +test_i::shutdown (void) +{ + this->orb_->shutdown (0); +} + +static CORBA::ULong static_threads = 1; +static CORBA::ULong dynamic_threads = 0; +static CORBA::ULong number_of_lanes = 0; +static RTCORBA::Priority default_thread_priority = 0; +static RTCORBA::Priority pool_priority = ACE_INT16_MIN; + +static const char *bands_file = "empty-file"; +static const char *lanes_file = "empty-file"; +std::string first_object_id; +std::string second_object_id; +size_t first_object_role; +size_t second_object_role; +double first_object_load; +double second_object_load; +const char *rm_ior_file = "file://rm.ior"; + +void +read_object_info (std::string file_name) +{ + std::ifstream input_file (file_name.c_str ()); + input_file >> first_object_id; + input_file >> first_object_role; + input_file >> first_object_load; + input_file >> second_object_id; + input_file >> second_object_role; + input_file >> second_object_load; +} + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, + "b:f:hl:n:s:a:" // server options + "c:e:g:hi:j:k:m:p:q:r:t:u:v:w:x:y:z:" // client options + ); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'b': + bands_file = get_opts.opt_arg (); + break; + + case 'f': + pool_priority = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'l': + lanes_file = get_opts.opt_arg (); + break; + + case 'n': + number_of_lanes = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 's': + static_threads = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'a': + stop = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'c': + case 'd': + case 'e': + case 'g': + case 'i': + case 'j': + case 'k': + case 'm': + case 'p': + case 'q': + case 'r': + case 't': + case 'u': + case 'v': + case 'w': + case 'x': + case 'y': + case 'z': + // client options: ignored. + break; +/* + case 'h': + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s\n" + "\t-b <bands file> (defaults to %s)\n" + "\t-f <pool priority> (defaults to %d)\n" + "\t-h <help: shows options menu>\n" + "\t-l <lanes file> (defaults to %s)\n" + "\t-n <number of lanes> (defaults to %d)\n" + "\t-o <ior file> (defaults to %s)\n" + "\t-s <static threads> (defaults to %d)\n" + "\n", + argv [0], + bands_file, + default_thread_priority, + lanes_file, + number_of_lanes, + ior_output_file_1, + static_threads), + -1); +*/ + } + + return 0; +} + +int +write_ior_to_file (const char *ior_file, + CORBA::ORB_ptr orb, + CORBA::Object_ptr object) +{ + CORBA::String_var ior = + orb->object_to_string (object); + + FILE *output_file = + ACE_OS::fopen (ior_file, + "w"); + + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_file), + -1); + + ACE_OS::fprintf (output_file, + "%s", + ior.in ()); + + ACE_OS::fclose (output_file); + + return 0; +} + +class Task : public ACE_Task_Base +{ +public: + + Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb); + + int svc (void); + + CORBA::ORB_var orb_; + +}; + +Task::Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb) + : ACE_Task_Base (&thread_manager), + orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +int +Task::svc (void) +{ + try + { + CORBA::Object_var object = + this->orb_->resolve_initial_references ("RootPOA"); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (object.in ()); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (); + + object = + this->orb_->resolve_initial_references ("RTORB"); + + RTCORBA::RTORB_var rt_orb = + RTCORBA::RTORB::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("RTCurrent"); + + RTCORBA::Current_var current = + RTCORBA::Current::_narrow (object.in ()); + + default_thread_priority = + current->the_priority (); + + int result = 0; + CORBA::ULong stacksize = 0; + CORBA::Boolean allow_request_buffering = 0; + CORBA::ULong max_buffered_requests = 0; + CORBA::ULong max_request_buffer_size = 0; + + CORBA::PolicyList policies; + + CORBA::Boolean allow_borrowing = 0; + if (number_of_lanes != 0) + { + get_auto_priority_lanes_and_bands (number_of_lanes, + rt_orb.in (), + stacksize, + static_threads, + dynamic_threads, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size, + allow_borrowing, + policies, + 1); + } + else if (ACE_OS::strcmp (lanes_file, "empty-file") != 0) + { + result = + get_priority_lanes ("server", + lanes_file, + rt_orb.in (), + stacksize, + static_threads, + dynamic_threads, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size, + allow_borrowing, + policies, + 1); + + if (result != 0) + return result; + + result = + get_priority_bands ("server", + bands_file, + rt_orb.in (), + policies, + 1); + + if (result != 0) + return result; + } + else + { + if (pool_priority == ACE_INT16_MIN) + pool_priority = + default_thread_priority; + + RTCORBA::ThreadpoolId threadpool_id = + rt_orb->create_threadpool (stacksize, + static_threads, + dynamic_threads, + pool_priority, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + rt_orb->create_threadpool_policy (threadpool_id); + + if (ACE_OS::strcmp (bands_file, "empty-file") != 0) + { + result = + get_priority_bands ("server", + bands_file, + rt_orb.in (), + policies, + 1); + + if (result != 0) + return result; + } + } + + /* + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_implicit_activation_policy + (PortableServer::IMPLICIT_ACTIVATION); + */ + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED, + default_thread_priority); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_lifespan_policy(PortableServer::PERSISTENT); + + policies.length (policies.length () + 1); + policies[policies.length () - 1] = + root_poa->create_id_assignment_policy (PortableServer::USER_ID); + + PortableServer::POA_var poa = + root_poa->create_POA ("RT POA", + poa_manager.in (), + policies); + + read_object_info (AppOptions::instance ()->object_info_file ()); + + ACE_DEBUG ((LM_DEBUG, "Getting RM\n")); + + CORBA::Object_var tmp = this->orb_->string_to_object (rm_ior_file); + ReplicationManager_var rm = + ReplicationManager::_narrow (tmp.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // *************************************************** + // first servant activated + + test_i *first_servant = + new test_i (this->orb_.in (), + poa.in ()); + + PortableServer::ServantBase_var first_safe_servant + (first_servant); + ACE_UNUSED_ARG (first_safe_servant); + + /* + test_var first_test = + first_servant->_this (); + */ + + PortableServer::ObjectId_var id_1 = + PortableServer::string_to_ObjectId (first_object_id.c_str ()); + poa->activate_object_with_id (id_1.in (), first_servant); + CORBA::Object_var first_servant_object = + poa->id_to_reference (id_1.in ()); + test_var first_test = test::_narrow (first_servant_object.in ()); + + std::ostringstream ostr; + ostr << first_object_id << first_object_role << ".ior"; + result = + write_ior_to_file (ostr.str().c_str(), + this->orb_.in (), + first_test.in ()); + + rm->register_application (first_object_id.c_str (), first_object_load, + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + first_object_role, + first_test.in ()); + + if (result != 0) + return result; + + // *************************************************** + + // *************************************************** + // second servant activated + + ACE_DEBUG ((LM_DEBUG, "activating second object\n")); + + test_i *second_servant = + new test_i (this->orb_.in (), + poa.in ()); + + PortableServer::ServantBase_var second_safe_servant + (second_servant); + ACE_UNUSED_ARG (second_safe_servant); + + /* + test_var second_test = + second_servant->_this (); + */ + + //second_object_id = "ABCD"; + PortableServer::ObjectId_var id_2 = + PortableServer::string_to_ObjectId (second_object_id.c_str ()); + poa->activate_object_with_id (id_2.in (), second_servant); + CORBA::Object_var second_servant_object = + poa->id_to_reference (id_2.in ()); + test_var second_test = test::_narrow (second_servant_object.in ()); + + ostr.str(""); + ostr << second_object_id << second_object_role << ".ior"; + result = + write_ior_to_file (ostr.str().c_str(), + this->orb_.in (), + second_test.in ()); + + rm->register_application (second_object_id.c_str (), second_object_load, + AppOptions::instance ()->host_id ().c_str (), + AppOptions::instance ()->process_id ().c_str (), + second_object_role, + second_test.in ()); + + if (result != 0) + return result; + + // *************************************************** + + poa_manager->activate (); + + this->orb_->run (); + + this->orb_->destroy (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return 1; + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + try + { + + AppOptions::instance ()->parse_args (argc, argv); + + PortableInterceptor::ORBInitializer_ptr tmp; + + ACE_NEW_RETURN (tmp, + ServerORBInitializer, + -1); // No CORBA exceptions yet! + + PortableInterceptor::ORBInitializer_var orb_initializer = tmp; + + PortableInterceptor::register_orb_initializer (orb_initializer.in ()); + + + CORBA::ORB_var orb = + CORBA::ORB_init (argc, + argv, + ""); + + ACE_Barrier thread_barrier (2); + AppSideReg proc_reg (&thread_barrier, orb.in()); + proc_reg.activate (); + thread_barrier.wait(); + + ACE_DEBUG ((LM_DEBUG, "After initialization of AppSide\n")); + + + int result = + parse_args (argc, argv); + if (result != 0) + return result; + + // Make sure we can support multiple priorities that are required + // for this test. + check_supported_priorities (orb.in ()); + + // Thread Manager for managing task. + ACE_Thread_Manager thread_manager; + + // Create task. + Task task (thread_manager, + orb.in ()); + + // Task activation flags. + long flags = + THR_NEW_LWP | + THR_JOINABLE | + orb->orb_core ()->orb_params ()->thread_creation_flags (); + + // Activate task. + result = + task.activate (flags); + ACE_ASSERT (result != -1); + ACE_UNUSED_ARG (result); + + // Wait for task to exit. + result = + thread_manager.wait (); + ACE_ASSERT (result != -1); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} |