summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-05-05 16:34:35 +0000
committerjai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-05-05 16:34:35 +0000
commitc81aaf68e35eb1a713c0a542e8bc0c4f109d471c (patch)
tree8da42b56809de840effaa3a76e0c6ad21a05a8b8
parentcf1756793043e645a99f489631cb1e321ee91a6c (diff)
downloadATCD-c81aaf68e35eb1a713c0a542e8bc0c4f109d471c.tar.gz
added RT CORBA server that uses FLARe
-rwxr-xr-xTAO/orbsvcs/examples/FaultTolerance/FLARe/RTCORBA/ClientServer/server-rt.cpp483
1 files changed, 483 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/RTCORBA/ClientServer/server-rt.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/RTCORBA/ClientServer/server-rt.cpp
new file mode 100755
index 00000000000..5d4a82b00e5
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/RTCORBA/ClientServer/server-rt.cpp
@@ -0,0 +1,483 @@
+// $Id$
+
+#include <sstream>
+#include "ace/Get_Opt.h"
+
+#include "orbsvcs/orbsvcs/LWFT/AppOptions.h"
+#include "orbsvcs/orbsvcs/LWFT/AppSideMonitor_Thread.h"
+#include "orbsvcs/orbsvcs/LWFT/StateSyncAgentTask.h"
+#include "orbsvcs/orbsvcs/LWFT/ReplicationManagerC.h"
+#include "orbsvcs/orbsvcs/LWFT/LWFT_Server_Init.h"
+#include "orbsvcs/orbsvcs/LWFT/LWFT_Client_Init.h"
+#include "orbsvcs/Naming/Naming_Client.h"
+
+#include "ace/Task.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/RTPortableServer/RTPortableServer.h"
+#include "tests/RTCORBA/common_args.cpp"
+#include "tests/RTCORBA/check_supported_priorities.cpp"
+#include "ace/High_Res_Timer.h"
+#include "tao/ORBInitializer_Registry.h"
+#include "ace/OS_NS_stdio.h"
+#include "Worker_i.h"
+
+static int stop = 0;
+std::string ior_output = "test1.ior";
+long invocations = 0;
+
+static CORBA::ULong static_threads = 1;
+static CORBA::ULong dynamic_threads = 0;
+static CORBA::ULong number_of_lanes = 0;
+static RTCORBA::Priority default_thread_priority = 0;
+static RTCORBA::Priority pool_priority = ACE_INT16_MIN;
+static const char *bands_file = "empty-file";
+static const char *lanes_file = "empty-file";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv,
+ ACE_TEXT ("b:f:l:n:s:a:p:i:"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ bands_file = get_opts.opt_arg ();
+ break;
+
+ case 'f':
+ pool_priority = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'l':
+ lanes_file = get_opts.opt_arg ();
+ break;
+
+ case 'n':
+ number_of_lanes = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 's':
+ static_threads = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'a':
+ stop = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'i':
+ invocations = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'p':
+ ior_output = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s\n"
+ "\t-b <bands file> (defaults to %s)\n"
+ "\t-f <pool priority> (defaults to %d)\n"
+ "\t-l <lanes file> (defaults to %s)\n"
+ "\t-n <number of lanes> (defaults to %d)\n"
+ "\t-p <ior file> (defaults to %s)\n"
+ "\t-s <static threads> (defaults to %d)\n"
+ "\n",
+ argv [0],
+ bands_file,
+ default_thread_priority,
+ lanes_file,
+ number_of_lanes,
+ ior_output,
+ static_threads),
+ -1);
+ }
+
+ return 0;
+}
+
+int
+check_for_nil (CORBA::Object_ptr obj, const char *msg)
+{
+ if (CORBA::is_nil (obj))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ERROR: Object reference <%C> is nil\n",
+ msg),
+ -1);
+ else
+ return 0;
+}
+
+int
+write_ior_to_file (const char *ior_file,
+ CORBA::ORB_ptr orb,
+ CORBA::Object_ptr object)
+{
+ CORBA::String_var ior =
+ orb->object_to_string (object);
+
+ FILE *output_file =
+ ACE_OS::fopen (ior_file,
+ "w");
+
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_file),
+ -1);
+
+ ACE_OS::fprintf (output_file,
+ "%s",
+ ior.in ());
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}
+
+int
+create_object (PortableServer::POA_ptr poa,
+ CORBA::ORB_ptr orb,
+ Worker_i *server_impl,
+ const ACE_TCHAR *filename)
+{
+ PortableServer::ObjectId_var id =
+ PortableServer::string_to_ObjectId (AppOptions::instance ()->app_id ().c_str ());
+
+ poa->activate_object_with_id (id,
+ server_impl);
+
+ CORBA::Object_var server =
+ poa->id_to_reference (id.in ());
+
+ CORBA::String_var ior =
+ orb->object_to_string (server.in ());
+
+ if (filename != 0)
+ {
+ FILE *output_file= ACE_OS::fopen (filename, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ filename),
+ -1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+ }
+
+ return 0;
+}
+
+class Task : public ACE_Task_Base
+{
+public:
+
+ Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb,
+ ReplicationManager_ptr rm,
+ StateSynchronizationAgent_ptr agent);
+
+ int svc (void);
+
+ CORBA::ORB_var orb_;
+
+ ReplicationManager_var rm_;
+ StateSynchronizationAgent_ptr agent_;
+
+};
+
+Task::Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb,
+ ReplicationManager_ptr rm,
+ StateSynchronizationAgent_ptr agent)
+ : ACE_Task_Base (&thread_manager),
+ orb_ (CORBA::ORB::_duplicate (orb)),
+ rm_ (ReplicationManager::_duplicate (rm)),
+ agent_ (StateSynchronizationAgent::_duplicate (agent))
+{
+}
+
+int
+Task::svc (void)
+{
+ try
+ {
+ CORBA::Object_var object =
+ this->orb_->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (object.in ());
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager ();
+
+ object =
+ this->orb_->resolve_initial_references ("RTORB");
+
+ RTCORBA::RTORB_var rt_orb =
+ RTCORBA::RTORB::_narrow (object.in ());
+
+ object =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (object.in ());
+
+ default_thread_priority =
+ current->the_priority ();
+
+ int result = 0;
+ CORBA::ULong stacksize = 0;
+ CORBA::Boolean allow_request_buffering = 0;
+ CORBA::ULong max_buffered_requests = 0;
+ CORBA::ULong max_request_buffer_size = 0;
+
+ CORBA::PolicyList policies;
+
+ CORBA::Boolean allow_borrowing = 0;
+ if (number_of_lanes != 0)
+ {
+ get_auto_priority_lanes_and_bands (number_of_lanes,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+ }
+ else if (ACE_OS::strcmp (lanes_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_lanes ("server",
+ lanes_file,
+ rt_orb.in (),
+ stacksize,
+ static_threads,
+ dynamic_threads,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size,
+ allow_borrowing,
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ else
+ {
+ if (pool_priority == ACE_INT16_MIN)
+ pool_priority =
+ default_thread_priority;
+
+ RTCORBA::ThreadpoolId threadpool_id =
+ rt_orb->create_threadpool (stacksize,
+ static_threads,
+ dynamic_threads,
+ pool_priority,
+ allow_request_buffering,
+ max_buffered_requests,
+ max_request_buffer_size);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_threadpool_policy (threadpool_id);
+
+ if (ACE_OS::strcmp (bands_file, "empty-file") != 0)
+ {
+ result =
+ get_priority_bands ("server",
+ bands_file,
+ rt_orb.in (),
+ policies,
+ 1);
+
+ if (result != 0)
+ return result;
+ }
+ }
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED,
+ default_thread_priority);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
+
+ policies.length (policies.length () + 1);
+ policies[policies.length () - 1] =
+ root_poa->create_id_assignment_policy (PortableServer::USER_ID);
+
+ PortableServer::POA_var poa =
+ root_poa->create_POA ("RT POA",
+ poa_manager.in (),
+ policies);
+
+ Worker_i server_impl (this->orb_.in (),
+ root_poa.in (),
+ AppOptions::instance ()->app_id (),
+ agent_,
+ invocations);
+
+ int result = create_object (root_poa.in (),
+ orb_.in (),
+ &server_impl,
+ ior_output.c_str ());
+
+ CORBA::Object_var obj = root_poa->servant_to_reference (&server_impl);
+
+ if (result == -1)
+ return -1;
+
+ poa_manager->activate ();
+
+ rm_->register_application (
+ AppOptions::instance ()->app_id ().c_str (),
+ AppOptions::instance ()->load (),
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ AppOptions::instance ()->role (),
+ obj.in ());
+
+ ReplicatedApplication_var app =
+ ReplicatedApplication::_narrow (obj.in ());
+
+ agent_->register_application (
+ AppOptions::instance ()->app_id ().c_str (),
+ app.in ());
+
+ this->orb_->run ();
+
+ ACE_DEBUG ((LM_DEBUG, "Server ORB event loop finished\n\n"));
+
+ this->orb_->destroy ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ try
+ {
+ LWFT_Client_Init client_initializer;
+ CORBA::ORB_var orb = client_initializer.init (argc, argv);
+
+ AppOptions::instance ()->parse_args (argc, argv);
+ AppOptions::instance ()->orb (CORBA::ORB::_duplicate (orb.in ()));
+
+ int result = AppSideMonitor_Thread::instance ()->activate ();
+
+ if (result != 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "AppSideReg::activate () returned %d\n",
+ result),
+ -1);
+ }
+
+ // create task for state synchronization agent
+ result = StateSyncAgentTask::instance ()->activate ();
+
+ if (result != 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "StateSyncAgentTask::activate () "
+ "returned %d, errno = %d\n",
+ result,
+ errno),
+ -1);
+ }
+
+ ReplicationManager_var rm;
+
+ TAO_Naming_Client naming_client;
+ naming_client.init (orb.in ());
+
+ CosNaming::NamingContextExt_var ns =
+ CosNaming::NamingContextExt::_narrow (naming_client.get_context ());
+
+ CORBA::Object_var rm_obj = ns->resolve_str ("ReplicationManager");
+
+ if (CORBA::is_nil (rm_obj.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ServerTask: Null RM objref from Naming Service\n"));
+ }
+ else
+ {
+ rm = ReplicationManager::_narrow (rm_obj.in ());
+ }
+
+ rm->register_state_synchronization_agent (
+ AppOptions::instance ()->host_id ().c_str (),
+ AppOptions::instance ()->process_id ().c_str (),
+ StateSyncAgentTask::instance ()->agent_ref ());
+
+ // Parse arguments.
+ if (parse_args (argc, argv) != 0)
+ return -1;
+
+ // Make sure we can support multiple priorities that are required
+ // for this test.
+ check_supported_priorities (orb.in ());
+
+ // Thread Manager for managing task.
+ ACE_Thread_Manager thread_manager;
+
+ // Create task.
+ Task task (thread_manager,
+ orb.in (), rm.in (),
+ StateSyncAgentTask::instance ()->agent_ref ());
+
+ // Task activation flags.
+ long flags =
+ THR_NEW_LWP |
+ THR_JOINABLE |
+ orb->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ // Activate task.
+ result =
+ task.activate (flags);
+ ACE_ASSERT (result != -1);
+ ACE_UNUSED_ARG (result);
+
+ // Wait for task to exit.
+ result =
+ thread_manager.wait ();
+ ACE_ASSERT (result != -1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}