diff options
Diffstat (limited to 'CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/LatencyTT_Test_Sender_exec.cpp')
-rw-r--r-- | CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/LatencyTT_Test_Sender_exec.cpp | 691 |
1 files changed, 0 insertions, 691 deletions
diff --git a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/LatencyTT_Test_Sender_exec.cpp b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/LatencyTT_Test_Sender_exec.cpp deleted file mode 100644 index 41c9267aef7..00000000000 --- a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/LatencyTT_Test_Sender_exec.cpp +++ /dev/null @@ -1,691 +0,0 @@ -// -*- C++ -*- -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.dre.vanderbilt.edu/~schmidt/TAO.html - **/ - -#include "LatencyTT_Test_Sender_exec.h" -#include "tao/ORB_Core.h" -#include "ace/Reactor.h" -#include "ace/OS_NS_math.h" - -#include "ace/Timer_Queue.h" -#include "ace/High_Res_Timer.h" -#include "dds4ccm/impl/dds4ccm_conf.h" - -namespace CIAO_LatencyTT_Test_Sender_Impl -{ - static int compare_two_longs (const void *long1, const void *long2) - { - return (int)(*(ACE_UINT64*)long1 - *(ACE_UINT64*)long2); - } - - /** - * WriteTicker - */ - - WriteTicker::WriteTicker (Sender_exec_i &callback) - : callback_ (callback) - { - } - - int - WriteTicker::handle_timeout (const ACE_Time_Value &, const void *) - { - // Notify the subscribers - this->callback_.write_one (); - return 0; - } - - /** - * Facet Executor Implementation Class: ping_listen_data_listener_exec_i - */ - - ping_listen_data_listener_exec_i::ping_listen_data_listener_exec_i ( - ::LatencyTT_Test::CCM_Sender_Context_ptr ctx, - Sender_exec_i &callback) - : ciao_context_ ( - ::LatencyTT_Test::CCM_Sender_Context::_duplicate (ctx)) - , callback_ (callback) - { - } - - ping_listen_data_listener_exec_i::~ping_listen_data_listener_exec_i (void) - { - } - - // Operations from ::LatencyTT_Test::LatencyTTTestConn::Listener - - void - ping_listen_data_listener_exec_i::on_one_data (const ::LatencyTTTest & datum, - const ::CCM_DDS::ReadInfo & /* info */) - { - ACE_UINT64 receive_time = 0; - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (receive_time); - this->callback_.read(const_cast<LatencyTTTest&> (datum), receive_time); - } - - void - ping_listen_data_listener_exec_i::on_many_data (const ::LatencyTTTestSeq & /* data */, - const ::CCM_DDS::ReadInfoSeq & /* infos */) - { - /* Your code here. */ - } - - /** - * Facet Executor Implementation Class: connector_status_exec_i - */ - - connector_status_exec_i::connector_status_exec_i ( - ::LatencyTT_Test::CCM_Sender_Context_ptr ctx, - Sender_exec_i &callback, - Atomic_Boolean &matched, - int number_of_subscribers, - Atomic_Long &unexpected_count) - : ciao_context_ ( - ::LatencyTT_Test::CCM_Sender_Context::_duplicate (ctx)) - , callback_ (callback) - , matched_ (matched) - , number_of_subscribers_ (number_of_subscribers) - , unexpected_count_(unexpected_count) - { - } - - connector_status_exec_i::~connector_status_exec_i (void) - { - } - - // Operations from ::CCM_DDS::ConnectorStatusListener - - void - connector_status_exec_i::on_inconsistent_topic (::DDS::Topic_ptr /* the_topic */, - const ::DDS::InconsistentTopicStatus & /* status */) - { - /* Your code here. */ - } - - void - connector_status_exec_i::on_requested_incompatible_qos (::DDS::DataReader_ptr /* the_reader */, - const ::DDS::RequestedIncompatibleQosStatus & /* status */) - { - /* Your code here. */ - } - - void - connector_status_exec_i::on_sample_rejected (::DDS::DataReader_ptr /* the_reader */, - const ::DDS::SampleRejectedStatus & /* status */) - { - /* Your code here. */ - } - - void - connector_status_exec_i::on_offered_deadline_missed (::DDS::DataWriter_ptr /* the_writer */, - const ::DDS::OfferedDeadlineMissedStatus & /* status */) - { - /* Your code here. */ - } - - void - connector_status_exec_i::on_offered_incompatible_qos (::DDS::DataWriter_ptr /* the_writer */, - const ::DDS::OfferedIncompatibleQosStatus & /* status */) - { - /* Your code here. */ - } - - void - connector_status_exec_i::on_unexpected_status (::DDS::Entity_ptr the_entity, - ::DDS::StatusKind status_kind) - { - ++this->unexpected_count_; - if (! ::CORBA::is_nil (the_entity) && - status_kind == DDS::PUBLICATION_MATCHED_STATUS) - { - ::DDS::PublicationMatchedStatus_var stat; - ::DDS::DataWriter_var wr = ::DDS::DataWriter::_narrow(the_entity); - if(::CORBA::is_nil(wr)) - { - throw ::CORBA::INTERNAL (); - } - ::DDS::ReturnCode_t retval = wr->get_publication_matched_status (stat.out ()); - if (retval == DDS::RETCODE_OK) - { - if (stat.in ().current_count >= this->number_of_subscribers_ && - !this->matched_.value()) - { - this->matched_ = true; - this->callback_.start(); - } - } - } - } - - /** - * Component Executor Implementation Class: Sender_exec_i - */ - - Sender_exec_i::Sender_exec_i (void) - : iterations_ (1000) - , sleep_ (10) - , number_of_sub_ (1) - - , datalen_ (100) - , datalen_idx_ (0) - , nr_of_runs_ (10) - , matched_ (false) - , tv_total_ (0L) - , tv_max_ (0L) - , tv_min_ (0L) - , count_ (0) - , number_of_msg_ (0) // Number of sent messages - , timer_ (false) - , received_ (false) - , seq_num_ (0) - , sigma_duration_squared_ (0) - , start_time_ (0) - , start_time_test_ (0) - , end_time_test_ (0) - , duration_times_ (0) - , datalen_range_ (0) - , _clock_overhead_ (0) - , unexpected_count_ (0) - { - ACE_NEW_THROW_EX (this->ticker_, - WriteTicker (*this), - ::CORBA::NO_MEMORY ()); - ACE_NEW_THROW_EX (this->datalen_range_, - ::CORBA::Short[this->nr_of_runs_], - ::CORBA::NO_MEMORY ()); - } - - Sender_exec_i::~Sender_exec_i (void) - { - delete this->ticker_; - delete [] this->duration_times_; - delete [] datalen_range_; - } - - // Supported operations and attributes. - ACE_Reactor* - Sender_exec_i::reactor (void) - { - ACE_Reactor* reactor = 0; - ::CORBA::Object_var ccm_object = - this->ciao_context_->get_CCM_object(); - if (! ::CORBA::is_nil (ccm_object.in ())) - { - ::CORBA::ORB_var orb = ccm_object->_get_orb (); - if (! ::CORBA::is_nil (orb.in ())) - { - reactor = orb->orb_core ()->reactor (); - } - } - if (reactor == 0) - { - throw ::CORBA::INTERNAL (); - } - return reactor; - } - - void - Sender_exec_i::write_one (void) - { - if (this->number_of_msg_ == 0 && this->datalen_idx_ == 0) - { - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_test_); - } - // First message sent always, next messages only as previous sent message - // is received back. - if (this->number_of_msg_ == 0 || this->received_.value ()) - { - // All messages send, stop timer. - if (this->iterations_ != 0 && - this->number_of_msg_ >= this->iterations_) - { - if( this->datalen_idx_ >= (this->nr_of_runs_ - 1)) - { - this->stop (); - this->timer_ = false; - this->calc_results (); - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->end_time_test_); - - } - else - { - this->calc_results (); - this->reset_results (); - ++this->datalen_idx_; - this->datalen_ = this->datalen_range_[this->datalen_idx_]; - this->test_topic_.data.length (this->datalen_); - } - } - else - { - try - { - this->test_topic_.seq_num = this->number_of_msg_; - - // Keep last sent seq_num, to control if message is sent back. - this->seq_num_ = this->number_of_msg_; - this->received_ = false; - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_); - this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL); - } - catch (const CCM_DDS::InternalError& ) - { - ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("while wrinting sample with sequence_number <%u>.\n"), - this->test_topic_.seq_num)); - } - ++this->number_of_msg_; - } - } - } - - void - Sender_exec_i::read (LatencyTTTest & an_instance, ACE_UINT64 receive_time) - { - if (an_instance.seq_num == this->seq_num_) - { - this->record_time (receive_time); - this->received_ = true; - } - } - - void - Sender_exec_i::reset_results() - { - delete [] this->duration_times_; - this->count_ = 0; - - ACE_NEW_THROW_EX (this->duration_times_, - ACE_UINT64[this->iterations_], - ::CORBA::NO_MEMORY ()); - - this->tv_total_ = 0L; - this->tv_max_ = 0L; - this->tv_min_ = 0L; - this->number_of_msg_ = 0; - this->received_ = false; - this->seq_num_ = 0; - this->sigma_duration_squared_ = 0; - } - - void - Sender_exec_i::calc_results (void) - { - // Sort all duration times. - qsort(this->duration_times_, - this->count_, - sizeof(ACE_UINT64), - compare_two_longs); - - // Show latency_50_percentile, latency_90_percentile, - // latency_99_percentile and latency_99.99_percentile. - // For example duration_times[per50] is the median i.e. 50% of the - // samples have a latency time <= duration_times[per50] - int const per50 = this->count_ / 2; - int const per90 = (int)(this->count_ * 0.90); - int const per99 = (int)(this->count_ * 0.990); - int const per9999 = (int)(this->count_ * 0.9999); - - double const avg = (double)(this->tv_total_ / this->count_); - // Calculate standard deviation. - double const roundtrip_time_std = sqrt( - (this->sigma_duration_squared_ / (double)this->count_) - - (avg * avg)); - - // Show values as float, in order to be comparable with RTI performance test. - if (this->count_ > 0) - { - if (this->datalen_idx_ == 0) - { - #if (CIAO_DDS4CCM_CONTEXT_SWITCH==1) - ACE_DEBUG ((LM_DEBUG, "\n\nYES, we're using a threadswitch between " - "DDS and CCM\n\n")); - #else - ACE_DEBUG ((LM_DEBUG, "\n\nNO, we're not using a threadswitch between " - "DDS and CCM\n\n")); - #endif - ACE_DEBUG ((LM_DEBUG, - "Collecting statistics on %d samples per message size.\n" - "This is the roundtrip time, *not* the one-way-latency\n" - "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%," - " max us\n" - "------,-------,-------,-------,-------,-------,-------,-------," - "-------\n" - "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n", - this->count_, - this->datalen_, - roundtrip_time_std, - avg, - (double)this->tv_min_, - (double)this->duration_times_[per50-1], - (double)this->duration_times_[per90-1], - (double)this->duration_times_[per99-1], - (double)this->duration_times_[per9999-1], - (double)this->tv_max_)); - } - else - { - ACE_DEBUG ((LM_DEBUG, - "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n", - this->datalen_, - roundtrip_time_std, - avg, - (double)this->tv_min_, - (double)this->duration_times_[per50-1], - (double)this->duration_times_[per90-1], - (double)this->duration_times_[per99-1], - (double)this->duration_times_[per9999-1], - (double)this->tv_max_)); - } - } - else - { - ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n " - "No samples received back.\n")); - } - } - - void - Sender_exec_i::start (void) - { - // This->sleep_ is in ms - unsigned int sec = this->sleep_ / 1000; - unsigned int usec = (this->sleep_ % 1000) * 1000; - ACE_DEBUG ((LM_DEBUG, "Sender_exec_i::start - " - "Start test with interval <%u.%u>\n", - sec, usec)); - if (this->reactor ()->schedule_timer( - this->ticker_, - 0, - ACE_Time_Value (5, 0), - ACE_Time_Value (sec, usec)) == -1) - { - ACE_ERROR ((LM_ERROR, ACE_TEXT ("Sender_exec_i::start : ") - ACE_TEXT ("Error scheduling timer"))); - } - this->timer_ = true; - } - - void - Sender_exec_i::stop (void) - { - if (this->timer_.value ()) - { - this->reactor ()->cancel_timer (this->ticker_); - } - } - - void - Sender_exec_i::record_time (ACE_UINT64 receive_time) - { - ACE_UINT64 interval = receive_time - this->start_time_; - ACE_UINT64 duration = interval - this->_clock_overhead_; - int i = ++this->count_; - this->duration_times_[i-1] = duration; - this->sigma_duration_squared_ += (double)duration * (double)duration; - this->tv_total_ += duration; - if (duration > this->tv_max_ || this->tv_max_ == 0L) - { - this->tv_max_ = duration; - } - if (duration < this->tv_min_ || this->tv_min_ == 0L) - { - this->tv_min_ = duration; - } - } - - void - Sender_exec_i::calculate_clock_overhead (void) - { - int num_of_loops_clock = 320; - ACE_UINT64 begin_time = 0; - ACE_UINT64 clock_roundtrip_time = 0; - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (begin_time); - for (int i = 0; i < num_of_loops_clock; ++i) - { - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (clock_roundtrip_time); - } - ACE_UINT64 total_time = clock_roundtrip_time - begin_time; - this->_clock_overhead_ = (long)(total_time / num_of_loops_clock); - } - - void - Sender_exec_i::init_values (void) - { - delete [] this->duration_times_; - ACE_NEW_THROW_EX (this->duration_times_, - ACE_UINT64[this->iterations_], - ::CORBA::NO_MEMORY ()); - int start = 16; - for (int i = 0; i < this->nr_of_runs_; i++) - { - this->datalen_range_[i] = start; - start = 2 * start; - } - - this->datalen_ = this->datalen_range_[0]; - - // make instances of Topic - this->test_topic_.seq_num = 0; - this->test_topic_.data.length (this->datalen_); - calculate_clock_overhead (); - } - - // Component attributes and port operations. - - ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener_ptr - Sender_exec_i::get_ping_listen_data_listener (void) - { - if ( ::CORBA::is_nil (this->ciao_ping_listen_data_listener_.in ())) - { - ping_listen_data_listener_exec_i *tmp = 0; - ACE_NEW_RETURN ( - tmp, - ping_listen_data_listener_exec_i ( - this->ciao_context_.in (), - *this), - ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener::_nil ()); - - this->ciao_ping_listen_data_listener_ = tmp; - } - - return - ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener::_duplicate ( - this->ciao_ping_listen_data_listener_.in ()); - } - - ::CCM_DDS::CCM_PortStatusListener_ptr - Sender_exec_i::get_ping_listen_status (void) - { - return ::CCM_DDS::CCM_PortStatusListener::_nil (); - } - - ::CCM_DDS::CCM_ConnectorStatusListener_ptr - Sender_exec_i::get_connector_status (void) - { - if ( ::CORBA::is_nil (this->ciao_connector_status_.in ())) - { - connector_status_exec_i *tmp = 0; - ACE_NEW_RETURN ( - tmp, - connector_status_exec_i ( - this->ciao_context_.in (), - *this, - this->matched_, - this->number_of_sub_, - this->unexpected_count_), - ::CCM_DDS::CCM_ConnectorStatusListener::_nil ()); - - this->ciao_connector_status_ = tmp; - } - - return - ::CCM_DDS::CCM_ConnectorStatusListener::_duplicate ( - this->ciao_connector_status_.in ()); - } - - ::CORBA::ULong - Sender_exec_i::iterations (void) - { - return this->iterations_; - } - - void - Sender_exec_i::iterations ( - const ::CORBA::ULong iterations) - { - this->iterations_ = iterations; - } - - ::CORBA::UShort - Sender_exec_i::sleep (void) - { - return this->sleep_; - } - - void - Sender_exec_i::sleep ( - const ::CORBA::UShort sleep) - { - this->sleep_ = sleep; - } - - ::CORBA::UShort - Sender_exec_i::number_of_sub (void) - { - return this->number_of_sub_; - } - - void - Sender_exec_i::number_of_sub ( - const ::CORBA::UShort number_of_sub) - { - if (number_of_sub > 0) - { - this->number_of_sub_ = number_of_sub; - } - else - { - this->number_of_sub_ = 1; - } - } - - // Operations from Components::SessionComponent. - - void - Sender_exec_i::set_session_context ( - ::Components::SessionContext_ptr ctx) - { - this->ciao_context_ = - ::LatencyTT_Test::CCM_Sender_Context::_narrow (ctx); - - if ( ::CORBA::is_nil (this->ciao_context_.in ())) - { - throw ::CORBA::INTERNAL (); - } - } - - void - Sender_exec_i::configuration_complete (void) - { - /* Your code here. */ - } - - void - Sender_exec_i::ccm_activate (void) - { - try - { - this->writer_ = this->ciao_context_->get_connection_info_write_data (); - ::CCM_DDS::DataListenerControl_var dlc = - this->ciao_context_->get_connection_ping_listen_data_control (); - dlc->mode (::CCM_DDS::ONE_BY_ONE); - } - catch (const CORBA::Exception& ex) - { - ex._tao_print_exception ("Exception caught:"); - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Exception caught\n"))); - } - catch (...) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n"))); - } - this->init_values(); - } - - void - Sender_exec_i::ccm_passivate (void) - { - this->stop (); - } - - void - Sender_exec_i::ccm_remove (void) - { - if (this->nr_of_runs_ -1 != this->datalen_idx_) - { - if (this->datalen_idx_ == 0) - { - ACE_ERROR ((LM_ERROR, "ERROR SENDER: No run has taken place.\n")); - } - else - { - ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER : %u of %u runs completed.\n" - " Number of messages sent of last run (%u): %u\n", - this->datalen_idx_, - this->nr_of_runs_, - this->datalen_idx_ + 1, - this->number_of_msg_)); - } - } - else - { - ACE_UINT64 test_time_usec = this->end_time_test_ - - this->start_time_test_; - - double sec = (double)test_time_usec / (1000 * 1000); - ACE_DEBUG ((LM_DEBUG, "TEST successful, number of runs (%u) of " - "%u messages in %3.3f seconds.\n", - this->nr_of_runs_, - this->number_of_msg_, sec)); - } - ACE_DEBUG ((LM_DEBUG, "\tNumber of unexpected events : %u\n", - this->unexpected_count_.value ())); - } - - extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr - create_LatencyTT_Test_Sender_Impl (void) - { - ::Components::EnterpriseComponent_ptr retval = - ::Components::EnterpriseComponent::_nil (); - - ACE_NEW_NORETURN ( - retval, - Sender_exec_i); - - return retval; - } -} |