author    jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  2008-05-26 16:28:40 +0000
committer jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  2008-05-26 16:28:40 +0000
commit    af140812b88cc366f5fa153357dca943ebebc377 (patch)
tree      a00e2ebf32944ab348f488645ef46bbae115a940
parent    0cbddee90ca2d09a0c1b730bf3a8395637fd370c (diff)
download  ATCD-af140812b88cc366f5fa153357dca943ebebc377.tar.gz
added FLARe directory
-rw-r--r--  TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp                        229
-rw-r--r--  TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp         58
-rw-r--r--  TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp    74
-rw-r--r--  TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp                      1328
4 files changed, 1689 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp
new file mode 100644
index 00000000000..2c5efeec6cc
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp
@@ -0,0 +1,229 @@
+// cvs-id : $Id$
+
+#include "Agent.h"
+
+Agent_i::Agent_i (bool proactive)
+ : //failover_map_ (100),
+ //secondary_map_ (100),
+ //failure_map_ (100),
+ proactive_(proactive),
+ update_count_ (0)
+{
+}
+
+Agent_i::~Agent_i (void)
+{
+}
+
+void Agent_i::proactive(bool v)
+{
+ proactive_ = v;
+}
+
+CORBA::Object_ptr
+Agent_i::next_member (const char *ior_string
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG, "NEXT MEMBER CALLED for ior_string = %s.\n",ior_string));
+ //sleep (3);
+ //if (! proactive_)
+ // {
+// CORBA::Object_var object = RM_var_->get_next(ior_string);
+ //return CORBA::Object::_duplicate(object.in());
+// }
+
+ ACE_Guard <ACE_Thread_Mutex> guard (ior_map_mutex_);
+ AGENT_RANKED_IOR_LIST ranked_ior_list;
+ if (this->objectid_rankedior_map_.find(ACE_CString(ior_string),
+ ranked_ior_list) == 0)
+ {
+ CORBA::Object_var ior (ranked_ior_list.ior_list.front());
+ ranked_ior_list.ior_list.pop_front();
+ this->objectid_rankedior_map_.rebind(ACE_CString(ior_string),ranked_ior_list);
+ return CORBA::Object::_duplicate(ior.in());
+ }
+ else
+ {
+ ACE_ERROR((LM_ERROR,"No ior list for tag=%s!!!\n",ior_string));
+ return 0;
+ }
+
+/* // this->RM_var_->next_member (ior_string);
+ CORBA::Object_var replica_ior;
+ CORBA::ULong failure_count = 0;
+ if (this->failure_map_.find (ACE_CString(ior_string), failure_count) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "FAILURE COUNT IS %d\n", failure_count));
+ if (failure_count == 1)
+ {
+ if (this->failover_map_.find (ior_string, replica_ior) == 0)
+ {
+ this->failure_map_.rebind (ior_string, 2);
+ return CORBA::Object::_duplicate (replica_ior.in ());
+ }
+ }
+ else if (failure_count == 2)
+ {
+ if (this->secondary_map_.find (ior_string, replica_ior) == 0)
+ {
+ return CORBA::Object::_duplicate (replica_ior.in ());
+ }
+ }
+ }
+ else
+ {
+ for (FAILURE_MAP::iterator it = this->failure_map_.begin ();
+ it != this->failure_map_.end (); ++it)
+ {
+ ACE_CString object_id = (*it).ext_id_;
+ ACE_DEBUG((LM_DEBUG,"object_id in the failure_map_ is %s.\n",object_id.c_str()));
+ }
+ }
+ return 0;
+*/
+}
+
+void Agent_i::
+update_rank_list (const RankList & rank_list
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+/* if (update_count_ > 80)
+ {
+ ACE_DEBUG((LM_DEBUG,"not updated.\n"));
+ return;
+ }
+ update_count_++;
+*/
+ ACE_Guard <ACE_Thread_Mutex> guard (ior_map_mutex_);
+ objectid_rankedior_map_.close();
+ objectid_rankedior_map_.open();
+
+ // ACE_DEBUG((LM_DEBUG,"Received rank_list length = %d.\n", rank_list.length()));
+ for (size_t i = 0; i < rank_list.length(); ++i)
+ {
+ AGENT_RANKED_IOR_LIST ranked_ior_list;
+ ranked_ior_list.now = rank_list[i].now;
+ for (size_t j = 0; j < rank_list[i].ior_list.length(); ++j)
+ {
+ ranked_ior_list.ior_list.push_back(
+ CORBA::Object::_duplicate(rank_list[i].ior_list[j]));
+ }
+ ACE_CString oid (rank_list[i].object_id);
+ objectid_rankedior_map_.bind(oid,ranked_ior_list);
+ //ACE_DEBUG((LM_DEBUG,"object_id=%s. ior_list_size=%d.\n",
+ // oid.c_str(), ranked_ior_list.ior_list.size()));
+ }
+}
+
+void Agent_i::
+initialize (CORBA::Object_ptr rm_ior)
+{
+ this->RM_var_ = ReplicationManager::_narrow (rm_ior);
+ Agent_var temp = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ ACE_DEBUG ((LM_DEBUG, "calling register agent\n"));
+ RankList *rank_list = this->RM_var_->register_agent (temp.in ());
+ update_rank_list (*rank_list);
+
+/*
+ CORBA::String_var ior_string =
+ this->orb_->object_to_string (temp.in () ACE_ENV_ARG_PARAMETER);
+
+ FILE *output_file= ACE_OS::fopen (ior_file, "w");
+ if (output_file == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file <%s> for writing "
+ "IOR: %s",
+ ior_string.in ()),
+ 1);
+ }
+
+ ACE_OS::fprintf (output_file, "%s", ior_string.in ());
+ ACE_OS::fclose (output_file);
+*/
+
+}
+
+/*
+void
+Agent_i::update_reference (const char *ior_string
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ this->forward_str_ = CORBA::string_dup (ior_string);
+}
+
+void
+Agent_i::update_failover_list (const FailoverList &failover_list
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG, "failure map updates\n"));
+ CORBA::ULong failover_list_length = failover_list.length ();
+ for (CORBA::ULong i = 0; i < failover_list_length; ++i)
+ {
+ const char *object_id = failover_list[i];
+ ACE_DEBUG ((LM_DEBUG, "OBJECT ID is %s\n", object_id));
+ this->failure_map_.rebind (object_id, 2);
+ }
+}
+
+void
+Agent_i::update_failover (const char * object_id,
+ CORBA::Object_ptr next_member
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG, "calling update next member\n"));
+ this->failover_map_.rebind (object_id,
+ CORBA::Object::_duplicate (next_member));
+}
+
+void
+Agent_i::update_secondary (const char * object_id,
+ CORBA::Object_ptr next_member
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG, "calling update next members\n"));
+ this->secondary_map_.rebind (object_id,
+ CORBA::Object::_duplicate (next_member));
+}
+
+void
+Agent_i::initialize_agent (const ReplicaList & replica_list,
+ const ReplicasList & replicas_list
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG, "calling initialize agent\n"));
+
+ CORBA::ULong replica_list_length = replica_list.length ();
+ for (CORBA::ULong i = 0; i < replica_list_length; ++i)
+ {
+ const char *object_id = replica_list[i].object_id;
+ CORBA::Object_var replica_ior = replica_list[i].next_member;
+
+ if (this->failover_map_.bind (object_id, replica_ior) != 0)
+ ACE_DEBUG((LM_DEBUG,"failover_map_ did not bind %s.\n", object_id));
+
+ if (this->failure_map_.bind (object_id, 1) != 0)
+ ACE_DEBUG((LM_DEBUG,"failure_map_ did not bind %s.\n", object_id));
+
+ ACE_DEBUG((LM_DEBUG,"object_id added to failover map = %s:\n",object_id));
+ }
+
+ CORBA::ULong replicas_list_length = replicas_list.length ();
+ for (CORBA::ULong i = 0; i < replicas_list_length; ++i)
+ {
+ const char *object_id = replicas_list[i].object_id;
+ CORBA::Object_var replicas_ior = replicas_list[i].next_members;
+ this->secondary_map_.bind (object_id, replicas_ior);
+ ACE_DEBUG((LM_DEBUG,"object_id added to secondary map = %s:\n",object_id));
+ }
+}
+
+*/
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp
new file mode 100644
index 00000000000..592fc06f663
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp
@@ -0,0 +1,58 @@
+// -*- C++ -*-
+
+#include "Client_ORBInitializer.h"
+
+ACE_RCSID (Hello,
+ Client_ORBInitializer,
+ "$Id$")
+
+#if TAO_HAS_INTERCEPTORS == 1
+
+#include "Client_Request_Interceptor.h"
+
+#include "tao/StringSeqC.h"
+#include "tao/ORB_Constants.h"
+#include "ace/OS_NS_string.h"
+
+Client_ORBInitializer::Client_ORBInitializer (Agent_i *agent)
+ : agent_ (agent)
+{
+}
+
+Client_ORBInitializer::~Client_ORBInitializer (void)
+{
+}
+
+
+void
+Client_ORBInitializer::pre_init (
+ PortableInterceptor::ORBInitInfo_ptr)
+{
+}
+
+void
+Client_ORBInitializer::post_init (
+ PortableInterceptor::ORBInitInfo_ptr info)
+{
+ CORBA::String_var orb_id = info->orb_id ();
+
+ PortableInterceptor::ClientRequestInterceptor_ptr interceptor =
+ PortableInterceptor::ClientRequestInterceptor::_nil ();
+
+ // Install the client request interceptor.
+ ACE_NEW_THROW_EX (interceptor,
+ Client_Request_Interceptor (orb_id.in (), agent_),
+ CORBA::NO_MEMORY (
+ CORBA::SystemException::_tao_minor_code (
+ TAO::VMCID,
+ ENOMEM),
+ CORBA::COMPLETED_NO));
+
+ PortableInterceptor::ClientRequestInterceptor_var
+ client_interceptor = interceptor;
+
+ info->add_client_request_interceptor (client_interceptor.in ());
+
+}
+
+#endif /* TAO_HAS_INTERCEPTORS == 1 */
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp
new file mode 100644
index 00000000000..02b773ae896
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp
@@ -0,0 +1,74 @@
+// -*- C++ -*-
+
+#include "Client_Request_Interceptor.h"
+#include "ace/Log_Msg.h"
+#include "Agent.h"
+
+ACE_RCSID (Hello,
+ Client_Request_Interceptor,
+ "$Id$")
+
+Client_Request_Interceptor::Client_Request_Interceptor (
+ const char *orb_id, Agent_i *agent)
+ : orb_id_ (CORBA::string_dup (orb_id)),
+ orb_ (),
+ request_count_ (0),
+ agent_ (agent)
+{
+}
+
+char *
+Client_Request_Interceptor::name ()
+{
+ return CORBA::string_dup ("Client_Request_Interceptor");
+}
+
+void
+Client_Request_Interceptor::destroy (void)
+{
+}
+
+void
+Client_Request_Interceptor::send_request (
+ PortableInterceptor::ClientRequestInfo_ptr ri)
+{
+}
+
+void
+Client_Request_Interceptor::send_poll (
+ PortableInterceptor::ClientRequestInfo_ptr)
+{
+}
+
+void
+Client_Request_Interceptor::receive_reply (
+ PortableInterceptor::ClientRequestInfo_ptr)
+{
+}
+
+void
+Client_Request_Interceptor::receive_exception (
+ PortableInterceptor::ClientRequestInfo_ptr ri)
+{
+ ACE_DEBUG ((LM_DEBUG, "Catching exception\n"));
+ const CORBA::ULong tagID = 9654;
+ char *tag = 0;
+ try
+ {
+ IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID);
+ tag = reinterpret_cast <char *> ( mytag->component_data.get_buffer ());
+ ACE_CString new_string (tag); // copy directly; avoids leaking a string_dup'd buffer
+ CORBA::Object_var forward = this->agent_->next_member (tag);
+ ACE_THROW (PortableInterceptor::ForwardRequest (forward.in ()));
+ }
+ catch (CORBA::BAD_PARAM&)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Tagged Component not found\n"));
+ }
+}
+
+void
+Client_Request_Interceptor::receive_other (
+ PortableInterceptor::ClientRequestInfo_ptr)
+{
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp
new file mode 100644
index 00000000000..e9e1ac92829
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp
@@ -0,0 +1,1328 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+#include "ace/Throughput_Stats.h"
+#include "ace/Sample_History.h"
+#include "ace/Read_Buffer.h"
+#include "ace/Array_Base.h"
+#include "ace/Task.h"
+#include "ace/OS_NS_unistd.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/RTCORBA/RTCORBA.h"
+#include "tao/RTCORBA/Priority_Mapping_Manager.h"
+#include "LWFTC.h"
+#include "tests/RTCORBA/common_args.cpp"
+#include "tests/RTCORBA/check_supported_priorities.cpp"
+#include "ace/Event.h"
+#include "Client_ORBInitializer.h"
+#include "tao/ORBInitializer_Registry.h"
+#include "Agent.h"
+#include "ace/Barrier.h"
+#include <vector>
+#include <fstream>
+#include <algorithm>
+
+ACE_RCSID(Thread_Pool, client, "$Id$")
+
+enum Priority_Setting
+{
+ AT_THREAD_CREATION = 0,
+ AFTER_THREAD_CREATION = 1
+};
+
+static const char *ior = "file://s1.ior";
+static const char *rates_file = "rates";
+static const char *invocation_priorities_file = "empty-file";
+static int shutdown_server = 0;
+static int do_dump_history = 0;
+static const char *history_file_name;
+static ACE_UINT32 gsf = 0;
+static CORBA::ULong continuous_workers = 0;
+static int done = 0;
+static CORBA::ULong time_for_test = 10;
+static CORBA::ULong work = 10;
+static CORBA::ULong max_throughput_timeout = 5;
+static CORBA::ULong continuous_worker_priority = 0;
+static int set_priority = 1;
+static Priority_Setting priority_setting = AFTER_THREAD_CREATION;
+static int individual_continuous_worker_stats = 0;
+static int print_missed_invocations = 0;
+static ACE_hrtime_t test_start;
+static CORBA::ULong prime_number = 9619;
+static int count_missed_end_deadlines = 0;
+const char *agent_ior_file = "agent.ior";
+const char *rm_ior = "file://rm.ior";
+std::vector <int> sample_vector (3100);
+size_t count = 0;
+const char *dummy_file_name = "temp1";
+
+class Agent_Thread : public ACE_Task_Base
+{
+public:
+ Agent_Thread (CORBA::ORB_ptr orb, Agent_i *agent, ACE_Barrier *barrier);
+
+ virtual int svc (void);
+
+private:
+ CORBA::ORB_ptr orb_;
+
+ Agent_i *agent_;
+
+ ACE_Barrier *synchronizer_;
+};
+
+Agent_Thread::Agent_Thread (CORBA::ORB_ptr orb, Agent_i *agent,
+ ACE_Barrier *thread_barrier)
+ : orb_ (orb),
+ agent_ (agent),
+ synchronizer_ (thread_barrier)
+{
+}
+
+int
+Agent_Thread::svc (void)
+{
+ try
+ {
+ CORBA::Object_var poa_object =
+ this->orb_->resolve_initial_references (
+ "RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize the POA.\n"),
+ 1);
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+/*
+ Agent_var temp =
+ this->agent_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ this->orb_->object_to_string (temp.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+*/
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+/*
+ FILE *output_file= ACE_OS::fopen (agent_ior_file, "w");
+ if (output_file == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file <%s> for writing "
+ "IOR: %s",
+ ior.in ()),
+ 1);
+ }
+
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+*/
+ /*
+ CORBA::Object_var naming_obj =
+ this->orb_->resolve_initial_references ("NameService"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (naming_obj.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to get the Naming Service.\n"),
+ 1);
+
+ CosNaming::NamingContext_var naming_context =
+ CosNaming::NamingContext::_narrow (naming_obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CosNaming::Name name (1);
+ name.length (1);
+ name[0].id = CORBA::string_dup ("ReplicationManager");
+
+ CORBA::Object_var rm_obj =
+ naming_context->resolve (name ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ */
+
+ CORBA::Object_var tmp = this->orb_->string_to_object(rm_ior);
+ ReplicationManager_var rm =
+ ReplicationManager::_narrow (tmp.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ agent_->initialize(rm.in());
+
+ //ACE_DEBUG ((LM_DEBUG, "calling register agent\n"));
+
+ //rm->register_agent (temp.in ());
+
+ this->synchronizer_->wait ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return 1;
+ }
+ return 0;
+}
+
+
+struct Synchronizers
+{
+ Synchronizers (void)
+ : worker_lock_ (),
+ workers_ (1),
+ workers_ready_ (0),
+ number_of_workers_ (0)
+ {
+ }
+
+ ACE_SYNCH_MUTEX worker_lock_;
+ ACE_Event workers_;
+ CORBA::ULong workers_ready_;
+ CORBA::ULong number_of_workers_;
+};
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv,
+ "c:e:g:hi:k:m:p:q:r:t:u:v:w:x:y:z:a:" //client options
+ "b:f:hl:n:o:s:" // server options
+ );
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'c':
+ continuous_workers =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'e':
+ count_missed_end_deadlines =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'g':
+ do_dump_history =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'i':
+ individual_continuous_worker_stats =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'k':
+ ior =
+ get_opts.opt_arg ();
+ break;
+
+ case 'm':
+ print_missed_invocations =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'p':
+ invocation_priorities_file =
+ get_opts.opt_arg ();
+ break;
+
+ case 'q':
+ prime_number =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'r':
+ rates_file =
+ get_opts.opt_arg ();
+ break;
+
+ case 't':
+ time_for_test =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'u':
+ continuous_worker_priority =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'v':
+ priority_setting =
+ Priority_Setting (ACE_OS::atoi (get_opts.opt_arg ()));
+ break;
+
+ case 'w':
+ work =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'x':
+ shutdown_server =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'y':
+ set_priority =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'z':
+ max_throughput_timeout =
+ ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'a':
+ history_file_name = get_opts.opt_arg();
+ break;
+
+ case 'b':
+ case 'f':
+ case 'l':
+ case 'n':
+ case 'o':
+ case 's':
+ // server options: ignored.
+ break;
+
+ case 'h':
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s\n"
+ "\t-c <number of continuous workers> (defaults to %d)\n"
+ "\t-e <count missed end deadlines> (defaults to %d)\n"
+ "\t-g <show history> (defaults to %d)\n"
+ "\t-h <help: shows options menu>\n"
+ "\t-i <print stats of individual continuous workers> (defaults to %d)\n"
+ "\t-k <ior> (defaults to %s)\n"
+ "\t-m <print missed invocations for paced workers> (defaults to %d)\n"
+ "\t-p <invocation priorities file> (defaults to %s)\n"
+ "\t-q <prime number> (defaults to %d)\n"
+ "\t-r <rates file> (defaults to %s)\n"
+ "\t-t <time for test> (defaults to %d)\n"
+ "\t-u <continuous worker priority> (defaults to %d)\n"
+ "\t-v <priority setting: AT_THREAD_CREATION = 0, AFTER_THREAD_CREATION = 1> (defaults to %s)\n"
+ "\t-w <work> (defaults to %d)\n"
+ "\t-x <shutdown server> (defaults to %d)\n"
+ "\t-y <set invocation priorities> (defaults to %d)\n"
+ "\t-z <timeout for max throughput measurement> (defaults to %d)\n"
+ "\n",
+ argv [0],
+ continuous_workers,
+ count_missed_end_deadlines,
+ do_dump_history,
+ individual_continuous_worker_stats,
+ ior,
+ print_missed_invocations,
+ invocation_priorities_file,
+ prime_number,
+ rates_file,
+ time_for_test,
+ continuous_worker_priority,
+ priority_setting == 0 ? "AT_THREAD_CREATION" : "AFTER_THREAD_CREATION",
+ work,
+ shutdown_server,
+ set_priority,
+ max_throughput_timeout),
+ -1);
+ }
+
+ return 0;
+}
+
+double
+to_seconds (ACE_UINT64 hrtime,
+ ACE_UINT32 sf)
+{
+ double seconds =
+#if defined ACE_LACKS_LONGLONG_T
+ hrtime / sf;
+#else /* ! ACE_LACKS_LONGLONG_T */
+ static_cast<double> (ACE_UINT64_DBLCAST_ADAPTER (hrtime / sf));
+#endif /* ! ACE_LACKS_LONGLONG_T */
+ seconds /= ACE_HR_SCALE_CONVERSION;
+
+ return seconds;
+}
+
+ACE_UINT64
+to_hrtime (double seconds,
+ ACE_UINT32 sf)
+{
+ return ACE_UINT64 (seconds * sf * ACE_HR_SCALE_CONVERSION);
+}
+
+int
+start_synchronization (test_ptr test,
+ Synchronizers &synchronizers)
+{
+ CORBA::ULong synchronization_iterations = 1;
+ try
+ {
+ for (CORBA::ULong i = 0;
+ i < synchronization_iterations;
+ ++i)
+ {
+ test->method (0, 0, work,
+ prime_number, 0);
+ }
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
+ mon,
+ synchronizers.worker_lock_,
+ -1);
+
+ if (synchronizers.workers_ready_ == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+ }
+
+ ++synchronizers.workers_ready_;
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "%d worker ready\n",
+ synchronizers.workers_ready_));
+
+ if (synchronizers.workers_ready_ ==
+ synchronizers.number_of_workers_)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+
+ test_start =
+ ACE_OS::gethrtime ();
+
+ synchronizers.workers_.signal ();
+
+ return 0;
+ }
+ }
+
+ synchronizers.workers_.wait ();
+
+ return 0;
+}
+
+int
+end_synchronization (Synchronizers &synchronizers)
+{
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
+ mon,
+ synchronizers.worker_lock_,
+ -1);
+
+ if (synchronizers.workers_ready_ ==
+ synchronizers.number_of_workers_)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+
+ synchronizers.workers_.reset ();
+ }
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "%d worker completed\n",
+ synchronizers.workers_ready_));
+
+ --synchronizers.workers_ready_;
+
+ if (synchronizers.workers_ready_ == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+
+ synchronizers.workers_.signal ();
+
+ return 0;
+ }
+ }
+
+ synchronizers.workers_.wait ();
+
+ return 0;
+}
+
+int
+max_throughput (test_ptr test,
+ RTCORBA::Current_ptr current,
+ RTCORBA::PriorityMapping &priority_mapping,
+ CORBA::ULong &max_rate)
+{
+ CORBA::ULong calls_made = 0;
+ CORBA::Short CORBA_priority = 0;
+ CORBA::Short native_priority = 0;
+
+ try
+ {
+ CORBA_priority =
+ current->the_priority ();
+
+ CORBA::Boolean result =
+ priority_mapping.to_native (CORBA_priority,
+ native_priority);
+ if (!result)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in converting CORBA priority %d to native priority\n",
+ CORBA_priority),
+ -1);
+
+ ACE_hrtime_t start =
+ ACE_OS::gethrtime ();
+
+ ACE_hrtime_t end =
+ start +
+ to_hrtime (max_throughput_timeout, gsf);
+
+ for (;;)
+ {
+ ACE_hrtime_t now =
+ ACE_OS::gethrtime ();
+
+ if (now > end)
+ break;
+
+ test->method (0, 0, work,
+ prime_number, 0);
+
+ ++calls_made;
+ }
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ max_rate =
+ calls_made / max_throughput_timeout;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\nPriority = %d/%d; Max rate calculations => %d calls in %d seconds; Max rate = %.2f\n",
+ CORBA_priority,
+ native_priority,
+ calls_made,
+ max_throughput_timeout,
+ calls_made / (double) max_throughput_timeout));
+
+ return 0;
+}
+
+class Paced_Worker :
+ public ACE_Task_Base
+{
+public:
+ Paced_Worker (ACE_Thread_Manager &thread_manager,
+ test_ptr test,
+ CORBA::Short rate,
+ CORBA::ULong iterations,
+ CORBA::Short priority,
+ RTCORBA::Current_ptr current,
+ RTCORBA::PriorityMapping &priority_mapping,
+ Synchronizers &synchronizers);
+
+ int svc (void);
+ ACE_hrtime_t deadline_for_current_call (CORBA::ULong i);
+ void reset_priority (void);
+ void print_stats (ACE_hrtime_t test_end);
+ int setup (void);
+ void missed_start_deadline (CORBA::ULong invocation);
+ void missed_end_deadline (CORBA::ULong invocation);
+
+ test_var test_;
+ int rate_;
+ ACE_Sample_History history_;
+ CORBA::Short priority_;
+ RTCORBA::Current_var current_;
+ RTCORBA::PriorityMapping &priority_mapping_;
+ Synchronizers &synchronizers_;
+ CORBA::Short CORBA_priority_;
+ CORBA::Short native_priority_;
+ ACE_hrtime_t interval_between_calls_;
+ CORBA::ULong missed_start_deadlines_;
+ CORBA::ULong missed_end_deadlines_;
+
+ typedef ACE_Array_Base<CORBA::ULong> Missed_Invocations;
+ Missed_Invocations missed_start_invocations_;
+ Missed_Invocations missed_end_invocations_;
+};
+
+Paced_Worker::Paced_Worker (ACE_Thread_Manager &thread_manager,
+ test_ptr test,
+ CORBA::Short rate,
+ CORBA::ULong iterations,
+ CORBA::Short priority,
+ RTCORBA::Current_ptr current,
+ RTCORBA::PriorityMapping &priority_mapping,
+ Synchronizers &synchronizers)
+ : ACE_Task_Base (&thread_manager),
+ test_ (test::_duplicate (test)),
+ rate_ (rate),
+ history_ (iterations),
+ priority_ (priority),
+ current_ (RTCORBA::Current::_duplicate (current)),
+ priority_mapping_ (priority_mapping),
+ synchronizers_ (synchronizers),
+ CORBA_priority_ (0),
+ native_priority_ (0),
+ interval_between_calls_ (),
+ missed_start_deadlines_ (0),
+ missed_end_deadlines_ (0),
+ missed_start_invocations_ (iterations),
+ missed_end_invocations_ (iterations)
+{
+ this->interval_between_calls_ =
+ to_hrtime (1 / double (this->rate_), gsf);
+}
+
+void
+Paced_Worker::reset_priority (void)
+{
+ if (set_priority)
+ {
+ this->current_->the_priority (this->priority_);
+ }
+ else
+ {
+ this->current_->the_priority (continuous_worker_priority);
+ }
+}
+
+ACE_hrtime_t
+Paced_Worker::deadline_for_current_call (CORBA::ULong i)
+{
+ ACE_hrtime_t deadline_for_current_call =
+ this->interval_between_calls_;
+
+ deadline_for_current_call *= i;
+
+ deadline_for_current_call += test_start;
+
+ return deadline_for_current_call;
+}
+
+void
+Paced_Worker::print_stats (ACE_hrtime_t test_end)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX,
+ mon,
+ this->synchronizers_.worker_lock_);
+
+ CORBA::ULong missed_total_deadlines =
+ this->missed_start_deadlines_ + this->missed_end_deadlines_;
+
+ CORBA::ULong made_total_deadlines =
+ this->history_.max_samples () - missed_total_deadlines;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\n************ Statistics for thread %t ************\n\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Priority = %d/%d; Rate = %d/sec; Iterations = %d; ",
+ this->CORBA_priority_,
+ this->native_priority_,
+ this->rate_,
+ this->history_.max_samples ()));
+
+ if (count_missed_end_deadlines)
+ ACE_DEBUG ((LM_DEBUG,
+ "Deadlines made/missed[start,end]/%% = %d/%d[%d,%d]/%.2f%%; Effective Rate = %.2f\n",
+ made_total_deadlines,
+ missed_total_deadlines,
+ this->missed_start_deadlines_,
+ this->missed_end_deadlines_,
+ made_total_deadlines * 100 / (double) this->history_.max_samples (),
+ made_total_deadlines / to_seconds (test_end - test_start, gsf)));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Deadlines made/missed/%% = %d/%d/%.2f%%; Effective Rate = %.2f\n",
+ made_total_deadlines,
+ missed_total_deadlines,
+ made_total_deadlines * 100 / (double) this->history_.max_samples (),
+ made_total_deadlines / to_seconds (test_end - test_start, gsf)));
+
+
+ if (do_dump_history)
+ {
+ //this->history_.dump_samples (history_file_name, gsf);
+ std::ofstream output_file (history_file_name);
+ std::copy (sample_vector.begin(), sample_vector.end(),
+ std::ostream_iterator <int> (output_file, "\n"));
+ }
+
+ ACE_Basic_Stats stats;
+ this->history_.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+
+ ACE_Throughput_Stats::dump_throughput ("Total", gsf,
+ test_end - test_start,
+ stats.samples_count ());
+
+ if (print_missed_invocations)
+ {
+ ACE_DEBUG ((LM_DEBUG, "\nMissed start invocations are: "));
+
+ for (CORBA::ULong j = 0;
+ j != this->missed_start_deadlines_;
+ ++j)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%d ",
+ this->missed_start_invocations_[j]));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ if (count_missed_end_deadlines)
+ {
+ ACE_DEBUG ((LM_DEBUG, "\nMissed end invocations are: "));
+
+ for (CORBA::ULong j = 0;
+ j != this->missed_end_deadlines_;
+ ++j)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%d ",
+ this->missed_end_invocations_[j]));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+ }
+ }
+}
+
+int
+Paced_Worker::setup (void)
+{
+ if (priority_setting == AFTER_THREAD_CREATION)
+ {
+ this->reset_priority ();
+ }
+
+ this->CORBA_priority_ =
+ this->current_->the_priority ();
+
+ CORBA::Boolean result =
+ this->priority_mapping_.to_native (this->CORBA_priority_,
+ this->native_priority_);
+ if (!result)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in converting CORBA priority %d to native priority\n",
+ this->CORBA_priority_),
+ -1);
+
+ return
+ start_synchronization (this->test_.in (),
+ this->synchronizers_);
+}
+
+void
+Paced_Worker::missed_start_deadline (CORBA::ULong invocation)
+{
+ this->missed_start_invocations_[this->missed_start_deadlines_++] =
+ invocation;
+}
+
+void
+Paced_Worker::missed_end_deadline (CORBA::ULong invocation)
+{
+ if (count_missed_end_deadlines)
+ this->missed_end_invocations_[this->missed_end_deadlines_++] =
+ invocation;
+}
+
+int
+Paced_Worker::svc (void)
+{
+ try
+ {
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ //size_t terminate_count = this->history_.max_samples () / 2;
+ size_t terminate_count = 500;
+ int result =
+ this->setup ();
+
+ if (result != 0)
+ return result;
+
+ CORBA::ULong testing_start = 0;
+ CORBA::ULong testing_end = 0;
+ CORBA::ULong end_iteration = this->history_.max_samples () - 1;
+
+ for (CORBA::ULong i = 0;
+ i != this->history_.max_samples ();
+ ++i)
+ {
+ ACE_hrtime_t deadline_for_current_call =
+ this->deadline_for_current_call (i);
+
+ ACE_hrtime_t time_before_call =
+ ACE_OS::gethrtime ();
+
+ if (time_before_call > deadline_for_current_call)
+ {
+ this->missed_start_deadline (i + 1);
+ continue;
+ }
+
+ if (i == 0)
+ {
+ testing_start = 1;
+ }
+ else if (i == end_iteration)
+ {
+ testing_end = 1;
+ }
+
+ if (i == terminate_count)
+ {
+ sample_vector[count++] = 0;
+ this->test_->method (testing_start, testing_end, work,
+ prime_number, 1);
+ }
+ else
+ {
+ this->test_->method (testing_start, testing_end, work,
+ prime_number, 0);
+ }
+
+ ACE_hrtime_t time_after_call =
+ ACE_OS::gethrtime ();
+ this->history_.sample (time_after_call - time_before_call);
+ sample_vector[count++] = ((time_after_call - time_before_call) / gsf);
+
+ if (time_after_call > deadline_for_current_call)
+ {
+ this->missed_end_deadline (i + 1);
+ continue;
+ }
+
+ ACE_hrtime_t sleep_time =
+ deadline_for_current_call - time_after_call;
+
+ ACE_OS::sleep (ACE_Time_Value (0,
+ long (to_seconds (sleep_time, gsf) *
+ ACE_ONE_SECOND_IN_USECS)));
+ }
+
+ ACE_hrtime_t test_end = ACE_OS::gethrtime ();
+
+ done = 1;
+
+ end_synchronization (this->synchronizers_);
+
+ this->print_stats (test_end);
+
+ ACE_DEBUG ((LM_DEBUG, "******************************\n"));
+
+ this->test_->dump ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}
+
+class Continuous_Worker :
+ public ACE_Task_Base
+{
+public:
+ Continuous_Worker (ACE_Thread_Manager &thread_manager,
+ test_ptr test,
+ CORBA::ULong iterations,
+ RTCORBA::Current_ptr current,
+ RTCORBA::PriorityMapping &priority_mapping,
+ Synchronizers &synchronizers);
+
+ int svc (void);
+ void print_stats (ACE_Sample_History &history,
+ ACE_hrtime_t test_end);
+ int setup (void);
+ void print_collective_stats (void);
+
+ test_var test_;
+ CORBA::ULong iterations_;
+ RTCORBA::Current_var current_;
+ RTCORBA::PriorityMapping &priority_mapping_;
+ Synchronizers &synchronizers_;
+ CORBA::Short CORBA_priority_;
+ CORBA::Short native_priority_;
+ ACE_Basic_Stats collective_stats_;
+ ACE_hrtime_t time_for_test_;
+};
+
+Continuous_Worker::Continuous_Worker (ACE_Thread_Manager &thread_manager,
+ test_ptr test,
+ CORBA::ULong iterations,
+ RTCORBA::Current_ptr current,
+ RTCORBA::PriorityMapping &priority_mapping,
+ Synchronizers &synchronizers)
+ : ACE_Task_Base (&thread_manager),
+ test_ (test::_duplicate (test)),
+ iterations_ (iterations),
+ current_ (RTCORBA::Current::_duplicate (current)),
+ priority_mapping_ (priority_mapping),
+ synchronizers_ (synchronizers),
+ CORBA_priority_ (0),
+ native_priority_ (0),
+ collective_stats_ (),
+ time_for_test_ (0)
+{
+}
+
+void
+Continuous_Worker::print_stats (ACE_Sample_History &history,
+ ACE_hrtime_t test_end)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX,
+ mon,
+ this->synchronizers_.worker_lock_);
+
+ if (individual_continuous_worker_stats)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "\n************ Statistics for thread %t ************\n\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Iterations = %d\n",
+ history.sample_count ()));
+
+ if (do_dump_history)
+ {
+ history.dump_samples (history_file_name, gsf);
+ }
+
+ ACE_Basic_Stats stats;
+ history.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+
+ ACE_Throughput_Stats::dump_throughput ("Total", gsf,
+ test_end - test_start,
+ stats.samples_count ());
+ }
+
+ history.collect_basic_stats (this->collective_stats_);
+ ACE_hrtime_t elapsed_time_for_current_thread =
+ test_end - test_start;
+ if (elapsed_time_for_current_thread > this->time_for_test_)
+ this->time_for_test_ = elapsed_time_for_current_thread;
+}
+
+void
+Continuous_Worker::print_collective_stats (void)
+{
+ if (continuous_workers > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "\n************ Statistics for continuous workers ************\n\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Priority = %d/%d; Collective iterations = %d; Workers = %d; Average = %d\n",
+ this->CORBA_priority_,
+ this->native_priority_,
+ this->collective_stats_.samples_count (),
+ continuous_workers,
+ this->collective_stats_.samples_count () /
+ continuous_workers));
+
+ this->collective_stats_.dump_results ("Collective", gsf);
+
+ ACE_Throughput_Stats::dump_throughput ("Individual", gsf,
+ this->time_for_test_,
+ this->collective_stats_.samples_count () /
+ continuous_workers);
+
+ ACE_Throughput_Stats::dump_throughput ("Collective", gsf,
+ this->time_for_test_,
+ this->collective_stats_.samples_count ());
+ }
+}
+
+int
+Continuous_Worker::setup (void)
+{
+ if (priority_setting == AFTER_THREAD_CREATION)
+ {
+ this->current_->the_priority (continuous_worker_priority);
+ }
+
+ this->CORBA_priority_ =
+ this->current_->the_priority ();
+
+ CORBA::Boolean result =
+ this->priority_mapping_.to_native (this->CORBA_priority_,
+ this->native_priority_);
+ if (!result)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in converting CORBA priority %d to native priority\n",
+ this->CORBA_priority_),
+ -1);
+
+ return
+ start_synchronization (this->test_.in (),
+ this->synchronizers_);
+}
+
+int
+Continuous_Worker::svc (void)
+{
+ try
+ {
+ ACE_Sample_History history (this->iterations_);
+
+ int result =
+ this->setup ();
+
+ if (result != 0)
+ return result;
+
+ for (CORBA::ULong i = 0;
+ i != history.max_samples () && !done;
+ ++i)
+ {
+ ACE_hrtime_t start = ACE_OS::gethrtime ();
+
+ this->test_->method (0, 0, work,
+ prime_number, 0);
+
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+ history.sample (end - start);
+ }
+
+ ACE_hrtime_t test_end = ACE_OS::gethrtime ();
+
+ end_synchronization (this->synchronizers_);
+
+ this->print_stats (history,
+ test_end);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}
+
+class Task : public ACE_Task_Base
+{
+public:
+
+ Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb);
+
+ int svc (void);
+
+ CORBA::ORB_var orb_;
+
+};
+
+Task::Task (ACE_Thread_Manager &thread_manager,
+ CORBA::ORB_ptr orb)
+ : ACE_Task_Base (&thread_manager),
+ orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+int
+Task::svc (void)
+{
+ Synchronizers synchronizers;
+
+ gsf = ACE_High_Res_Timer::global_scale_factor ();
+
+ try
+ {
+ CORBA::Object_var object =
+ this->orb_->string_to_object (ior);
+
+ test_var test =
+ test::_narrow (object.in ());
+
+ object =
+ this->orb_->resolve_initial_references ("RTCurrent");
+
+ RTCORBA::Current_var current =
+ RTCORBA::Current::_narrow (object.in ());
+
+ object =
+ this->orb_->resolve_initial_references ("PriorityMappingManager");
+
+ RTCORBA::PriorityMappingManager_var mapping_manager =
+ RTCORBA::PriorityMappingManager::_narrow (object.in ());
+
+ RTCORBA::PriorityMapping &priority_mapping =
+ *mapping_manager->mapping ();
+
+ ULong_Array rates;
+ int result =
+ get_values ("client",
+ rates_file,
+ "rates",
+ rates,
+ 1);
+ if (result != 0)
+ return result;
+
+ ULong_Array invocation_priorities;
+ result =
+ get_values ("client",
+ invocation_priorities_file,
+ "invocation priorities",
+ invocation_priorities,
+ 1);
+ if (result != 0)
+ return result;
+
+ if (invocation_priorities.size () != 0 &&
+ invocation_priorities.size () != rates.size ())
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Number of invocation priorities (%d) != Number of rates (%d)\n",
+ invocation_priorities.size (),
+ rates.size ()),
+ -1);
+
+ synchronizers.number_of_workers_ =
+ rates.size ();// + continuous_workers;
+
+ CORBA::ULong max_rate = 0;
+ result =
+ max_throughput (test.in (),
+ current.in (),
+ priority_mapping,
+ max_rate);
+ if (result != 0)
+ return result;
+
+ CORBA::Short priority_range =
+ RTCORBA::maxPriority - RTCORBA::minPriority;
+
+ ACE_Thread_Manager paced_workers_manager;
+
+ CORBA::ULong i = 0;
+ Paced_Worker **paced_workers =
+ new Paced_Worker *[rates.size ()];
+
+ for (i = 0;
+ i < rates.size ();
+ ++i)
+ {
+ CORBA::Short priority = 0;
+
+ if (invocation_priorities.size () == 0)
+ priority =
+ CORBA::Short ((priority_range /
+ double (rates.size ())) *
+ (i + 1));
+ else
+ priority =
+ invocation_priorities[i];
+
+ paced_workers[i] =
+ new Paced_Worker (paced_workers_manager,
+ test.in (),
+ rates[i],
+ time_for_test * rates[i],
+ priority,
+ current.in (),
+ priority_mapping,
+ synchronizers);
+ }
+
+ CORBA::Short CORBA_priority = 0;
+ CORBA::Short native_priority = 0; // initialized: passed to activate() below even when set_priority is 0
+ CORBA::Boolean convert_result = false;
+ int force_active = 0;
+
+ long flags =
+ THR_NEW_LWP |
+ THR_JOINABLE |
+ this->orb_->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ for (i = 0;
+ i < rates.size ();
+ ++i)
+ {
+ if (priority_setting == AT_THREAD_CREATION)
+ {
+ if (set_priority)
+ {
+ CORBA_priority =
+ paced_workers[i]->priority_;
+
+ convert_result =
+ priority_mapping.to_native (CORBA_priority,
+ native_priority);
+ if (!convert_result)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in converting CORBA priority %d to native priority\n",
+ CORBA_priority),
+ -1);
+ }
+
+ result =
+ paced_workers[i]->activate (flags,
+ 1,
+ force_active,
+ native_priority);
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Paced_Worker::activate failed\n"),
+ result);
+ }
+ else
+ {
+
+ result =
+ paced_workers[i]->activate (flags);
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Paced_Worker::activate failed\n"),
+ result);
+ }
+ }
+
+ if (rates.size () != 0)
+ {
+ paced_workers_manager.wait ();
+ }
+
+ for (i = 0;
+ i < rates.size ();
+ ++i)
+ {
+ delete paced_workers[i];
+ }
+ delete[] paced_workers;
+
+ if (shutdown_server)
+ {
+ test->shutdown ();
+ }
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ try
+ {
+
+ Agent_i *agent;
+ ACE_NEW_RETURN (agent,
+ Agent_i (), 1);
+ PortableServer::ServantBase_var owner_transfer (agent);
+
+ // ******************************************************
+
+ // ******************************************************
+ // register request interceptor
+
+
+ PortableInterceptor::ORBInitializer_ptr temp_initializer =
+ PortableInterceptor::ORBInitializer::_nil ();
+
+ ACE_NEW_RETURN (temp_initializer,
+ Client_ORBInitializer (agent),
+ -1); // No exceptions yet!
+ PortableInterceptor::ORBInitializer_var orb_initializer =
+ temp_initializer;
+
+ PortableInterceptor::register_orb_initializer (orb_initializer.in ());
+
+
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv);
+
+ int result =
+ parse_args (argc, argv);
+ if (result != 0)
+ return result;
+
+ // Make sure we can support multiple priorities that are required
+ // for this test.
+ check_supported_priorities (orb.in ());
+
+ // Thread Manager for managing task.
+ ACE_Thread_Manager thread_manager;
+
+ ACE_Barrier thread_barrier (2);
+
+ Agent_Thread agent_thread (orb.in (), agent, &thread_barrier);
+
+ // Create task.
+ Task task (thread_manager,
+ orb.in ());
+
+ // Task activation flags.
+ long flags =
+ THR_NEW_LWP |
+ THR_JOINABLE |
+ orb->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ if (agent_thread.activate (flags, 1, 0, 0) != 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot activate agent thread\n"), -1);
+ }
+
+ // Activate task.
+ result =
+ task.activate (flags);
+
+ ACE_ASSERT (result != -1);
+ ACE_UNUSED_ARG (result);
+
+ // Wait for task to exit.
+ result =
+ thread_manager.wait ();
+ ACE_ASSERT (result != -1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ return -1;
+ }
+
+ return 0;
+}