diff options
author | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-03-04 23:15:20 +0000 |
---|---|---|
committer | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-03-04 23:15:20 +0000 |
commit | bf6f160aca86a102694a47dc6f490b90158e4fc8 (patch) | |
tree | 3675bf549dc0cf7649bc81f934a25376f72bfa98 /TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp | |
parent | 55b9df75cdae298ca921cff58503782fd67e075b (diff) | |
download | ATCD-bf6f160aca86a102694a47dc6f490b90158e4fc8.tar.gz |
This commit was manufactured by cvs2svn to create branch
'xsc_handler_1'.
Diffstat (limited to 'TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp')
-rw-r--r-- | TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp | 716 |
1 files changed, 716 insertions, 0 deletions
diff --git a/TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp b/TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp new file mode 100644 index 00000000000..95e8800fa83 --- /dev/null +++ b/TAO/CIAO/performance-tests/Protocols/Sender/Sender_exec.cpp @@ -0,0 +1,716 @@ +// $Id$ + +#include "tao/ORB_Constants.h" +#include "tao/debug.h" +#include "tao/RTCORBA/RTCORBA.h" +#include "tao/RTCORBA/Network_Priority_Mapping_Manager.h" +#include "tao/RTCORBA/Network_Priority_Mapping.h" +#include "tao/RTCORBA/RT_Policy_i.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_time.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" +#include "ace/Array.h" +#include "ace/Sample_History.h" +#include "Custom_Network_Priority_Mapping.h" +#include "Custom_Network_Priority_Mapping.cpp" +#include "Sender_exec.h" + +static ACE_UINT32 gsf = 0; + +double +to_seconds (ACE_UINT64 hrtime, + ACE_UINT32 sf) +{ + double seconds = +#if defined ACE_LACKS_LONGLONG_T + hrtime / sf; +#else /* ! ACE_LACKS_LONGLONG_T */ + static_cast<double> (ACE_UINT64_DBLCAST_ADAPTER (hrtime / sf)); +#endif /* ! ACE_LACKS_LONGLONG_T */ + seconds /= ACE_HR_SCALE_CONVERSION; + + return seconds; +} + +ACE_UINT64 +to_hrtime (double seconds, + ACE_UINT32 sf) +{ + return ACE_UINT64 (seconds * sf * ACE_HR_SCALE_CONVERSION); +} + +class Worker +{ +public: + Worker (CORBA::ORB_ptr orb, + RTCORBA::RTORB_ptr rtorb, + CORBA::PolicyManager_ptr policy_manager, + Protocols::test_ptr test, + ::CORBA::ULong iterations, + ::CORBA::ULong invocation_rate, + ::CORBA::Boolean count_missed_end_deadlines, + ::CORBA::Boolean do_dump_history, + ::CORBA::Boolean print_missed_invocations, + ::CORBA::ULong message_size, + ::CORBA::ULong test_protocol_tag, + ::CORBA::ULong number_of_connection_attempts, + ::CORBA::Boolean enable_diffserv_code_points, + ::Protocols::Sender_Controller::Test_Type test_type); + + void run (void); + + void print_stats (void); + + void setup (void); + +private: + + ACE_hrtime_t deadline_for_current_call (CORBA::ULong i); + void missed_start_deadline (CORBA::ULong invocation); + void missed_end_deadline (CORBA::ULong invocation); + + RTCORBA::RTORB_var rtorb_; + CORBA::PolicyManager_var policy_manager_; + Protocols::test_var test_; + ACE_Sample_History history_; + ACE_hrtime_t interval_between_calls_; + ACE_hrtime_t test_start_; + ACE_hrtime_t test_end_; + CORBA::ULong missed_start_deadlines_; + CORBA::ULong missed_end_deadlines_; + + typedef ACE_Array_Base<CORBA::ULong> Missed_Invocations; + Missed_Invocations missed_start_invocations_; + Missed_Invocations missed_end_invocations_; + + CORBA::PolicyList base_protocol_policy_; + CORBA::PolicyList test_protocol_policy_; + + CORBA::Long session_id_; + + ::CORBA::ULong iterations_; + ::CORBA::ULong invocation_rate_; + ::CORBA::Boolean count_missed_end_deadlines_; + ::CORBA::Boolean do_dump_history_; + ::CORBA::Boolean print_missed_invocations_; + ::CORBA::ULong message_size_; + ::CORBA::ULong test_protocol_tag_; + ::CORBA::ULong number_of_connection_attempts_; + ::CORBA::Boolean enable_diffserv_code_points_; + ::Protocols::Sender_Controller::Test_Type test_type_; +}; + +Worker::Worker (CORBA::ORB_ptr orb, + RTCORBA::RTORB_ptr rtorb, + CORBA::PolicyManager_ptr policy_manager, + Protocols::test_ptr test, + ::CORBA::ULong iterations, + ::CORBA::ULong invocation_rate, + ::CORBA::Boolean count_missed_end_deadlines, + ::CORBA::Boolean do_dump_history, + ::CORBA::Boolean print_missed_invocations, + ::CORBA::ULong message_size, + ::CORBA::ULong test_protocol_tag, + ::CORBA::ULong number_of_connection_attempts, + ::CORBA::Boolean enable_diffserv_code_points, + ::Protocols::Sender_Controller::Test_Type test_type) + : rtorb_ (RTCORBA::RTORB::_duplicate (rtorb)), + policy_manager_ (CORBA::PolicyManager::_duplicate (policy_manager)), + test_ (Protocols::test::_duplicate (test)), + history_ (iterations), + interval_between_calls_ (), + missed_start_deadlines_ (0), + missed_end_deadlines_ (0), + missed_start_invocations_ (iterations), + missed_end_invocations_ (iterations), + iterations_ (iterations), + invocation_rate_ (invocation_rate), + count_missed_end_deadlines_ (count_missed_end_deadlines), + do_dump_history_ (do_dump_history), + print_missed_invocations_ (print_missed_invocations), + message_size_ (message_size), + test_protocol_tag_ (test_protocol_tag), + number_of_connection_attempts_ (number_of_connection_attempts), + enable_diffserv_code_points_ (enable_diffserv_code_points), + test_type_ (test_type) +{ + // Each sender will have a random session id. This helps in + // identifying late packets arriving at the server. + ACE_OS::srand ((unsigned) ACE_OS::time (NULL)); + this->session_id_ = ACE_OS::rand (); + + // Interval is inverse of rate. + this->interval_between_calls_ = + to_hrtime (1 / double (this->invocation_rate_), gsf); + + // Base protocol is used for setting up and tearing down the test. + this->base_protocol_policy_.length (1); + + // Test protocol is the one being tested. + this->test_protocol_policy_.length (1); + + RTCORBA::ProtocolProperties_var base_transport_protocol_properties = + TAO_Protocol_Properties_Factory::create_transport_protocol_property (IOP::TAG_INTERNET_IOP, + orb->orb_core ()); + + RTCORBA::TCPProtocolProperties_var tcp_base_transport_protocol_properties = + RTCORBA::TCPProtocolProperties::_narrow (base_transport_protocol_properties.in ()); + + tcp_base_transport_protocol_properties->enable_network_priority (this->enable_diffserv_code_points_); + + RTCORBA::ProtocolList protocols; + protocols.length (1); + protocols[0].transport_protocol_properties = + base_transport_protocol_properties; + protocols[0].orb_protocol_properties = + RTCORBA::ProtocolProperties::_nil (); + + // IIOP is always used for the base protocol. + protocols[0].protocol_type = IOP::TAG_INTERNET_IOP; + + this->base_protocol_policy_[0] = + this->rtorb_->create_client_protocol_policy (protocols); + + // User decides the test protocol. + protocols[0].protocol_type = test_protocol_tag; + + RTCORBA::ProtocolProperties_var test_transport_protocol_properties = + TAO_Protocol_Properties_Factory::create_transport_protocol_property (protocols[0].protocol_type, + orb->orb_core ()); + + if (protocols[0].protocol_type == TAO_TAG_DIOP_PROFILE) + { + RTCORBA::UserDatagramProtocolProperties_var udp_test_transport_protocol_properties = + RTCORBA::UserDatagramProtocolProperties::_narrow (test_transport_protocol_properties.in ()); + + udp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points); + } + else if (protocols[0].protocol_type == TAO_TAG_SCIOP_PROFILE) + { + RTCORBA::StreamControlProtocolProperties_var sctp_test_transport_protocol_properties = + RTCORBA::StreamControlProtocolProperties::_narrow (test_transport_protocol_properties.in ()); + + sctp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points); + } + else if (protocols[0].protocol_type == IOP::TAG_INTERNET_IOP) + { + RTCORBA::TCPProtocolProperties_var tcp_test_transport_protocol_properties = + RTCORBA::TCPProtocolProperties::_narrow (test_transport_protocol_properties.in ()); + + tcp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points); + } + + protocols[0].transport_protocol_properties = + test_transport_protocol_properties; + + this->test_protocol_policy_[0] = + this->rtorb_->create_client_protocol_policy (protocols); +} + +void +Worker::print_stats (void) +{ + CORBA::ULong missed_total_deadlines = + this->missed_start_deadlines_ + this->missed_end_deadlines_; + + CORBA::ULong made_total_deadlines = + this->iterations_ - missed_total_deadlines; + + ACE_DEBUG ((LM_DEBUG, + "\n************ Statistics ************\n\n")); + + // + // Senders-side stats for PACED invocations are not too relevant + // since we are doing one way calls. + // + if (this->test_type_ == ::Protocols::Sender_Controller::PACED) + { + ACE_DEBUG ((LM_DEBUG, + "Rate = %d/sec; Iterations = %d; ", + this->invocation_rate_, + this->iterations_)); + + if (this->count_missed_end_deadlines_) + ACE_DEBUG ((LM_DEBUG, + "Deadlines made/missed[start,end]/%% = %d/%d[%d,%d]/%.2f%%; Effective Rate = %.2f\n", + made_total_deadlines, + missed_total_deadlines, + this->missed_start_deadlines_, + this->missed_end_deadlines_, + made_total_deadlines * 100 / (double) this->iterations_, + made_total_deadlines / to_seconds (this->test_end_ - this->test_start_, gsf))); + else + ACE_DEBUG ((LM_DEBUG, + "Deadlines made/missed/%% = %d/%d/%.2f%%; Effective Rate = %.2f\n", + made_total_deadlines, + missed_total_deadlines, + made_total_deadlines * 100 / (double) this->iterations_, + made_total_deadlines / to_seconds (this->test_end_ - this->test_start_, gsf))); + + if (this->print_missed_invocations_) + { + ACE_DEBUG ((LM_DEBUG, "\nMissed start invocations are:\n")); + + for (CORBA::ULong j = 0; + j < this->missed_start_deadlines_; + ++j) + { + ACE_DEBUG ((LM_DEBUG, + "%d ", + this->missed_start_invocations_[j])); + } + + ACE_DEBUG ((LM_DEBUG, "\n")); + + if (this->count_missed_end_deadlines_) + { + ACE_DEBUG ((LM_DEBUG, "\nMissed end invocations are:\n")); + + for (CORBA::ULong j = 0; + j < this->missed_end_deadlines_; + ++j) + { + ACE_DEBUG ((LM_DEBUG, + "%d ", + this->missed_end_invocations_[j])); + } + + ACE_DEBUG ((LM_DEBUG, "\n")); + } + } + } + + // Individual calls are relevant for the PACED and LATENCY tests. + if (this->test_type_ == ::Protocols::Sender_Controller::PACED || + this->test_type_ == ::Protocols::Sender_Controller::LATENCY) + { + if (this->do_dump_history_) + { + this->history_.dump_samples ("HISTORY", gsf); + } + + ACE_Basic_Stats stats; + this->history_.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + this->test_end_ - this->test_start_, + this->iterations_); + } + else + { + ACE_hrtime_t elapsed_time = + this->test_end_ - this->test_start_; + + double seconds = + to_seconds (elapsed_time, gsf); + + ACE_hrtime_t bits = this->iterations_; + bits *= this->message_size_ * 8; + + ACE_DEBUG ((LM_DEBUG, + "%Q bits sent in %5.1f seconds at a rate of %5.2f Mbps\n", + bits, + seconds, + bits / seconds / 1000 / 1000)); + } +} + +ACE_hrtime_t +Worker::deadline_for_current_call (CORBA::ULong i) +{ + ACE_hrtime_t deadline_for_current_call = + this->interval_between_calls_; + + deadline_for_current_call *= (i + 1); + + deadline_for_current_call += this->test_start_; + + return deadline_for_current_call; +} + +void +Worker::missed_start_deadline (CORBA::ULong invocation) +{ + this->missed_start_invocations_[this->missed_start_deadlines_++] = + invocation; +} + +void +Worker::missed_end_deadline (CORBA::ULong invocation) +{ + if (this->count_missed_end_deadlines_) + this->missed_end_invocations_[this->missed_end_deadlines_++] = + invocation; +} + +void +Worker::setup (void) +{ + // Make sure we have a connection to the server using the test + // protocol. + this->policy_manager_->set_policy_overrides (this->test_protocol_policy_, + CORBA::SET_OVERRIDE); + + // Since the network maybe unavailable temporarily, make sure to try + // for a few times before giving up. + for (CORBA::ULong j = 0;;) + { + try + { + // Send a message to ensure that the connection is setup. + this->test_->oneway_sync (); + + break; + } + catch (CORBA::TRANSIENT &) + { + ++j; + + if (j < this->number_of_connection_attempts_) + { + ACE_OS::sleep (1); + + continue; + } + } + + ACE_ERROR ((LM_ERROR, + "Cannot setup test protocol\n")); + + ACE_OS::exit (-1); + } + + const char *test_protocol = 0; + if (this->test_protocol_tag_ == IOP::TAG_INTERNET_IOP) + test_protocol = "IIOP"; + else if (this->test_protocol_tag_ == TAO_TAG_DIOP_PROFILE) + test_protocol = "DIOP"; + else if (this->test_protocol_tag_ == TAO_TAG_SCIOP_PROFILE) + test_protocol = "SCIOP"; + + // Use IIOP for setting up the test since the test protocol maybe + // unreliable. + this->policy_manager_->set_policy_overrides (this->base_protocol_policy_, + CORBA::SET_OVERRIDE); + + // Since the network maybe unavailable temporarily, make sure to try + // for a few times before giving up. + for (CORBA::ULong k = 0;;) + { + try + { + // Let the server know what to expect.. + this->test_->start_test (this->session_id_, + test_protocol, + this->invocation_rate_, + this->message_size_, + this->iterations_); + + break; + } + catch (CORBA::TRANSIENT &) + { + ACE_OS::sleep (1); + + if (k < this->number_of_connection_attempts_) + { + ACE_OS::sleep (1); + + continue; + } + } + + ACE_ERROR ((LM_ERROR, + "Cannot setup base protocol\n")); + + ACE_OS::exit (-1); + } + + return; +} + +void +Worker::run (void) +{ + // Select the test protocol for these invocation. + this->policy_manager_->set_policy_overrides (this->test_protocol_policy_, + CORBA::SET_OVERRIDE); + + // Payload. + ::Protocols::test::octets_var payload (new ::Protocols::test::octets); + payload->length (this->message_size_); + + CORBA::Octet *buffer = + payload->get_buffer (); + + // Not necessary but good for debugging. + ACE_OS::memset (buffer, + 1, + this->message_size_ * sizeof (CORBA::Octet)); + + // Record the start time of the test. + this->test_start_ = + ACE_OS::gethrtime (); + + // Test with several iterations. + for (CORBA::ULong i = 0; + i < this->iterations_; + ++i) + { + ACE_hrtime_t time_before_call = 0; + ACE_hrtime_t deadline_for_current_call = 0; + + // For PACED and LATENCY, each sender call is individually + // noted. + if (this->test_type_ == ::Protocols::Sender_Controller::PACED || + this->test_type_ == ::Protocols::Sender_Controller::LATENCY) + { + time_before_call = + ACE_OS::gethrtime (); + + // Pacing code. + if (this->test_type_ == ::Protocols::Sender_Controller::PACED) + { + deadline_for_current_call = + this->deadline_for_current_call (i); + + if (time_before_call > deadline_for_current_call) + { + this->missed_start_deadline (i); + continue; + } + } + } + + // Use oneways for PACING and THROUGHPUT. + if (this->test_type_ == ::Protocols::Sender_Controller::PACED || + this->test_type_ == ::Protocols::Sender_Controller::THROUGHPUT) + { + this->test_->oneway_method (this->session_id_, + i, + payload.in ()); + } + else + { + // Use twoway calls for LATENCY. + this->test_->twoway_method (this->session_id_, + i, + payload.inout ()); + } + + // For PACED and LATENCY, each sender call is individually + // noted. + if (this->test_type_ == ::Protocols::Sender_Controller::PACED || + this->test_type_ == ::Protocols::Sender_Controller::LATENCY) + { + ACE_hrtime_t time_after_call = + ACE_OS::gethrtime (); + + if (this->test_type_ == ::Protocols::Sender_Controller::LATENCY) + this->history_.sample ((time_after_call - time_before_call) / 2); + else + this->history_.sample (time_after_call - time_before_call); + + if (this->test_type_ == ::Protocols::Sender_Controller::PACED) + { + if (time_after_call > deadline_for_current_call) + { + this->missed_end_deadline (i); + continue; + } + + ACE_hrtime_t sleep_time = + deadline_for_current_call - time_after_call; + + ACE_OS::sleep (ACE_Time_Value (0, + long (to_seconds (sleep_time, gsf) * + ACE_ONE_SECOND_IN_USECS))); + } + } + } + + // This call is used to ensure that all the THROUGHPUT related data + // has reached the server. + if (this->test_type_ == ::Protocols::Sender_Controller::THROUGHPUT && + this->test_protocol_tag_ != TAO_TAG_DIOP_PROFILE) + { + this->test_->twoway_sync (); + } + + // Record end time for the test. + this->test_end_ = ACE_OS::gethrtime (); + + // Use IIOP to indicate end of test to server. + this->policy_manager_->set_policy_overrides (this->base_protocol_policy_, + CORBA::SET_OVERRIDE); + + // Tell server that the test is over. + this->test_->end_test (); +} + +SenderImpl::SenderExec_i::SenderExec_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::SenderExec_i\n")); +} + +void +SenderImpl::SenderExec_i::start (::CORBA::ULong iterations, + ::CORBA::ULong invocation_rate, + ::CORBA::Boolean count_missed_end_deadlines, + ::CORBA::Boolean do_dump_history, + ::CORBA::Boolean print_missed_invocations, + ::CORBA::ULong message_size, + ::CORBA::ULong test_protocol_tag, + ::CORBA::Boolean print_statistics, + ::CORBA::ULong number_of_connection_attempts, + ::CORBA::Boolean enable_diffserv_code_points, + ::CORBA::Short priority, + ::Protocols::Sender_Controller::Test_Type test_type) + throw (CORBA::SystemException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::start\n")); + + gsf = ACE_High_Res_Timer::global_scale_factor (); + + int argc = 0; + char **argv = 0; + + this->orb_ = + CORBA::ORB_init (argc, + argv, + ""); + + CORBA::Object_var object = + this->orb_->resolve_initial_references ("RTORB"); + + RTCORBA::RTORB_var rtorb = + RTCORBA::RTORB::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("ORBPolicyManager"); + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (object.in ()); + + object = + this->orb_->resolve_initial_references ("NetworkPriorityMappingManager"); + + RTCORBA::NetworkPriorityMappingManager_var mapping_manager = + RTCORBA::NetworkPriorityMappingManager::_narrow (object.in ()); + + Custom_Network_Priority_Mapping *custom_network_priority_mapping = + new Custom_Network_Priority_Mapping; + + // Set the desired corba priority on the network mapping manager + custom_network_priority_mapping->corba_priority (priority); + + mapping_manager->mapping (custom_network_priority_mapping); + + Protocols::test_var test = + this->context_->get_connection_reader (); + + Worker worker (this->orb_.in (), + rtorb.in (), + policy_manager.in (), + test.in (), + iterations, + invocation_rate, + count_missed_end_deadlines, + do_dump_history, + print_missed_invocations, + message_size, + test_protocol_tag, + number_of_connection_attempts, + enable_diffserv_code_points, + test_type); + + worker.setup (); + + worker.run (); + + if (print_statistics) + worker.print_stats (); +} + +void +SenderImpl::SenderExec_i::shutdown (void) + throw (CORBA::SystemException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::shutdown\n")); + this->orb_->shutdown (); +} + +void +SenderImpl::SenderExec_i::set_session_context (Components::SessionContext_ptr ctx) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::set_session_context\n")); + + this->context_ = + SenderImpl::SenderExec_Context::_narrow (ctx); + + if (CORBA::is_nil (this->context_.in ())) + throw CORBA::INTERNAL (); +} + +void +SenderImpl::SenderExec_i::ccm_activate (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::ccm_activate\n")); +} + +void +SenderImpl::SenderExec_i::ccm_passivate (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::ccm_passivate\n")); +} + +void +SenderImpl::SenderExec_i::ccm_remove (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::ccm_remove\n")); +} + + +void +SenderImpl::SenderExec_i::ciao_preactivate (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::ccm_preactivate\n")); +} + +void +SenderImpl::SenderExec_i::ciao_postactivate (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderExec_i::ccm_postactivate\n")); +} + +::Components::EnterpriseComponent_ptr +SenderImpl::SenderHomeExec_i::create (void) + throw (CORBA::SystemException, + Components::CCMException) +{ + ACE_DEBUG ((LM_DEBUG, "SenderImpl::SenderHome_exec::create\n")); + return new SenderImpl::SenderExec_i; +} + +extern "C" SENDER_EXEC_Export ::Components::HomeExecutorBase_ptr +createSenderHome_Impl (void) +{ + ACE_DEBUG ((LM_DEBUG, "createSenderHome_Impl\n")); + return new SenderImpl::SenderHomeExec_i; +} |