summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-05-26 16:24:27 +0000
committerjai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-05-26 16:24:27 +0000
commit0cbddee90ca2d09a0c1b730bf3a8395637fd370c (patch)
tree8e8a0ac7e6b45d8a28f4926b30447f217167aa73
parent0862d92f43a7ad92b18c014a37d75c32662c4f41 (diff)
downloadATCD-0cbddee90ca2d09a0c1b730bf3a8395637fd370c.tar.gz
added FLARe directory
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp117
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp41
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp48
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp121
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp39
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp24
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp63
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc131
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp55
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp47
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp613
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp613
12 files changed, 1912 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp
new file mode 100644
index 00000000000..a9702009de5
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppOptions.cpp
@@ -0,0 +1,117 @@
+#include <unistd.h>
+#include <stdlib.h>
+#include <iostream>
+#include <sstream>
+
+#include "AppOptions.h"
+#include "ace/Global_Macros.h"
+#include "ace/Guard_T.h"
+#include "ace/Log_Msg.h"
+#include "ace/Get_Opt.h"
+
+/// Initialize the static data member.
+AppOptions * volatile AppOptions::instance_ = 0;
+std::auto_ptr <AppOptions> AppOptions::deleter_;
+ACE_Thread_Mutex AppOptions::lock_;
+
+AppOptions::AppOptions (void)
+ : host_monitor_ior_ ("file://HostMonitor.ior"),
+ port_ (5000),
+ arg_pair_ (0,0)
+{
+ char hostname [100];
+ gethostname (hostname, sizeof (hostname));
+ host_id_ = hostname;
+ ACE_DEBUG((LM_DEBUG,"Hostname is %s.\n",hostname));
+}
+
+AppOptions *AppOptions::instance (void)
+{
+ if (! instance_)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, 0);
+ if (! instance_)
+ {
+ instance_ = new AppOptions ();
+ deleter_.reset (instance_);
+ }
+ }
+ return instance_;
+}
+
+bool
+AppOptions::parse_args (int argc, char **argv)
+{
+ bool retval = true;
+ this->arg_pair_ = ArgPair (argc, argv);
+
+ ACE_Get_Opt get_opts (argc, argv, "-k:z:i:p:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'z':
+ {
+ std::istringstream istr (get_opts.opt_arg ());
+ if (!(istr >> port_))
+ return false;
+ break;
+ }
+ case 'k':
+ {
+ host_monitor_ior_ = std::string (get_opts.opt_arg ());
+ break;
+ }
+/* case 'd':
+ {
+ host_id_ = std::string (get_opts.opt_arg ());
+ break;
+ }
+*/ case 'i':
+ {
+ object_info_file_ = std::string (get_opts.opt_arg ());
+ break;
+ }
+ case 'p':
+ {
+ process_id_ = std::string (get_opts.opt_arg ());
+ break;
+ }
+ }
+ return retval;
+}
+
+ArgPair AppOptions::arg_pair () const
+{
+ return this->arg_pair_;
+}
+std::string AppOptions::ior_output_file () const
+{
+ return ior_output_file_;
+}
+
+std::string AppOptions::object_info_file () const
+{
+ return object_info_file_;
+}
+
+std::string AppOptions::process_id () const
+{
+ return process_id_;
+}
+
+std::string AppOptions::host_monitor_ior () const
+{
+ return host_monitor_ior_;
+}
+
+size_t AppOptions::get_port () const
+{
+ return port_;
+}
+
+std::string AppOptions::host_id () const
+{
+ return host_id_;
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp
new file mode 100644
index 00000000000..104d3361819
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Handler.cpp
@@ -0,0 +1,41 @@
+/**
+ * @file C++ Implementation: AppSideMonitor_Handler
+ *
+ * @brief Defines implementation of AppSideMonitor_Handler.
+ *
+ */
+
+#include "AppSideMonitor_Handler.h"
+#include <iostream>
+
+AppSideMonitor_Handler::AppSideMonitor_Handler ()
+: acceptor_ (0)
+{
+ int *i = 0;
+ std::cout << *i;
+ std::cerr << "AppSideMonitor_Handler::AppSideMonitor_Handler\n";
+}
+
+int AppSideMonitor_Handler::handle_input (ACE_HANDLE)
+{
+ char ch;
+
+ if (this->peer().recv (&ch, sizeof (ch) <= 0))
+ {
+ ACE_DEBUG((LM_DEBUG,"It looks like the monitor failed!\n"));
+// acceptor_->open ();
+ }
+ else
+ ACE_DEBUG((LM_DEBUG,"It looks like the monitor is misbehaving!\n"));
+
+ return -1;
+}
+
+int AppSideMonitor_Handler::open (void *factory)
+{
+ ACE_DEBUG((LM_DEBUG,"AppSideMonitor_Handler::open\n"));
+ acceptor_ = static_cast <FactoryAcceptor *> (factory);
+// acceptor_->close();
+ return super::open (factory);
+}
+
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp
new file mode 100644
index 00000000000..df4059ab5ae
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideMonitor_Thread.cpp
@@ -0,0 +1,48 @@
+/**
+ * @file C++ Implementation: AppSideMonitor_Thread
+ *
+ * @brief Defines implementation of AppSideMonitor_Thread.
+ *
+ */
+
+#include "AppSideMonitor_Thread.h"
+#include "AppOptions.h"
+
+#include "ace/TP_Reactor.h"
+
+AppSideMonitor_Thread::AppSideMonitor_Thread (ACE_Barrier *thread_barrier)
+: port_ (AppOptions::instance()->get_port()),
+ reactor_ (new ACE_TP_Reactor),
+ acceptor_ (serv_addr_, &reactor_),
+ synchronizer_ (thread_barrier)
+{
+}
+
+int AppSideMonitor_Thread::svc (void)
+{
+ if (serv_addr_.set (this->port_, INADDR_ANY) == -1)
+ {
+ ACE_DEBUG ((LM_ERROR, "Can't set port.\n"));
+ return EXIT_FAILURE;
+ }
+ if (acceptor_.open (serv_addr_) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "The Acceptor can't open the socket.\n"));
+ return EXIT_FAILURE;
+ }
+
+ this->synchronizer_->wait ();
+ this->synchronizer_ = 0;
+
+ //ACE_DEBUG ((LM_DEBUG, "Entering reactor event loop.\n"));
+ if (reactor_.run_reactor_event_loop() == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"run_reactor_event_loop failed\n"), -1);
+
+ return 0;
+}
+
+void AppSideMonitor_Thread::stop ()
+{
+ if (reactor_.end_reactor_event_loop() == -1)
+ ACE_DEBUG((LM_ERROR,"end_reactor_event_loop failed\n"));
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp
new file mode 100644
index 00000000000..d2f9bc3fca4
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/AppSideReg.cpp
@@ -0,0 +1,121 @@
+/**
+ * @file C++ Implementation: AppSideReg
+ *
+ * @brief Defines implementation of AppSideReg.
+ *
+ */
+
+#include "AppSideReg.h"
+#include "AppOptions.h"
+#include "monitorC.h"
+#include "ArgPair.h"
+#include "ace/Barrier.h"
+
+#include <sstream>
+#include <stdexcept>
+#include <algorithm>
+
+AppSideReg::AppSideReg(ACE_Barrier *ext_barrier, CORBA::ORB_ptr orb)
+ : HM_ior_ (AppOptions::instance ()->host_monitor_ior()),
+ orb_ (CORBA::ORB::_duplicate (orb)),
+ external_barrier_ (ext_barrier)
+{
+}
+
+
+AppSideReg::~AppSideReg()
+{
+ monitor_->stop ();
+ orb_->destroy ();
+}
+
+void AppSideReg::unregister_process (void)
+{
+ hmvar_->unregister_process (AppOptions::instance()->process_id().c_str());
+}
+
+int AppSideReg::svc(void)
+{
+ try
+ {
+ //ACE_DEBUG ((LM_DEBUG, "Entering svc ()\n"));
+ //ArgPair arg_pair = AppOptions::instance()->arg_pair ();
+ //std::copy (arg_pair.argv, arg_pair.argv + arg_pair.argc,
+ // std::ostream_iterator<std::string> (std::cout,"\n"));
+ //this->orb_ = CORBA::ORB_init (arg_pair.argc, arg_pair.argv, "ORB");
+ //std::copy (arg_pair.argv, arg_pair.argv + arg_pair.argc,
+ // std::ostream_iterator<std::string> (std::cout,"\n"));
+ CORBA::Object_var obj = orb_->string_to_object (HM_ior_.c_str());
+
+ //ACE_DEBUG ((LM_DEBUG, "Obtained HM IOR\n"));
+
+ if (CORBA::is_nil (obj))
+ {
+ ACE_DEBUG((LM_ERROR, "Nil Reference\n"));
+ return 1;
+ }
+
+ /// Downcast the object reference to a reference of type HostMonitor.
+ this->hmvar_ = HostMonitor::_narrow (obj);
+ if (CORBA::is_nil (hmvar_))
+ {
+ ACE_DEBUG((LM_ERROR, "Argument is not a HostMonitor reference.\n"));
+ return 1;
+ }
+
+ //ACE_DEBUG ((LM_DEBUG, "Creating the monitor\n"));
+
+ ACE_Barrier internal_thread_barrier (2);
+ monitor_ = std::auto_ptr <AppSideMonitor_Thread>
+ (new AppSideMonitor_Thread (&internal_thread_barrier));
+ monitor_->activate ();
+
+ //ACE_DEBUG ((LM_DEBUG, "Monitor activated\n"));
+
+ internal_thread_barrier.wait ();
+ /// Waiting for the AppSideMonitor_Thread to finish its socket stuff.
+ try {
+ hmvar_->dump ();
+ } catch (CORBA::Exception & ex) {
+ ACE_DEBUG((LM_DEBUG,"exception from dump.\n"));
+ throw;
+ }
+
+
+ //ACE_DEBUG ((LM_DEBUG, "Registering process\n"));
+ try {
+ if (hmvar_->register_process (
+ AppOptions::instance()->process_id().c_str(),
+ AppOptions::instance()->host_id().c_str(),
+ AppOptions::instance()->get_port()))
+ {
+ ACE_DEBUG((LM_DEBUG, "Registered successfully %s with host monitor.\n",
+ AppOptions::instance()->process_id().c_str()));
+ }
+ else
+ {
+ ACE_DEBUG((LM_ERROR, "Registeration with the monitor failed.\n"));
+ }
+ } catch (CORBA::Exception & ex) {
+ ACE_DEBUG((LM_DEBUG,"exception from register_process.\n"));
+ throw;
+ }
+
+
+ //ACE_DEBUG ((LM_DEBUG, "Registering process\n"));
+ }
+ catch (CORBA::Exception &ex)
+ {
+ ACE_PRINT_EXCEPTION (ex, "A CORBA exception was raised:");
+ return -1;
+ }
+ catch (...)
+ {
+ ACE_DEBUG((LM_ERROR, "Unknown exception raised!"));
+ return -1;
+ }
+
+ external_barrier_->wait ();
+ return 0;
+}
+
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp
new file mode 100644
index 00000000000..3645ca79367
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ArgPair.cpp
@@ -0,0 +1,39 @@
+#include <algorithm>
+
+#include "ArgPair.h"
+
+ArgPair::ArgPair (int c, char **v)
+ : argc (c),
+ argv (new char *[c])
+{
+ std::copy (v, v + c, this->argv);
+}
+
+ArgPair::ArgPair (const ArgPair &ap)
+ : argc (ap.argc),
+ argv (new char *[ap.argc])
+{
+ std::copy (ap.argv, ap.argv + ap.argc, this->argv);
+}
+
+ArgPair & ArgPair::operator = (const ArgPair &ap)
+{
+ if (this != &ap)
+ {
+ ArgPair temp (ap);
+ this->swap (temp);
+ }
+ return *this;
+}
+
+void ArgPair::swap (ArgPair &ap)
+{
+ std::swap (this->argc, ap.argc);
+ std::swap (this->argv, ap.argv);
+}
+
+ArgPair::~ArgPair()
+{
+ delete [] argv;
+}
+
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp
new file mode 100644
index 00000000000..a8d58dcce8c
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Hello.cpp
@@ -0,0 +1,24 @@
+//
+// $Id$
+//
+#include "Hello.h"
+
+ACE_RCSID(Hello, Hello, "$Id$")
+
+Hello_i::Hello_i (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+char *
+Hello_i::get_string (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "returning string\n"));
+ return CORBA::string_dup ("Hello there!");
+}
+
+void
+Hello_i::shutdown (void)
+{
+ this->orb_->shutdown (0);
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp
new file mode 100644
index 00000000000..6ee65169257
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/IOR_Interceptor.cpp
@@ -0,0 +1,63 @@
+#include "IOR_Interceptor.h"
+#include "ObjectReferenceFactory.h"
+#include "tao/ORB_Constants.h"
+
+ACE_RCSID (Hello,
+ IOR_Interceptor,
+ "$Id$")
+
+IOR_Interceptor::IOR_Interceptor (void)
+{
+}
+
+char *
+IOR_Interceptor::name (void)
+{
+ return CORBA::string_dup ("IOR_Interceptor");
+}
+
+void
+IOR_Interceptor::destroy (void)
+{
+}
+
+void
+IOR_Interceptor::establish_components (
+ PortableInterceptor::IORInfo_ptr /* info */)
+{
+}
+
+void
+IOR_Interceptor::components_established (
+ PortableInterceptor::IORInfo_ptr info)
+{
+ PortableInterceptor::ObjectReferenceFactory_var old_orf =
+ info->current_factory ();
+
+ PortableInterceptor::ObjectReferenceFactory * tmp;
+ ACE_NEW_THROW_EX (tmp,
+ ObjectReferenceFactory (old_orf.in ()),
+ CORBA::NO_MEMORY (
+ CORBA::SystemException::_tao_minor_code (
+ TAO::VMCID,
+ ENOMEM),
+ CORBA::COMPLETED_NO));
+
+ PortableInterceptor::ObjectReferenceFactory_var orf = tmp;
+
+ info->current_factory (orf.in ());
+}
+
+void
+IOR_Interceptor::adapter_manager_state_changed (
+ const char *,
+ PortableInterceptor::AdapterState)
+{
+}
+
+void
+IOR_Interceptor:: adapter_state_changed (
+ const PortableInterceptor::ObjectReferenceTemplateSeq &,
+ PortableInterceptor::AdapterState)
+{
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc b/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc
new file mode 100644
index 00000000000..92de9417cdb
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/LWFT.mpc
@@ -0,0 +1,131 @@
+// -*- MPC -*-
+// $Id$
+
+project(*idl): taoidldefaults {
+ IDL_Files {
+ LWFT.idl
+ monitor.idl
+ }
+ IDL_Files {
+ idlflags += -SS
+ ObjectReferenceFactory.idl
+ }
+ custom_only = 1
+}
+
+project(*server2): rt_server, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, objreftemplate, pi_server, interceptors, naming, pi, iorinterceptor {
+ after += *idl
+ exename = server-2
+ Source_Files {
+ server-2.cpp
+ Hello.cpp
+ ObjectReferenceFactory.cpp
+ ServerORBInitializer.cpp
+ IOR_Interceptor.cpp
+ AppSideMonitor_Thread.cpp
+ AppSideMonitor_Handler.cpp
+ AppSideReg.cpp
+ AppOptions.cpp
+ ArgPair.cpp
+ LWFTC.cpp
+ LWFTS.cpp
+ }
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ ObjectReferenceFactoryC.cpp
+ monitorC.cpp
+ monitorS.cpp
+ }
+}
+
+project(*server1): rt_server, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, objreftemplate, pi_server, interceptors, naming, pi, iorinterceptor {
+ after += *idl
+ exename = server-1
+ Source_Files {
+ server-1.cpp
+ Hello.cpp
+ ObjectReferenceFactory.cpp
+ ServerORBInitializer.cpp
+ IOR_Interceptor.cpp
+ AppSideMonitor_Thread.cpp
+ AppSideMonitor_Handler.cpp
+ AppSideReg.cpp
+ AppOptions.cpp
+ ArgPair.cpp
+ LWFTC.cpp
+ LWFTS.cpp
+ }
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ ObjectReferenceFactoryC.cpp
+ monitorC.cpp
+ monitorS.cpp
+ }
+}
+
+project(*client): rt_client, avoids_minimum_corba, avoids_corba_e_compact, avoids_corba_e_micro, pi, pi_server, interceptors, naming {
+ after += *idl
+ exename = client
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ Client_ORBInitializer.cpp
+ Client_Request_Interceptor.cpp
+ Agent.cpp
+ client.cpp
+ }
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ }
+ IDL_Files {
+ }
+}
+
+
+project(*HostMonitor): taoserver, portableserver, naming {
+ after += *idl
+ exename = host_monitor
+ Source_Files {
+ host_monitor.cpp
+ HostMonitorImpl.cpp
+ LinuxCPULoadCalculator.cpp
+ Failure_Handler.cpp
+ Monitor_Thread.cpp
+ RM_Proxy.cpp
+ Utilization_Monitor.cpp
+ Timer.cpp
+ HMOptions.cpp
+ ArgPair.cpp
+ LWFTC.cpp
+ LWFTS.cpp
+ }
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ monitorC.cpp
+ monitorS.cpp
+ }
+}
+
+project(*ReplicationManager): taoclient, taoserver, naming {
+ after += *idl
+ exename = ReplicationManager
+ Source_Files {
+ ReplicationManager.cpp
+ Timer.cpp
+ ReplicationManager_process.cpp
+ RMOptions.cpp
+ ArgPair.cpp
+ }
+ Source_Files {
+ LWFTC.cpp
+ LWFTS.cpp
+ RMOptions.h
+ ArgPair.h
+ }
+ IDL_Files {
+ }
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp
new file mode 100644
index 00000000000..b3b5e80c70b
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ObjectReferenceFactory.cpp
@@ -0,0 +1,55 @@
+#include "ObjectReferenceFactory.h"
+#include "tao/PortableServer/PortableServer.h"
+#include "tao/Stub.h"
+#include "tao/Profile.h"
+#include "tao/debug.h"
+
+ACE_RCSID (Hello,
+ ObjectReferenceFactory,
+ "$Id$")
+
+ObjectReferenceFactory::ObjectReferenceFactory (
+ PortableInterceptor::ObjectReferenceFactory * old_orf)
+ : old_orf_ (old_orf)
+{
+ CORBA::add_ref (old_orf);
+}
+
+ObjectReferenceFactory::~ObjectReferenceFactory (void)
+{
+}
+
+CORBA::Object_ptr
+ObjectReferenceFactory::make_object (
+ const char *repository_id,
+ const PortableInterceptor::ObjectId & id)
+{
+ ACE_ASSERT (repository_id != 0);
+
+ CORBA::String_var s = PortableServer::ObjectId_to_string (id);
+
+ CORBA::Object_var ref = this->old_orf_->make_object (repository_id, id);
+
+ TAO_MProfile &mp = ref->_stubobj ()->base_profiles ();
+
+ IOP::TaggedComponent mytag;
+ const char* tag = s.in ();
+ CORBA::ULong tag_id = 9654;
+ size_t tag_length = ACE_OS::strlen (tag);
+ mytag.tag = tag_id;
+ mytag.component_data.length (tag_length + 1);
+
+ CORBA::Octet *buf = mytag.component_data.get_buffer ();
+ ACE_OS::memcpy (buf, tag, tag_length + 1);
+ buf[tag_length] = '\0';
+
+ const CORBA::ULong profile_count = mp.profile_count ();
+
+ for (CORBA::ULong i = 0; i < profile_count; ++i)
+ {
+ TAO_Profile *profile = mp.get_profile (i);
+ profile->add_tagged_component (mytag);
+ }
+
+ return ref._retn ();
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp
new file mode 100644
index 00000000000..0d8a3e735be
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/ServerORBInitializer.cpp
@@ -0,0 +1,47 @@
+#include "ServerORBInitializer.h"
+#include "IOR_Interceptor.h"
+#include "tao/ORB_Constants.h"
+#include "tao/PortableServer/PortableServer.h"
+
+ACE_RCSID (Hello,
+ ServerORBInitializer,
+ "$Id$")
+
+void
+ServerORBInitializer::pre_init (
+ PortableInterceptor::ORBInitInfo_ptr /* info */)
+{
+}
+
+void
+ServerORBInitializer::post_init (
+ PortableInterceptor::ORBInitInfo_ptr info)
+{
+
+ CORBA::Object_var obj =
+ info->resolve_initial_references ("POACurrent");
+
+ PortableServer::Current_var poa_current =
+ PortableServer::Current::_narrow (obj.in ());
+
+ ACE_ASSERT (!CORBA::is_nil (poa_current.in ()));
+
+
+ CORBA::String_var orb_id = info->orb_id ();
+
+ // Create and register the test's IORInterceptor
+
+ PortableInterceptor::IORInterceptor_ptr ior_intercept;
+ ACE_NEW_THROW_EX (ior_intercept,
+ IOR_Interceptor,
+ CORBA::NO_MEMORY (
+ CORBA::SystemException::_tao_minor_code (
+ TAO::VMCID,
+ ENOMEM),
+ CORBA::COMPLETED_NO));
+
+ PortableInterceptor::IORInterceptor_var ior_interceptor =
+ ior_intercept;
+
+ info->add_ior_interceptor (ior_interceptor.in ());
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp
new file mode 100644
index 00000000000..263637757a3
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-1.cpp
@@ -0,0 +1,613 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/Stats.h"
+#include "ace/Task.h"
+#include "ace/Sample_History.h"
+#include "ace/Throughput_Stats.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/RTPortableServer/RTPortableServer.h"
+#include "LWFTS.h"
+#include "tests/RTCORBA/common_args.cpp"
+#include "tests/RTCORBA/check_supported_priorities.cpp"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+#include "ace/Sample_History.h"
+#include "AppSideReg.h"
+#include "AppOptions.h"
+#include "ace/Barrier.h"
+#include "ServerORBInitializer.h"
+#include "tao/ORBInitializer_Registry.h"
+#include "ace/OS_NS_stdio.h"
+#include <fstream>
+#include <sstream>
+
+ACE_RCSID(Thread_Pools, server, "$Id$")
+
+static int stop = 0;
+
+class test_i :
+ public POA_test
+{
+public:
+ test_i (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa);
+
+ void method (CORBA::ULong start, CORBA::ULong end, CORBA::ULong work,
+ CORBA::ULong prime_number, CORBA::ULong kill);
+
+ void shutdown (void);
+
+ PortableServer::POA_ptr _default_POA (void);
+
+ void dump (void);
+
+private:
+ CORBA::ORB_var orb_;
+ PortableServer::POA_var poa_;
+ ACE_Sample_History history_;
+ ACE_hrtime_t start_;
+ ACE_hrtime_t end_;
+};
+
+test_i::test_i (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa)
+ : orb_ (CORBA::ORB::_duplicate (orb)),
+ poa_ (PortableServer::POA::_duplicate (poa)),
+ history_ (50)
+{
+}
+
+void
+test_i::method (CORBA::ULong test_start, CORBA::ULong test_end,
+ CORBA::ULong work,
+ CORBA::ULong prime_number,
+ CORBA::ULong kill)
+{
+ /*
+ CORBA::Object_var obj =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (obj.in ());
+
+ if (CORBA::is_nil (obj.in ()))
+ throw CORBA::INTERNAL ();
+
+ CORBA::Short servant_thread_priority =
+ current->the_priority ();
+
+ ACE_DEBUG ((LM_DEBUG,"Servant thread priority: %d\n",
+ servant_thread_priority));
+ */
+
+ static int i = 0;
+ ACE_DEBUG ((LM_DEBUG, "%d\n",i++));
+
+ if (kill && stop)
+ ACE_OS::exit (1);
+ if (test_start == 1)
+ {
+ this->start_ = ACE_OS::gethrtime ();
+ }
+ ACE_hrtime_t start = ACE_OS::gethrtime ();
+
+ for (; work != 0; work--)
+ ACE::is_prime (prime_number,
+ 2,
+ prime_number / 2);
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+
+ // ACE_DEBUG ((LM_DEBUG, "Time taken = %d\n", end - start));
+ this->history_.sample (end - start);
+ if (test_end == 1)
+ {
+ this->end_ = ACE_OS::gethrtime ();
+ }
+}
+
+void
+test_i::dump (void)
+{
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_Basic_Stats stats;
+ this->history_.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+ ACE_Throughput_Stats::dump_throughput ("Total", gsf,
+ this->end_ - this->start_,
+ stats.samples_count ());
+}
+
+PortableServer::POA_ptr
+test_i::_default_POA (void)
+{
+ return PortableServer::POA::_duplicate (this->poa_.in ());
+}
+
+void
+test_i::shutdown (void)
+{
+ this->orb_->shutdown (0);
+}
+/*
+static const char *ior_output_file_1 = "s1.ior";
+static const char *ior_output_file_2 = "s2.ior";
+static const char *ior_output_file_3 = "s3.ior";
+static const char *ior_output_file_4 = "s4.ior";
+*/
+static CORBA::ULong static_threads = 1;
+static CORBA::ULong dynamic_threads = 0;
+static CORBA::ULong number_of_lanes = 0;
+static RTCORBA::Priority default_thread_priority = 0;
+static RTCORBA::Priority pool_priority = ACE_INT16_MIN;
+
+static const char *bands_file = "empty-file";
+static const char *lanes_file = "empty-file";
+std::string first_object_id;
+//std::string second_object_id;
+size_t first_object_role;
+double first_object_load;
+//size_t second_object_role;
+const char *rm_ior_file = "file://rm.ior";
+
+void
+read_object_info (std::string file_name)
+{
+ std::ifstream input_file (file_name.c_str ());
+ input_file >> first_object_id;
+ input_file >> first_object_role;
+ input_file >> first_object_load;
+// input_file >> second_object_id;
+// input_file >> second_object_role;
+}
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv,
+ "b:f:hl:n:s:a:" // server options
+ "c:e:g:hi:j:k:m:p:q:r:t:u:v:w:x:y:z:" // client options
+ );
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ bands_file = get_opts.opt_arg ();
+ break;
+
+ case 'f':
+ pool_priority = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'l':
+ lanes_file = get_opts.opt_arg ();
+ break;
+
+ case 'n':
+ number_of_lanes = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 's':
+ static_threads = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'a':
+ stop = ACE_OS::atoi (get_opts.opt_arg());
+ break;
+
+
+ case 'c':
+ case 'd':
+ case 'e':
+ case 'g':
+ case 'i':
+ case 'j':
+ case 'k':
+ case 'm':
+ case 'p':
+ case 'q':
+ case 'r':
+ case 't':
+ case 'u':
+ case 'v':
+ case 'w':
+ case 'x':
+ case 'y':
+ case 'z':
+ // client options: ignored.
+ break;
+/*
+ case 'h':
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s\n"
+ "\t-b <bands file> (defaults to %s)\n"
+ "\t-f <pool priority> (defaults to %d)\n"
+ "\t-h <help: shows options menu>\n"
+ "\t-l <lanes file> (defaults to %s)\n"
+ "\t-n <number of lanes> (defaults to %d)\n"
+ "\t-o <ior file> (defaults to %s)\n"
+ "\t-s <static threads> (defaults to %d)\n"
+ "\n",
+ argv [0],
+ bands_file,
+ default_thread_priority,
+ lanes_file,
+ number_of_lanes,
+ ior_output_file_1,
+ static_threads),
+ -1);
+*/
+ }
+
+ return 0;
+}
+
+int
+write_ior_to_file (const char *ior_file,
+ CORBA::ORB_ptr orb,
+ CORBA::Object_ptr object)
+{
+ CORBA::String_var ior =
+ orb->object_to_string (object);
+
+ FILE *output_file =
+ ACE_OS::fopen (ior_file,
+ "w");
+
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_file),
+ -1);
+
+ ACE_OS::fprintf (output_file,
+ "%s",
+ ior.in ());
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}
+
+class Task : public ACE_Task_Base
+{
+public:
+
+ Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb);
+
+ int svc (void);
+
+ CORBA::ORB_var orb_;
+
+};
+
+Task::Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb)
+ : ACE_Task_Base (&thread_manager),
+ orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+int
+Task::svc (void)
+{
+ try
+ {
+ CORBA::Object_var object =
+ this->orb_->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (object.in ());
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager ();
+
+ object =
+ this->orb_->resolve_initial_references ("RTORB");
+
+ RTCORBA::RTORB_var rt_orb =
+ RTCORBA::RTORB::_narrow (object.in ());
+
+ object =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (object.in ());
+
+ default_thread_priority =
+ current->the_priority ();
+
+ int result = 0;
+ CORBA::ULong stacksize = 0;
+ CORBA::Boolean allow_request_buffering = 0;
+ CORBA::ULong max_buffered_requests = 0;
+ CORBA::ULong max_request_buffer_size = 0;
+
+ CORBA::PolicyList policies;
+
+ CORBA::Boolean allow_borrowing = 0;
+ if (number_of_lanes != 0)
+ {
+ get_auto_priority_lanes_and_bands (number_of_lanes,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+ }
+ else if (ACE_OS::strcmp (lanes_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_lanes ("server",
+ lanes_file,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ else
+ {
+ if (pool_priority == ACE_INT16_MIN)
+ pool_priority =
+ default_thread_priority;
+
+ RTCORBA::ThreadpoolId threadpool_id =
+ rt_orb->create_threadpool (stacksize,
+ static_threads,
+ dynamic_threads,
+ pool_priority,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_threadpool_policy (threadpool_id);
+
+ if (ACE_OS::strcmp (bands_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ }
+
+ /*
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_implicit_activation_policy
+ (PortableServer::IMPLICIT_ACTIVATION);
+ */
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED,
+ default_thread_priority);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_id_assignment_policy (PortableServer::USER_ID);
+
+ PortableServer::POA_var poa =
+ root_poa->create_POA ("RT POA",
+ poa_manager.in (),
+ policies);
+
+ read_object_info (AppOptions::instance ()->object_info_file ());
+
+ ACE_DEBUG ((LM_DEBUG, "Getting RM\n"));
+
+ CORBA::Object_var tmp = this->orb_->string_to_object (rm_ior_file);
+ ReplicationManager_var rm =
+ ReplicationManager::_narrow (tmp.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // ***************************************************
+ // first servant activated
+
+ test_i *first_servant =
+ new test_i (this->orb_.in (),
+ poa.in ());
+
+ PortableServer::ServantBase_var first_safe_servant
+ (first_servant);
+ ACE_UNUSED_ARG (first_safe_servant);
+
+ /*
+ test_var first_test =
+ first_servant->_this ();
+ */
+
+ PortableServer::ObjectId_var id_1 =
+ PortableServer::string_to_ObjectId (first_object_id.c_str ());
+ poa->activate_object_with_id (id_1.in (), first_servant);
+ CORBA::Object_var first_servant_object =
+ poa->id_to_reference (id_1.in ());
+ test_var first_test = test::_narrow (first_servant_object.in ());
+
+ std::ostringstream ostr;
+ ostr << first_object_id << first_object_role << ".ior";
+ result =
+ write_ior_to_file (ostr.str().c_str(),
+ this->orb_.in (),
+ first_test.in ());
+
+ rm->register_application (first_object_id.c_str (), first_object_load,
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ first_object_role,
+ first_test.in ());
+
+ if (result != 0)
+ return result;
+
+ // ***************************************************
+
+ // ***************************************************
+ // second servant activated
+/*
+ ACE_DEBUG ((LM_DEBUG, "activating second object\n"));
+
+ test_i *second_servant =
+ new test_i (this->orb_.in (),
+ poa.in ());
+
+ PortableServer::ServantBase_var second_safe_servant
+ (second_servant);
+ ACE_UNUSED_ARG (second_safe_servant);
+
+
+ test_var second_test =
+ second_servant->_this ();
+
+
+ PortableServer::ObjectId_var id_2 =
+ PortableServer::string_to_ObjectId (second_object_id.c_str ());
+ poa->activate_object_with_id (id_2.in (), second_servant);
+ CORBA::Object_var second_servant_object =
+ poa->id_to_reference (id_2.in ());
+ test_var second_test = test::_narrow (second_servant_object.in ());
+
+ result =
+ write_ior_to_file (ior_output_file_2,
+ this->orb_.in (),
+ second_test.in ());
+
+ rm->register_application (second_object_id.c_str (),
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ second_object_role,
+ second_test.in ());
+
+ if (result != 0)
+ return result;
+*/
+ // ***************************************************
+
+ poa_manager->activate ();
+
+ this->orb_->run ();
+
+ this->orb_->destroy ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ try
+ {
+ AppOptions::instance ()->parse_args (argc, argv);
+
+ PortableInterceptor::ORBInitializer_ptr tmp;
+
+ ACE_NEW_RETURN (tmp,
+ ServerORBInitializer,
+ -1); // No CORBA exceptions yet!
+
+ PortableInterceptor::ORBInitializer_var orb_initializer = tmp;
+
+ PortableInterceptor::register_orb_initializer (orb_initializer.in ());
+
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc,
+ argv,
+ "");
+
+ ACE_Barrier thread_barrier (2);
+ AppSideReg proc_reg (&thread_barrier, orb.in());
+ proc_reg.activate ();
+ thread_barrier.wait();
+
+ ACE_DEBUG ((LM_DEBUG, "After initialization of AppSide\n"));
+
+
+ int result =
+ parse_args (argc, argv);
+ if (result != 0)
+ return result;
+
+ // Make sure we can support multiple priorities that are required
+ // for this test.
+ check_supported_priorities (orb.in ());
+
+ // Thread Manager for managing task.
+ ACE_Thread_Manager thread_manager;
+
+ // Create task.
+ Task task (thread_manager,
+ orb.in ());
+
+ // Task activation flags.
+ long flags =
+ THR_NEW_LWP |
+ THR_JOINABLE |
+ orb->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ // Activate task.
+ result =
+ task.activate (flags);
+ ACE_ASSERT (result != -1);
+ ACE_UNUSED_ARG (result);
+
+ // Wait for task to exit.
+ result =
+ thread_manager.wait ();
+ ACE_ASSERT (result != -1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp
new file mode 100644
index 00000000000..2a3fe1bee90
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/server-2.cpp
@@ -0,0 +1,613 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/Throughput_Stats.h"
+#include "ace/Stats.h"
+#include "ace/Task.h"
+#include "ace/Sample_History.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/RTPortableServer/RTPortableServer.h"
+#include "LWFTS.h"
+#include "tests/RTCORBA/common_args.cpp"
+#include "tests/RTCORBA/check_supported_priorities.cpp"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+#include "ace/Sample_History.h"
+#include "AppSideReg.h"
+#include "AppOptions.h"
+#include "ace/Barrier.h"
+#include "ServerORBInitializer.h"
+#include "tao/ORBInitializer_Registry.h"
+#include "ace/OS_NS_stdio.h"
+#include <fstream>
+#include <sstream>
+
+ACE_RCSID(Thread_Pools, server, "$Id$")
+
+static int stop = 0;
+
+class test_i :
+ public POA_test
+{
+public:
+ test_i (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa);
+
+ void method (CORBA::ULong start, CORBA::ULong end, CORBA::ULong work,
+ CORBA::ULong prime_number, CORBA::ULong kill);
+
+ void shutdown (void);
+
+ PortableServer::POA_ptr _default_POA (void);
+
+ void dump (void);
+
+private:
+ CORBA::ORB_var orb_;
+ PortableServer::POA_var poa_;
+ ACE_Sample_History history_;
+ ACE_hrtime_t start_;
+ ACE_hrtime_t end_;
+};
+
+test_i::test_i (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa)
+ : orb_ (CORBA::ORB::_duplicate (orb)),
+ poa_ (PortableServer::POA::_duplicate (poa)),
+ history_ (50)
+{
+}
+
+void
+test_i::method (CORBA::ULong test_start, CORBA::ULong test_end,
+ CORBA::ULong work,
+ CORBA::ULong prime_number,
+ CORBA::ULong kill)
+{
+ /*
+ CORBA::Object_var obj =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (obj.in ());
+
+ if (CORBA::is_nil (obj.in ()))
+ throw CORBA::INTERNAL ();
+
+ CORBA::Short servant_thread_priority =
+ current->the_priority ();
+
+ ACE_DEBUG ((LM_DEBUG,"Servant thread priority: %d\n",
+ servant_thread_priority));
+ */
+
+ //ACE_DEBUG ((LM_DEBUG, "Working on the prime number\n"));
+
+ if (kill && stop)
+ ACE_OS::exit(1);
+ if (test_start == 1)
+ {
+ this->start_ = ACE_OS::gethrtime ();
+ }
+ ACE_hrtime_t start = ACE_OS::gethrtime ();
+
+ for (; work != 0; work--)
+ ACE::is_prime (prime_number,
+ 2,
+ prime_number / 2);
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+
+ // ACE_DEBUG ((LM_DEBUG, "Time taken = %d\n", end - start));
+ this->history_.sample (end - start);
+ if (test_end == 1)
+ {
+ this->end_ = ACE_OS::gethrtime ();
+ }
+}
+
+void
+test_i::dump (void)
+{
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_Basic_Stats stats;
+ this->history_.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+ ACE_Throughput_Stats::dump_throughput ("Total", gsf,
+ this->end_ - this->start_,
+ stats.samples_count ());
+}
+
+PortableServer::POA_ptr
+test_i::_default_POA (void)
+{
+ return PortableServer::POA::_duplicate (this->poa_.in ());
+}
+
+void
+test_i::shutdown (void)
+{
+ this->orb_->shutdown (0);
+}
+
+static CORBA::ULong static_threads = 1;
+static CORBA::ULong dynamic_threads = 0;
+static CORBA::ULong number_of_lanes = 0;
+static RTCORBA::Priority default_thread_priority = 0;
+static RTCORBA::Priority pool_priority = ACE_INT16_MIN;
+
+static const char *bands_file = "empty-file";
+static const char *lanes_file = "empty-file";
+std::string first_object_id;
+std::string second_object_id;
+size_t first_object_role;
+size_t second_object_role;
+double first_object_load;
+double second_object_load;
+const char *rm_ior_file = "file://rm.ior";
+
+void
+read_object_info (std::string file_name)
+{
+ std::ifstream input_file (file_name.c_str ());
+ input_file >> first_object_id;
+ input_file >> first_object_role;
+ input_file >> first_object_load;
+ input_file >> second_object_id;
+ input_file >> second_object_role;
+ input_file >> second_object_load;
+}
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv,
+ "b:f:hl:n:s:a:" // server options
+ "c:e:g:hi:j:k:m:p:q:r:t:u:v:w:x:y:z:" // client options
+ );
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ bands_file = get_opts.opt_arg ();
+ break;
+
+ case 'f':
+ pool_priority = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'l':
+ lanes_file = get_opts.opt_arg ();
+ break;
+
+ case 'n':
+ number_of_lanes = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 's':
+ static_threads = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'a':
+ stop = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'c':
+ case 'd':
+ case 'e':
+ case 'g':
+ case 'i':
+ case 'j':
+ case 'k':
+ case 'm':
+ case 'p':
+ case 'q':
+ case 'r':
+ case 't':
+ case 'u':
+ case 'v':
+ case 'w':
+ case 'x':
+ case 'y':
+ case 'z':
+ // client options: ignored.
+ break;
+/*
+ case 'h':
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s\n"
+ "\t-b <bands file> (defaults to %s)\n"
+ "\t-f <pool priority> (defaults to %d)\n"
+ "\t-h <help: shows options menu>\n"
+ "\t-l <lanes file> (defaults to %s)\n"
+ "\t-n <number of lanes> (defaults to %d)\n"
+ "\t-o <ior file> (defaults to %s)\n"
+ "\t-s <static threads> (defaults to %d)\n"
+ "\n",
+ argv [0],
+ bands_file,
+ default_thread_priority,
+ lanes_file,
+ number_of_lanes,
+ ior_output_file_1,
+ static_threads),
+ -1);
+*/
+ }
+
+ return 0;
+}
+
+int
+write_ior_to_file (const char *ior_file,
+ CORBA::ORB_ptr orb,
+ CORBA::Object_ptr object)
+{
+ CORBA::String_var ior =
+ orb->object_to_string (object);
+
+ FILE *output_file =
+ ACE_OS::fopen (ior_file,
+ "w");
+
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_file),
+ -1);
+
+ ACE_OS::fprintf (output_file,
+ "%s",
+ ior.in ());
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}
+
+class Task : public ACE_Task_Base
+{
+public:
+
+ Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb);
+
+ int svc (void);
+
+ CORBA::ORB_var orb_;
+
+};
+
+Task::Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb)
+ : ACE_Task_Base (&thread_manager),
+ orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+int
+Task::svc (void)
+{
+ try
+ {
+ CORBA::Object_var object =
+ this->orb_->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (object.in ());
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager ();
+
+ object =
+ this->orb_->resolve_initial_references ("RTORB");
+
+ RTCORBA::RTORB_var rt_orb =
+ RTCORBA::RTORB::_narrow (object.in ());
+
+ object =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (object.in ());
+
+ default_thread_priority =
+ current->the_priority ();
+
+ int result = 0;
+ CORBA::ULong stacksize = 0;
+ CORBA::Boolean allow_request_buffering = 0;
+ CORBA::ULong max_buffered_requests = 0;
+ CORBA::ULong max_request_buffer_size = 0;
+
+ CORBA::PolicyList policies;
+
+ CORBA::Boolean allow_borrowing = 0;
+ if (number_of_lanes != 0)
+ {
+ get_auto_priority_lanes_and_bands (number_of_lanes,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+ }
+ else if (ACE_OS::strcmp (lanes_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_lanes ("server",
+ lanes_file,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ else
+ {
+ if (pool_priority == ACE_INT16_MIN)
+ pool_priority =
+ default_thread_priority;
+
+ RTCORBA::ThreadpoolId threadpool_id =
+ rt_orb->create_threadpool (stacksize,
+ static_threads,
+ dynamic_threads,
+ pool_priority,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_threadpool_policy (threadpool_id);
+
+ if (ACE_OS::strcmp (bands_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ }
+
+ /*
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_implicit_activation_policy
+ (PortableServer::IMPLICIT_ACTIVATION);
+ */
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED,
+ default_thread_priority);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_id_assignment_policy (PortableServer::USER_ID);
+
+ PortableServer::POA_var poa =
+ root_poa->create_POA ("RT POA",
+ poa_manager.in (),
+ policies);
+
+ read_object_info (AppOptions::instance ()->object_info_file ());
+
+ ACE_DEBUG ((LM_DEBUG, "Getting RM\n"));
+
+ CORBA::Object_var tmp = this->orb_->string_to_object (rm_ior_file);
+ ReplicationManager_var rm =
+ ReplicationManager::_narrow (tmp.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // ***************************************************
+ // first servant activated
+
+ test_i *first_servant =
+ new test_i (this->orb_.in (),
+ poa.in ());
+
+ PortableServer::ServantBase_var first_safe_servant
+ (first_servant);
+ ACE_UNUSED_ARG (first_safe_servant);
+
+ /*
+ test_var first_test =
+ first_servant->_this ();
+ */
+
+ PortableServer::ObjectId_var id_1 =
+ PortableServer::string_to_ObjectId (first_object_id.c_str ());
+ poa->activate_object_with_id (id_1.in (), first_servant);
+ CORBA::Object_var first_servant_object =
+ poa->id_to_reference (id_1.in ());
+ test_var first_test = test::_narrow (first_servant_object.in ());
+
+ std::ostringstream ostr;
+ ostr << first_object_id << first_object_role << ".ior";
+ result =
+ write_ior_to_file (ostr.str().c_str(),
+ this->orb_.in (),
+ first_test.in ());
+
+ rm->register_application (first_object_id.c_str (), first_object_load,
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ first_object_role,
+ first_test.in ());
+
+ if (result != 0)
+ return result;
+
+ // ***************************************************
+
+ // ***************************************************
+ // second servant activated
+
+ ACE_DEBUG ((LM_DEBUG, "activating second object\n"));
+
+ test_i *second_servant =
+ new test_i (this->orb_.in (),
+ poa.in ());
+
+ PortableServer::ServantBase_var second_safe_servant
+ (second_servant);
+ ACE_UNUSED_ARG (second_safe_servant);
+
+ /*
+ test_var second_test =
+ second_servant->_this ();
+ */
+
+ //second_object_id = "ABCD";
+ PortableServer::ObjectId_var id_2 =
+ PortableServer::string_to_ObjectId (second_object_id.c_str ());
+ poa->activate_object_with_id (id_2.in (), second_servant);
+ CORBA::Object_var second_servant_object =
+ poa->id_to_reference (id_2.in ());
+ test_var second_test = test::_narrow (second_servant_object.in ());
+
+ ostr.str("");
+ ostr << second_object_id << second_object_role << ".ior";
+ result =
+ write_ior_to_file (ostr.str().c_str(),
+ this->orb_.in (),
+ second_test.in ());
+
+ rm->register_application (second_object_id.c_str (), second_object_load,
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ second_object_role,
+ second_test.in ());
+
+ if (result != 0)
+ return result;
+
+ // ***************************************************
+
+ poa_manager->activate ();
+
+ this->orb_->run ();
+
+ this->orb_->destroy ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ try
+ {
+
+ AppOptions::instance ()->parse_args (argc, argv);
+
+ PortableInterceptor::ORBInitializer_ptr tmp;
+
+ ACE_NEW_RETURN (tmp,
+ ServerORBInitializer,
+ -1); // No CORBA exceptions yet!
+
+ PortableInterceptor::ORBInitializer_var orb_initializer = tmp;
+
+ PortableInterceptor::register_orb_initializer (orb_initializer.in ());
+
+
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc,
+ argv,
+ "");
+
+ ACE_Barrier thread_barrier (2);
+ AppSideReg proc_reg (&thread_barrier, orb.in());
+ proc_reg.activate ();
+ thread_barrier.wait();
+
+ ACE_DEBUG ((LM_DEBUG, "After initialization of AppSide\n"));
+
+
+ int result =
+ parse_args (argc, argv);
+ if (result != 0)
+ return result;
+
+ // Make sure we can support multiple priorities that are required
+ // for this test.
+ check_supported_priorities (orb.in ());
+
+ // Thread Manager for managing task.
+ ACE_Thread_Manager thread_manager;
+
+ // Create task.
+ Task task (thread_manager,
+ orb.in ());
+
+ // Task activation flags.
+ long flags =
+ THR_NEW_LWP |
+ THR_JOINABLE |
+ orb->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ // Activate task.
+ result =
+ task.activate (flags);
+ ACE_ASSERT (result != -1);
+ ACE_UNUSED_ARG (result);
+
+ // Wait for task to exit.
+ result =
+ thread_manager.wait ();
+ ACE_ASSERT (result != -1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}