diff options
author | jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-05-26 16:28:40 +0000 |
---|---|---|
committer | jai <jai@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-05-26 16:28:40 +0000 |
commit | af140812b88cc366f5fa153357dca943ebebc377 (patch) | |
tree | a00e2ebf32944ab348f488645ef46bbae115a940 | |
parent | 0cbddee90ca2d09a0c1b730bf3a8395637fd370c (diff) | |
download | ATCD-af140812b88cc366f5fa153357dca943ebebc377.tar.gz |
added FLARe directory
4 files changed, 1689 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp new file mode 100644 index 00000000000..2c5efeec6cc --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Agent.cpp @@ -0,0 +1,229 @@ +// cvs-id : $Id$ + +#include "Agent.h" + +Agent_i::Agent_i (bool proactive) + : //failover_map_ (100), + //secondary_map_ (100), + //failure_map_ (100), + proactive_(proactive), + update_count_ (0) +{ +} + +Agent_i::~Agent_i (void) +{ +} + +void Agent_i::proactive(bool v) +{ + proactive_ = v; +} + +CORBA::Object_ptr +Agent_i::next_member (const char *ior_string + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, "NEXT MEMBER CALLED for ior_string = %s.\n",ior_string)); + //sleep (3); + //if (! proactive_) + // { +// CORBA::Object_var object = RM_var_->get_next(ior_string); + //return CORBA::Object::_duplicate(object.in()); +// } + + ACE_Guard <ACE_Thread_Mutex> guard (ior_map_mutex_); + AGENT_RANKED_IOR_LIST ranked_ior_list; + if (this->objectid_rankedior_map_.find(ACE_CString(ior_string), + ranked_ior_list) == 0) + { + CORBA::Object_var ior (ranked_ior_list.ior_list.front()); + ranked_ior_list.ior_list.pop_front(); + this->objectid_rankedior_map_.rebind(ACE_CString(ior_string),ranked_ior_list); + return CORBA::Object::_duplicate(ior.in()); + } + else + { + ACE_DEBUG((LM_ERROR,"No ior list for tag=%s!!!\n",ior_string)); + return 0; + } + +/* // this->RM_var_->next_member (ior_string); + CORBA::Object_var replica_ior; + CORBA::ULong failure_count = 0; + if (this->failure_map_.find (ACE_CString(ior_string), failure_count) == 0) + { + ACE_DEBUG ((LM_DEBUG, "FAILURE COUNT IS %d\n", failure_count)); + if (failure_count == 1) + { + if (this->failover_map_.find (ior_string, replica_ior) == 0) + { + this->failure_map_.rebind (ior_string, 2); + return CORBA::Object::_duplicate (replica_ior.in ()); + } + } + else if (failure_count == 2) + { + if (this->secondary_map_.find (ior_string, replica_ior) == 0) + { + return CORBA::Object::_duplicate (replica_ior.in ()); + } + } + } + else + { + for (FAILURE_MAP::iterator it = this->failure_map_.begin (); + it != this->failure_map_.end (); ++it) + { + ACE_CString object_id = (*it).ext_id_; + ACE_DEBUG((LM_DEBUG,"object_id in the failure_map_ is %s.\n",object_id.c_str())); + } + } + return 0; +*/ +} + +void Agent_i:: +update_rank_list (const RankList & rank_list + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ +/* if (update_count_ > 80) + { + ACE_DEBUG((LM_DEBUG,"not updated.\n")); + return; + } + update_count_++; +*/ + ACE_Guard <ACE_Thread_Mutex> guard (ior_map_mutex_); + objectid_rankedior_map_.close(); + objectid_rankedior_map_.open(); + + // ACE_DEBUG((LM_DEBUG,"Received rank_list length = %d.\n", rank_list.length())); + for (size_t i = 0;i < rank_list.length();++i) + { + AGENT_RANKED_IOR_LIST ranked_ior_list; + ranked_ior_list.now = rank_list[i].now; + for (size_t j = 0; j < rank_list[i].ior_list.length(); ++j) + { + ranked_ior_list.ior_list.push_back( + CORBA::Object::_duplicate(rank_list[i].ior_list[j])); + } + ACE_CString oid (rank_list[i].object_id); + objectid_rankedior_map_.bind(oid,ranked_ior_list); + //ACE_DEBUG((LM_DEBUG,"object_id=%s. ior_list_size=%d.\n", + // oid.c_str(), ranked_ior_list.ior_list.size())); + } +} + +void Agent_i:: +initialize (CORBA::Object_ptr rm_ior) +{ + this->RM_var_ = ReplicationManager::_narrow (rm_ior); + Agent_var temp = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + + ACE_DEBUG ((LM_DEBUG, "calling register agent\n")); + RankList *rank_list = this->RM_var_->register_agent (temp.in ()); + update_rank_list (*rank_list); + +/* + CORBA::String_var ior_string = + this->orb_->object_to_string (temp.in () ACE_ENV_ARG_PARAMETER); + + FILE *output_file= ACE_OS::fopen (ior_file, "w"); + if (output_file == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file <%s> for writing " + "IOR: %s", + ior_string.in ()), + 1); + } + + ACE_OS::fprintf (output_file, "%s", ior_string.in ()); + ACE_OS::fclose (output_file); +*/ + +} + +/* +void +Agent_i::update_reference (const char *ior_string + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + this->forward_str_ = CORBA::string_dup (ior_string); +} + +void +Agent_i::update_failover_list (const FailoverList &failover_list + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, "failure map updates\n")); + CORBA::ULong failover_list_length = failover_list.length (); + for (CORBA::ULong i = 0; i < failover_list_length; ++i) + { + const char *object_id = failover_list[i]; + ACE_DEBUG ((LM_DEBUG, "OBJECT ID is %s\n", object_id)); + this->failure_map_.rebind (object_id, 2); + } +} + +void +Agent_i::update_failover (const char * object_id, + CORBA::Object_ptr next_member + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, "calling update next member\n")); + this->failover_map_.rebind (object_id, + CORBA::Object::_duplicate (next_member)); +} + +void +Agent_i::update_secondary (const char * object_id, + CORBA::Object_ptr next_member + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, "calling update next members\n")); + this->secondary_map_.rebind (object_id, + CORBA::Object::_duplicate (next_member)); +} + +void +Agent_i::initialize_agent (const ReplicaList & replica_list, + const ReplicasList & replicas_list + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((::CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, "calling initialize agent\n")); + + CORBA::ULong replica_list_length = replica_list.length (); + for (CORBA::ULong i = 0; i < replica_list_length; ++i) + { + const char *object_id = replica_list[i].object_id; + CORBA::Object_var replica_ior = replica_list[i].next_member; + + if (this->failover_map_.bind (object_id, replica_ior) != 0) + ACE_DEBUG((LM_DEBUG,"failover_map_ did not bind %s.\n", object_id)); + + if (this->failure_map_.bind (object_id, 1) != 0) + ACE_DEBUG((LM_DEBUG,"failure_map_ did not bind %s.\n", object_id)); + + ACE_DEBUG((LM_DEBUG,"object_id added to failover map = %s:\n",object_id)); + } + + CORBA::ULong replicas_list_length = replicas_list.length (); + for (CORBA::ULong i = 0; i < replicas_list_length; ++i) + { + const char *object_id = replicas_list[i].object_id; + CORBA::Object_var replicas_ior = replicas_list[i].next_members; + this->secondary_map_.bind (object_id, replicas_ior); + ACE_DEBUG((LM_DEBUG,"object_id added to secondary map = %s:\n",object_id)); + } +} + +*/ diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp new file mode 100644 index 00000000000..592fc06f663 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_ORBInitializer.cpp @@ -0,0 +1,58 @@ +// -*- C++ -*- + +#include "Client_ORBInitializer.h" + +ACE_RCSID (Hello, + Client_ORBInitializer, + "$Id$") + +#if TAO_HAS_INTERCEPTORS == 1 + +#include "Client_Request_Interceptor.h" + +#include "tao/StringSeqC.h" +#include "tao/ORB_Constants.h" +#include "ace/OS_NS_string.h" + +Client_ORBInitializer::Client_ORBInitializer (Agent_i *agent) + : agent_ (agent) +{ +} + +Client_ORBInitializer::~Client_ORBInitializer (void) +{ +} + + +void +Client_ORBInitializer::pre_init ( + PortableInterceptor::ORBInitInfo_ptr) +{ +} + +void +Client_ORBInitializer::post_init ( + PortableInterceptor::ORBInitInfo_ptr info) +{ + CORBA::String_var orb_id = info->orb_id (); + + PortableInterceptor::ClientRequestInterceptor_ptr interceptor = + PortableInterceptor::ClientRequestInterceptor::_nil (); + + // Install the client request interceptor. + ACE_NEW_THROW_EX (interceptor, + Client_Request_Interceptor (orb_id.in (), agent_), + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO::VMCID, + ENOMEM), + CORBA::COMPLETED_NO)); + + PortableInterceptor::ClientRequestInterceptor_var + client_interceptor = interceptor; + + info->add_client_request_interceptor (client_interceptor.in ()); + +} + +#endif /* TAO_HAS_INTERCEPTORS == 1 */ diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp new file mode 100644 index 00000000000..02b773ae896 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/Client_Request_Interceptor.cpp @@ -0,0 +1,74 @@ +// -*- C++ -*- + +#include "Client_Request_Interceptor.h" +#include "ace/Log_Msg.h" +#include "Agent.h" + +ACE_RCSID (Hello, + Client_Request_Interceptor, + "$Id$") + +Client_Request_Interceptor::Client_Request_Interceptor ( + const char *orb_id, Agent_i *agent) + : orb_id_ (CORBA::string_dup (orb_id)), + orb_ (), + request_count_ (0), + agent_ (agent) +{ +} + +char * +Client_Request_Interceptor::name () +{ + return CORBA::string_dup ("Client_Request_Interceptor"); +} + +void +Client_Request_Interceptor::destroy (void) +{ +} + +void +Client_Request_Interceptor::send_request ( + PortableInterceptor::ClientRequestInfo_ptr ri) +{ +} + +void +Client_Request_Interceptor::send_poll ( + PortableInterceptor::ClientRequestInfo_ptr) +{ +} + +void +Client_Request_Interceptor::receive_reply ( + PortableInterceptor::ClientRequestInfo_ptr) +{ +} + +void +Client_Request_Interceptor::receive_exception ( + PortableInterceptor::ClientRequestInfo_ptr ri) +{ + ACE_DEBUG ((LM_DEBUG, "Catching exception\n")); + const CORBA::ULong tagID = 9654; + char *tag = 0; + try + { + IOP::TaggedComponent_var mytag = ri->get_effective_component (tagID); + tag = reinterpret_cast <char *> ( mytag->component_data.get_buffer ()); + ACE_CString new_string = CORBA::string_dup (tag); + CORBA::Object_var forward = this->agent_->next_member (tag); + ACE_THROW (PortableInterceptor::ForwardRequest (forward.in ())); + } + catch (CORBA::BAD_PARAM&) + { + ACE_DEBUG ((LM_DEBUG, "Tagged Component not found\n")); + } +} + +void +Client_Request_Interceptor::receive_other ( + PortableInterceptor::ClientRequestInfo_ptr) +{ +} diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp new file mode 100644 index 00000000000..e9e1ac92829 --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/client.cpp @@ -0,0 +1,1328 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" +#include "ace/Throughput_Stats.h" +#include "ace/Sample_History.h" +#include "ace/Read_Buffer.h" +#include "ace/Array_Base.h" +#include "ace/Task.h" +#include "ace/OS_NS_unistd.h" +#include "tao/ORB_Core.h" +#include "tao/debug.h" +#include "tao/RTCORBA/RTCORBA.h" +#include "tao/RTCORBA/Priority_Mapping_Manager.h" +#include "LWFTC.h" +#include "tests/RTCORBA/common_args.cpp" +#include "tests/RTCORBA/check_supported_priorities.cpp" +#include "ace/Event.h" +#include "Client_ORBInitializer.h" +#include "tao/ORBInitializer_Registry.h" +#include "Agent.h" +#include "ace/Barrier.h" +#include <vector> +#include <fstream> +#include <algorithm> + +ACE_RCSID(Thread_Pool, client, "$Id$") + +enum Priority_Setting +{ + AT_THREAD_CREATION = 0, + AFTER_THREAD_CREATION = 1 +}; + +static const char *ior = "file://s1.ior"; +static const char *rates_file = "rates"; +static const char *invocation_priorities_file = "empty-file"; +static int shutdown_server = 0; +static int do_dump_history = 0; +static const char *history_file_name; +static ACE_UINT32 gsf = 0; +static CORBA::ULong continuous_workers = 0; +static int done = 0; +static CORBA::ULong time_for_test = 10; +static CORBA::ULong work = 10; +static CORBA::ULong max_throughput_timeout = 5; +static CORBA::ULong continuous_worker_priority = 0; +static int set_priority = 1; +static Priority_Setting priority_setting = AFTER_THREAD_CREATION; +static int individual_continuous_worker_stats = 0; +static int print_missed_invocations = 0; +static ACE_hrtime_t test_start; +static CORBA::ULong prime_number = 9619; +static int count_missed_end_deadlines = 0; +const char *agent_ior_file = "agent.ior"; +const char *rm_ior = "file://rm.ior"; +std::vector <int> sample_vector (3100); +size_t count = 0; +const char *dummy_file_name = "temp1"; + +class Agent_Thread : public ACE_Task_Base +{ +public: + Agent_Thread (CORBA::ORB_ptr orb, Agent_i *agent, ACE_Barrier *barrier); + + virtual int svc (void); + +private: + CORBA::ORB_ptr orb_; + + Agent_i *agent_; + + ACE_Barrier *synchronizer_; +}; + +Agent_Thread::Agent_Thread (CORBA::ORB_ptr orb, Agent_i *agent, + ACE_Barrier *thread_barrier) + : orb_ (orb), + agent_ (agent), + synchronizer_ (thread_barrier) +{ +} + +int +Agent_Thread::svc (void) +{ + try + { + CORBA::Object_var poa_object = + this->orb_->resolve_initial_references ( + "RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + 1); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; +/* + Agent_var temp = + this->agent_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + this->orb_->object_to_string (temp.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; +*/ + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; +/* + FILE *output_file= ACE_OS::fopen (agent_ior_file, "w"); + if (output_file == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file <%s> for writing " + "IOR: %s", + ior.in ()), + 1); + } + + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); +*/ + /* + CORBA::Object_var naming_obj = + this->orb_->resolve_initial_references ("NameService" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup ("ReplicationManager"); + + CORBA::Object_var rm_obj = + naming_context->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + */ + + CORBA::Object_var tmp = this->orb_->string_to_object(rm_ior); + ReplicationManager_var rm = + ReplicationManager::_narrow (tmp.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + agent_->initialize(rm.in()); + + //ACE_DEBUG ((LM_DEBUG, "calling register agent\n")); + + //rm->register_agent (temp.in ()); + + this->synchronizer_->wait (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return 1; + } + return 0; +} + + +struct Synchronizers +{ + Synchronizers (void) + : worker_lock_ (), + workers_ (1), + workers_ready_ (0), + number_of_workers_ (0) + { + } + + ACE_SYNCH_MUTEX worker_lock_; + ACE_Event workers_; + CORBA::ULong workers_ready_; + CORBA::ULong number_of_workers_; +}; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, + "c:e:g:hi:k:m:p:q:r:t:u:v:w:x:y:z:a:" //client options + "b:f:hl:n:o:s:" // server options + ); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'c': + continuous_workers = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'e': + count_missed_end_deadlines = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'g': + do_dump_history = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'i': + individual_continuous_worker_stats = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'k': + ior = + get_opts.opt_arg (); + break; + + case 'm': + print_missed_invocations = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'p': + invocation_priorities_file = + get_opts.opt_arg (); + break; + + case 'q': + prime_number = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'r': + rates_file = + get_opts.opt_arg (); + break; + + case 't': + time_for_test = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'u': + continuous_worker_priority = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'v': + priority_setting = + Priority_Setting (ACE_OS::atoi (get_opts.opt_arg ())); + break; + + case 'w': + work = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'x': + shutdown_server = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'y': + set_priority = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'z': + max_throughput_timeout = + ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'a': + history_file_name = get_opts.opt_arg(); + break; + + case 'b': + case 'f': + case 'l': + case 'n': + case 'o': + case 's': + // server options: ignored. + break; + + case 'h': + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s\n" + "\t-c <number of continuous workers> (defaults to %d)\n" + "\t-e <count missed end deadlines> (defaults to %d)\n" + "\t-g <show history> (defaults to %d)\n" + "\t-h <help: shows options menu>\n" + "\t-i <print stats of individual continuous workers> (defaults to %d)\n" + "\t-k <ior> (defaults to %s)\n" + "\t-m <print missed invocations for paced workers> (defaults to %d)\n" + "\t-p <invocation priorities file> (defaults to %s)\n" + "\t-q <prime number> (defaults to %d)\n" + "\t-r <rates file> (defaults to %s)\n" + "\t-t <time for test> (defaults to %d)\n" + "\t-u <continuous worker priority> (defaults to %d)\n" + "\t-v <priority setting: AT_THREAD_CREATION = 0, AFTER_THREAD_CREATION = 1> (defaults to %s)\n" + "\t-w <work> (defaults to %d)\n" + "\t-x <shutdown server> (defaults to %d)\n" + "\t-y <set invocation priorities> (defaults to %d)\n" + "\t-z <timeout for max throughput measurement> (defaults to %d)\n" + "\n", + argv [0], + continuous_workers, + count_missed_end_deadlines, + do_dump_history, + individual_continuous_worker_stats, + ior, + print_missed_invocations, + invocation_priorities_file, + prime_number, + rates_file, + time_for_test, + continuous_worker_priority, + priority_setting == 0 ? "AT_THREAD_CREATION" : "AFTER_THREAD_CREATION", + work, + shutdown_server, + set_priority, + max_throughput_timeout), + -1); + } + + return 0; +} + +double +to_seconds (ACE_UINT64 hrtime, + ACE_UINT32 sf) +{ + double seconds = +#if defined ACE_LACKS_LONGLONG_T + hrtime / sf; +#else /* ! ACE_LACKS_LONGLONG_T */ + static_cast<double> (ACE_UINT64_DBLCAST_ADAPTER (hrtime / sf)); +#endif /* ! ACE_LACKS_LONGLONG_T */ + seconds /= ACE_HR_SCALE_CONVERSION; + + return seconds; +} + +ACE_UINT64 +to_hrtime (double seconds, + ACE_UINT32 sf) +{ + return ACE_UINT64 (seconds * sf * ACE_HR_SCALE_CONVERSION); +} + +int +start_synchronization (test_ptr test, + Synchronizers &synchronizers) +{ + CORBA::ULong synchronization_iterations = 1; + try + { + for (CORBA::ULong i = 0; + i < synchronization_iterations; + ++i) + { + test->method (0, 0, work, + prime_number, 0); + } + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + synchronizers.worker_lock_, + -1); + + if (synchronizers.workers_ready_ == 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "\n")); + } + + ++synchronizers.workers_ready_; + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%d worker ready\n", + synchronizers.workers_ready_)); + + if (synchronizers.workers_ready_ == + synchronizers.number_of_workers_) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "\n")); + + test_start = + ACE_OS::gethrtime (); + + synchronizers.workers_.signal (); + + return 0; + } + } + + synchronizers.workers_.wait (); + + return 0; +} + +int +end_synchronization (Synchronizers &synchronizers) +{ + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + synchronizers.worker_lock_, + -1); + + if (synchronizers.workers_ready_ == + synchronizers.number_of_workers_) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "\n")); + + synchronizers.workers_.reset (); + } + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%d worker completed\n", + synchronizers.workers_ready_)); + + --synchronizers.workers_ready_; + + if (synchronizers.workers_ready_ == 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "\n")); + + synchronizers.workers_.signal (); + + return 0; + } + } + + synchronizers.workers_.wait (); + + return 0; +} + +int +max_throughput (test_ptr test, + RTCORBA::Current_ptr current, + RTCORBA::PriorityMapping &priority_mapping, + CORBA::ULong &max_rate) +{ + CORBA::ULong calls_made = 0; + CORBA::Short CORBA_priority = 0; + CORBA::Short native_priority = 0; + + try + { + CORBA_priority = + current->the_priority (); + + CORBA::Boolean result = + priority_mapping.to_native (CORBA_priority, + native_priority); + if (!result) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in converting CORBA priority %d to native priority\n", + CORBA_priority), + -1); + + ACE_hrtime_t start = + ACE_OS::gethrtime (); + + ACE_hrtime_t end = + start + + to_hrtime (max_throughput_timeout, gsf); + + for (;;) + { + ACE_hrtime_t now = + ACE_OS::gethrtime (); + + if (now > end) + break; + + test->method (0, 0, work, + prime_number, 0); + + ++calls_made; + } + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + max_rate = + calls_made / max_throughput_timeout; + + ACE_DEBUG ((LM_DEBUG, + "\nPriority = %d/%d; Max rate calculations => %d calls in %d seconds; Max rate = %.2f\n", + CORBA_priority, + native_priority, + calls_made, + max_throughput_timeout, + calls_made / (double) max_throughput_timeout)); + + return 0; +} + +class Paced_Worker : + public ACE_Task_Base +{ +public: + Paced_Worker (ACE_Thread_Manager &thread_manager, + test_ptr test, + CORBA::Short rate, + CORBA::ULong iterations, + CORBA::Short priority, + RTCORBA::Current_ptr current, + RTCORBA::PriorityMapping &priority_mapping, + Synchronizers &synchronizers); + + int svc (void); + ACE_hrtime_t deadline_for_current_call (CORBA::ULong i); + void reset_priority (void); + void print_stats (ACE_hrtime_t test_end); + int setup (void); + void missed_start_deadline (CORBA::ULong invocation); + void missed_end_deadline (CORBA::ULong invocation); + + test_var test_; + int rate_; + ACE_Sample_History history_; + CORBA::Short priority_; + RTCORBA::Current_var current_; + RTCORBA::PriorityMapping &priority_mapping_; + Synchronizers &synchronizers_; + CORBA::Short CORBA_priority_; + CORBA::Short native_priority_; + ACE_hrtime_t interval_between_calls_; + CORBA::ULong missed_start_deadlines_; + CORBA::ULong missed_end_deadlines_; + + typedef ACE_Array_Base<CORBA::ULong> Missed_Invocations; + Missed_Invocations missed_start_invocations_; + Missed_Invocations missed_end_invocations_; +}; + +Paced_Worker::Paced_Worker (ACE_Thread_Manager &thread_manager, + test_ptr test, + CORBA::Short rate, + CORBA::ULong iterations, + CORBA::Short priority, + RTCORBA::Current_ptr current, + RTCORBA::PriorityMapping &priority_mapping, + Synchronizers &synchronizers) + : ACE_Task_Base (&thread_manager), + test_ (test::_duplicate (test)), + rate_ (rate), + history_ (iterations), + priority_ (priority), + current_ (RTCORBA::Current::_duplicate (current)), + priority_mapping_ (priority_mapping), + synchronizers_ (synchronizers), + CORBA_priority_ (0), + native_priority_ (0), + interval_between_calls_ (), + missed_start_deadlines_ (0), + missed_end_deadlines_ (0), + missed_start_invocations_ (iterations), + missed_end_invocations_ (iterations) +{ + this->interval_between_calls_ = + to_hrtime (1 / double (this->rate_), gsf); +} + +void +Paced_Worker::reset_priority (void) +{ + if (set_priority) + { + this->current_->the_priority (this->priority_); + } + else + { + this->current_->the_priority (continuous_worker_priority); + } +} + +ACE_hrtime_t +Paced_Worker::deadline_for_current_call (CORBA::ULong i) +{ + ACE_hrtime_t deadline_for_current_call = + this->interval_between_calls_; + + deadline_for_current_call *= i; + + deadline_for_current_call += test_start; + + return deadline_for_current_call; +} + +void +Paced_Worker::print_stats (ACE_hrtime_t test_end) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, + mon, + this->synchronizers_.worker_lock_); + + CORBA::ULong missed_total_deadlines = + this->missed_start_deadlines_ + this->missed_end_deadlines_; + + CORBA::ULong made_total_deadlines = + this->history_.max_samples () - missed_total_deadlines; + + ACE_DEBUG ((LM_DEBUG, + "\n************ Statistics for thread %t ************\n\n")); + + ACE_DEBUG ((LM_DEBUG, + "Priority = %d/%d; Rate = %d/sec; Iterations = %d; ", + this->CORBA_priority_, + this->native_priority_, + this->rate_, + this->history_.max_samples ())); + + if (count_missed_end_deadlines) + ACE_DEBUG ((LM_DEBUG, + "Deadlines made/missed[start,end]/%% = %d/%d[%d,%d]/%.2f%%; Effective Rate = %.2f\n", + made_total_deadlines, + missed_total_deadlines, + this->missed_start_deadlines_, + this->missed_end_deadlines_, + made_total_deadlines * 100 / (double) this->history_.max_samples (), + made_total_deadlines / to_seconds (test_end - test_start, gsf))); + else + ACE_DEBUG ((LM_DEBUG, + "Deadlines made/missed/%% = %d/%d/%.2f%%; Effective Rate = %.2f\n", + made_total_deadlines, + missed_total_deadlines, + made_total_deadlines * 100 / (double) this->history_.max_samples (), + made_total_deadlines / to_seconds (test_end - test_start, gsf))); + + + if (do_dump_history) + { + //this->history_.dump_samples (history_file_name, gsf); + std::ofstream output_file (history_file_name); + std::copy (sample_vector.begin(), sample_vector.end(), + std::ostream_iterator <int> (output_file, "\n")); + } + + ACE_Basic_Stats stats; + this->history_.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + test_end - test_start, + stats.samples_count ()); + + if (print_missed_invocations) + { + ACE_DEBUG ((LM_DEBUG, "\nMissed start invocations are: ")); + + for (CORBA::ULong j = 0; + j != this->missed_start_deadlines_; + ++j) + { + ACE_DEBUG ((LM_DEBUG, + "%d ", + this->missed_start_invocations_[j])); + } + + ACE_DEBUG ((LM_DEBUG, "\n")); + + if (count_missed_end_deadlines) + { + ACE_DEBUG ((LM_DEBUG, "\nMissed end invocations are: ")); + + for (CORBA::ULong j = 0; + j != this->missed_end_deadlines_; + ++j) + { + ACE_DEBUG ((LM_DEBUG, + "%d ", + this->missed_end_invocations_[j])); + } + + ACE_DEBUG ((LM_DEBUG, "\n")); + } + } +} + +int +Paced_Worker::setup (void) +{ + if (priority_setting == AFTER_THREAD_CREATION) + { + this->reset_priority (); + } + + this->CORBA_priority_ = + this->current_->the_priority (); + + CORBA::Boolean result = + this->priority_mapping_.to_native (this->CORBA_priority_, + this->native_priority_); + if (!result) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in converting CORBA priority %d to native priority\n", + this->CORBA_priority_), + -1); + + return + start_synchronization (this->test_.in (), + this->synchronizers_); +} + +void +Paced_Worker::missed_start_deadline (CORBA::ULong invocation) +{ + this->missed_start_invocations_[this->missed_start_deadlines_++] = + invocation; +} + +void +Paced_Worker::missed_end_deadline (CORBA::ULong invocation) +{ + if (count_missed_end_deadlines) + this->missed_end_invocations_[this->missed_end_deadlines_++] = + invocation; +} + +int +Paced_Worker::svc (void) +{ + try + { + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + //size_t terminate_count = this->history_.max_samples () / 2; + size_t terminate_count = 500; + int result = + this->setup (); + + if (result != 0) + return result; + + CORBA::ULong testing_start = 0; + CORBA::ULong testing_end = 0; + CORBA::ULong end_iteration = this->history_.max_samples () - 1; + + for (CORBA::ULong i = 0; + i != this->history_.max_samples (); + ++i) + { + ACE_hrtime_t deadline_for_current_call = + this->deadline_for_current_call (i); + + ACE_hrtime_t time_before_call = + ACE_OS::gethrtime (); + + if (time_before_call > deadline_for_current_call) + { + this->missed_start_deadline (i + 1); + continue; + } + + if (i == 0) + { + testing_start = 1; + } + else if (i == end_iteration) + { + testing_end = 1; + } + + if (i == terminate_count) + { + sample_vector[count++] = 0; + this->test_->method (testing_start, testing_end, work, + prime_number, 1); + } + else + { + this->test_->method (testing_start, testing_end, work, + prime_number, 0); + } + + ACE_hrtime_t time_after_call = + ACE_OS::gethrtime (); + this->history_.sample (time_after_call - time_before_call); + sample_vector[count++] = ((time_after_call - time_before_call) / gsf); + + if (time_after_call > deadline_for_current_call) + { + this->missed_end_deadline (i + 1); + continue; + } + + ACE_hrtime_t sleep_time = + deadline_for_current_call - time_after_call; + + ACE_OS::sleep (ACE_Time_Value (0, + long (to_seconds (sleep_time, gsf) * + ACE_ONE_SECOND_IN_USECS))); + } + + ACE_hrtime_t test_end = ACE_OS::gethrtime (); + + done = 1; + + end_synchronization (this->synchronizers_); + + this->print_stats (test_end); + + ACE_DEBUG ((LM_DEBUG, "******************************\n")); + + this->test_->dump (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} + +class Continuous_Worker : + public ACE_Task_Base +{ +public: + Continuous_Worker (ACE_Thread_Manager &thread_manager, + test_ptr test, + CORBA::ULong iterations, + RTCORBA::Current_ptr current, + RTCORBA::PriorityMapping &priority_mapping, + Synchronizers &synchronizers); + + int svc (void); + void print_stats (ACE_Sample_History &history, + ACE_hrtime_t test_end); + int setup (void); + void print_collective_stats (void); + + test_var test_; + CORBA::ULong iterations_; + RTCORBA::Current_var current_; + RTCORBA::PriorityMapping &priority_mapping_; + Synchronizers &synchronizers_; + CORBA::Short CORBA_priority_; + CORBA::Short native_priority_; + ACE_Basic_Stats collective_stats_; + ACE_hrtime_t time_for_test_; +}; + +Continuous_Worker::Continuous_Worker (ACE_Thread_Manager &thread_manager, + test_ptr test, + CORBA::ULong iterations, + RTCORBA::Current_ptr current, + RTCORBA::PriorityMapping &priority_mapping, + Synchronizers &synchronizers) + : ACE_Task_Base (&thread_manager), + test_ (test::_duplicate (test)), + iterations_ (iterations), + current_ (RTCORBA::Current::_duplicate (current)), + priority_mapping_ (priority_mapping), + synchronizers_ (synchronizers), + CORBA_priority_ (0), + native_priority_ (0), + collective_stats_ (), + time_for_test_ (0) +{ +} + +void +Continuous_Worker::print_stats (ACE_Sample_History &history, + ACE_hrtime_t test_end) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, + mon, + this->synchronizers_.worker_lock_); + + if (individual_continuous_worker_stats) + { + ACE_DEBUG ((LM_DEBUG, + "\n************ Statistics for thread %t ************\n\n")); + + ACE_DEBUG ((LM_DEBUG, + "Iterations = %d\n", + history.sample_count ())); + + if (do_dump_history) + { + history.dump_samples (history_file_name, gsf); + } + + ACE_Basic_Stats stats; + history.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + test_end - test_start, + stats.samples_count ()); + } + + history.collect_basic_stats (this->collective_stats_); + ACE_hrtime_t elapsed_time_for_current_thread = + test_end - test_start; + if (elapsed_time_for_current_thread > this->time_for_test_) + this->time_for_test_ = elapsed_time_for_current_thread; +} + +void +Continuous_Worker::print_collective_stats (void) +{ + if (continuous_workers > 0) + { + ACE_DEBUG ((LM_DEBUG, + "\n************ Statistics for continuous workers ************\n\n")); + + ACE_DEBUG ((LM_DEBUG, + "Priority = %d/%d; Collective iterations = %d; Workers = %d; Average = %d\n", + this->CORBA_priority_, + this->native_priority_, + this->collective_stats_.samples_count (), + continuous_workers, + this->collective_stats_.samples_count () / + continuous_workers)); + + this->collective_stats_.dump_results ("Collective", gsf); + + ACE_Throughput_Stats::dump_throughput ("Individual", gsf, + this->time_for_test_, + this->collective_stats_.samples_count () / + continuous_workers); + + ACE_Throughput_Stats::dump_throughput ("Collective", gsf, + this->time_for_test_, + this->collective_stats_.samples_count ()); + } +} + +int +Continuous_Worker::setup (void) +{ + if (priority_setting == AFTER_THREAD_CREATION) + { + this->current_->the_priority (continuous_worker_priority); + } + + this->CORBA_priority_ = + this->current_->the_priority (); + + CORBA::Boolean result = + this->priority_mapping_.to_native (this->CORBA_priority_, + this->native_priority_); + if (!result) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in converting CORBA priority %d to native priority\n", + this->CORBA_priority_), + -1); + + return + start_synchronization (this->test_.in (), + this->synchronizers_); +} + +int +Continuous_Worker::svc (void) +{ + try + { + ACE_Sample_History history (this->iterations_); + + int result = + this->setup (); + + if (result != 0) + return result; + + for (CORBA::ULong i = 0; + i != history.max_samples () && !done; + ++i) + { + ACE_hrtime_t start = ACE_OS::gethrtime (); + + this->test_->method (0, 0, work, + prime_number, 0); + + ACE_hrtime_t end = ACE_OS::gethrtime (); + history.sample (end - start); + } + + ACE_hrtime_t test_end = ACE_OS::gethrtime (); + + end_synchronization (this->synchronizers_); + + this->print_stats (history, + test_end); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} + +class Task : public ACE_Task_Base +{ +public: + + Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb); + + int svc (void); + + CORBA::ORB_var orb_; + +}; + +Task::Task (ACE_Thread_Manager &thread_manager, + CORBA::ORB_ptr orb) + : ACE_Task_Base (&thread_manager), + orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +int +Task::svc (void) +{ + Synchronizers synchronizers; + + gsf = ACE_High_Res_Timer::global_scale_factor (); + + try + { + CORBA::Object_var object = + this->orb_->string_to_object (ior); + + test_var test = + test::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("RTCurrent"); + + RTCORBA::Current_var current = + RTCORBA::Current::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("PriorityMappingManager"); + + RTCORBA::PriorityMappingManager_var mapping_manager = + RTCORBA::PriorityMappingManager::_narrow (object.in ()); + + RTCORBA::PriorityMapping &priority_mapping = + *mapping_manager->mapping (); + + ULong_Array rates; + int result = + get_values ("client", + rates_file, + "rates", + rates, + 1); + if (result != 0) + return result; + + ULong_Array invocation_priorities; + result = + get_values ("client", + invocation_priorities_file, + "invocation priorities", + invocation_priorities, + 1); + if (result != 0) + return result; + + if (invocation_priorities.size () != 0 && + invocation_priorities.size () != rates.size ()) + ACE_ERROR_RETURN ((LM_ERROR, + "Number of invocation priorities (%d) != Number of rates (%d)\n", + invocation_priorities.size (), + rates.size ()), + -1); + + synchronizers.number_of_workers_ = + rates.size ();// + continuous_workers; + + CORBA::ULong max_rate = 0; + result = + max_throughput (test.in (), + current.in (), + priority_mapping, + max_rate); + if (result != 0) + return result; + + CORBA::Short priority_range = + RTCORBA::maxPriority - RTCORBA::minPriority; + + ACE_Thread_Manager paced_workers_manager; + + CORBA::ULong i = 0; + Paced_Worker **paced_workers = + new Paced_Worker *[rates.size ()]; + + for (i = 0; + i < rates.size (); + ++i) + { + CORBA::Short priority = 0; + + if (invocation_priorities.size () == 0) + priority = + CORBA::Short ((priority_range / + double (rates.size ())) * + (i + 1)); + else + priority = + invocation_priorities[i]; + + paced_workers[i] = + new Paced_Worker (paced_workers_manager, + test.in (), + rates[i], + time_for_test * rates[i], + priority, + current.in (), + priority_mapping, + synchronizers); + } + + CORBA::Short CORBA_priority; + CORBA::Short native_priority; + CORBA::Boolean convert_result; + int force_active = 0; + + long flags = + THR_NEW_LWP | + THR_JOINABLE | + this->orb_->orb_core ()->orb_params ()->thread_creation_flags (); + + for (i = 0; + i < rates.size (); + ++i) + { + if (priority_setting == AT_THREAD_CREATION) + { + if (set_priority) + { + CORBA_priority = + paced_workers[i]->priority_; + + convert_result = + priority_mapping.to_native (CORBA_priority, + native_priority); + if (!convert_result) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in converting CORBA priority %d to native priority\n", + CORBA_priority), + -1); + } + + result = + paced_workers[i]->activate (flags, + 1, + force_active, + native_priority); + if (result != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Paced_Worker::activate failed\n"), + result); + } + else + { + + result = + paced_workers[i]->activate (flags); + if (result != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Paced_Worker::activate failed\n"), + result); + } + } + + if (rates.size () != 0) + { + paced_workers_manager.wait (); + } + + for (i = 0; + i < rates.size (); + ++i) + { + delete paced_workers[i]; + } + delete[] paced_workers; + + if (shutdown_server) + { + test->shutdown (); + } + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + try + { + + Agent_i *agent; + ACE_NEW_RETURN (agent, + Agent_i (), 1); + PortableServer::ServantBase_var owner_transfer (agent); + + // ****************************************************** + + // ****************************************************** + // register request interceptor + + + PortableInterceptor::ORBInitializer_ptr temp_initializer = + PortableInterceptor::ORBInitializer::_nil (); + + ACE_NEW_RETURN (temp_initializer, + Client_ORBInitializer (agent), + -1); // No exceptions yet! + PortableInterceptor::ORBInitializer_var orb_initializer = + temp_initializer; + + PortableInterceptor::register_orb_initializer (orb_initializer.in ()); + + + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv); + + int result = + parse_args (argc, argv); + if (result != 0) + return result; + + // Make sure we can support multiple priorities that are required + // for this test. + check_supported_priorities (orb.in ()); + + // Thread Manager for managing task. + ACE_Thread_Manager thread_manager; + + ACE_Barrier thread_barrier (2); + + Agent_Thread agent_thread (orb.in (), agent, &thread_barrier); + + // Create task. + Task task (thread_manager, + orb.in ()); + + // Task activation flags. + long flags = + THR_NEW_LWP | + THR_JOINABLE | + orb->orb_core ()->orb_params ()->thread_creation_flags (); + + if (agent_thread.activate (flags, 1, 0, 0) != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot activate agent thread\n"), -1); + } + + // Activate task. + result = + task.activate (flags); + + ACE_ASSERT (result != -1); + ACE_UNUSED_ARG (result); + + // Wait for task to exit. + result = + thread_manager.wait (); + ACE_ASSERT (result != -1); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return -1; + } + + return 0; +} |