summaryrefslogtreecommitdiff
path: root/modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp')
-rw-r--r--modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp562
1 files changed, 562 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp b/modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp
new file mode 100644
index 00000000000..780657de268
--- /dev/null
+++ b/modules/CIAO/connectors/dds4ccm/performance-tests/LatencyTT/Sender/LatencyTT_Test_Sender_exec.cpp
@@ -0,0 +1,562 @@
+// -*- C++ -*-
+// $Id$
+
+#include "LatencyTT_Test_Sender_exec.h"
+#include "ciao/Logger/Log_Macros.h"
+#include "tao/ORB_Core.h"
+#include "ace/Timer_Queue.h"
+#include "ace/Reactor.h"
+#include "ace/High_Res_Timer.h"
+
+namespace CIAO_LatencyTT_Test_Sender_Impl
+{
+
+ //============================================================
+ // LatencyTTTest_Listener_exec_i
+ //============================================================
+ LatencyTTTest_Listener_exec_i::LatencyTTTest_Listener_exec_i (
+ Sender_exec_i &callback)
+ : callback_ (callback)
+ {
+ }
+
+ LatencyTTTest_Listener_exec_i::~LatencyTTTest_Listener_exec_i (void)
+ {
+ }
+
+ void
+ LatencyTTTest_Listener_exec_i::on_one_data (
+ const LatencyTTTest & an_instance,
+ const ::CCM_DDS::ReadInfo &)
+ {
+ ACE_UINT64 receive_time = 0;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time);
+ this->callback_.read(const_cast<LatencyTTTest&> (an_instance), receive_time);
+ }
+
+ void
+ LatencyTTTest_Listener_exec_i::on_many_data (
+ const LatencyTTTestSeq & /*an_instance*/,
+ const ::CCM_DDS::ReadInfoSeq & /*info*/)
+ {
+ }
+ //============================================================
+ // Facet Executor Implementation Class: ConnectorStatusListener_exec_i
+ //============================================================
+ ConnectorStatusListener_exec_i::ConnectorStatusListener_exec_i (
+ Atomic_Boolean &matched,
+ int number_of_subscribers,
+ Sender_exec_i &callback)
+ : callback_ (callback),
+ matched_ (matched),
+ number_of_subscribers_ (number_of_subscribers)
+ {
+ }
+
+ ConnectorStatusListener_exec_i::~ConnectorStatusListener_exec_i (void)
+ {
+ }
+
+ // Operations from ::CCM_DDS::ConnectorStatusListener
+ void ConnectorStatusListener_exec_i::on_inconsistent_topic(
+ ::DDS::Topic_ptr /*the_topic*/,
+ const DDS::InconsistentTopicStatus & /*status*/)
+ {
+ }
+
+ void ConnectorStatusListener_exec_i::on_requested_incompatible_qos(
+ ::DDS::DataReader_ptr /*the_reader*/,
+ const DDS::RequestedIncompatibleQosStatus & /*status*/)
+ {
+ }
+
+ void ConnectorStatusListener_exec_i::on_sample_rejected(
+ ::DDS::DataReader_ptr /*the_reader*/,
+ const DDS::SampleRejectedStatus & /*status*/)
+ {
+ }
+
+ void ConnectorStatusListener_exec_i::on_offered_deadline_missed(
+ ::DDS::DataWriter_ptr /*the_writer*/,
+ const DDS::OfferedDeadlineMissedStatus & /*status*/)
+ {
+ }
+
+ void ConnectorStatusListener_exec_i::on_offered_incompatible_qos(
+ ::DDS::DataWriter_ptr /*the_writer*/,
+ const DDS::OfferedIncompatibleQosStatus & /*status*/)
+ {
+ }
+
+ void ConnectorStatusListener_exec_i::on_unexpected_status(
+ ::DDS::Entity_ptr the_entity,
+ ::DDS::StatusKind status_kind)
+ {
+ if (! ::CORBA::is_nil (the_entity) &&
+ status_kind == DDS::PUBLICATION_MATCHED_STATUS)
+ {
+ ::DDS::PublicationMatchedStatus_var stat;
+ DDS::DataWriter_var wr = ::DDS::DataWriter::_narrow(the_entity);
+ if(::CORBA::is_nil(wr))
+ {
+ throw ::CORBA::INTERNAL ();
+ }
+ ::DDS::ReturnCode_t retval = wr->get_publication_matched_status(stat.out ());
+ if (retval == DDS::RETCODE_OK)
+ {
+ if (stat.in().current_count >= this->number_of_subscribers_ &&
+ !this->matched_.value())
+ {
+ this->matched_ = true;
+ this->callback_.start();
+ }
+ }
+ }
+ }
+ //============================================================
+ // WriteTickerHandler
+ //============================================================
+ WriteTicker::WriteTicker (Sender_exec_i &callback)
+ : callback_ (callback)
+ {
+ }
+
+ int
+ WriteTicker::handle_timeout (const ACE_Time_Value &, const void *)
+ {
+ // Notify the subscribers
+ this->callback_.write_one ();
+ return 0;
+ }
+
+ //============================================================
+ // Component Executor Implementation Class: Sender_exec_i
+ //============================================================
+ Sender_exec_i::Sender_exec_i (void)
+ : iterations_ (1000),
+ datalen_(100),
+ datalen_idx_(0),
+ nr_of_runs_(10),
+ sleep_(10),
+ matched_(false),
+ number_of_subscribers_(1),
+ tv_total_ (0L),
+ tv_max_ (0L),
+ tv_min_ (0L),
+ count_ (0), // Number of returned messages.
+ number_of_msg_(0), // Number of sent messages.
+ timer_(false),
+ received_(false),
+ seq_num_(0),
+ sigma_duration_squared_(0)
+ {
+ this->ticker_ = new WriteTicker (*this);
+ }
+
+ Sender_exec_i::~Sender_exec_i (void)
+ {
+ }
+
+ static int compare_two_longs (const void *long1, const void *long2)
+ {
+ return (int)(*(ACE_UINT64*)long1 - *(ACE_UINT64*)long2);
+ }
+
+ void
+ Sender_exec_i::write_one (void)
+ {
+ if((this->number_of_msg_ == 0) && (this->datalen_idx_ == 0))
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_test_);
+ }
+ // First message sent always, next messages only as previous sent message
+ // is received back.
+ if( (this->number_of_msg_ == 0) || ( this->received_.value()))
+ {
+ // All messages send, stop timer.
+ if((this->iterations_ != 0) &&
+ (this->number_of_msg_ >= this->iterations_ ))
+ {
+ if( this->datalen_idx_ >= (this->nr_of_runs_ - 1))
+ {
+ this->stop();
+ this->timer_ = false;
+ this->calc_results();
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->end_time_test_);
+
+ }
+ else
+ {
+ this->calc_results();
+ this->reset_results();
+ ++this->datalen_idx_;
+ this->datalen_ = this->datalen_range_[this->datalen_idx_];
+ this->test_topic_.data.length(this->datalen_);
+ }
+ }
+ else
+ {
+ try
+ {
+ this->test_topic_.seq_num = this->number_of_msg_;
+
+ // Keep last sent seq_num, to control if message is sent back.
+ this->seq_num_ = this->number_of_msg_;
+ this->received_ = false;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_);
+ this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL);
+ }
+ catch (const CCM_DDS::InternalError& )
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
+ ACE_TEXT ("while wrinting sample with sequence_number <%u>.\n"),
+ this->test_topic_.seq_num));
+ }
+ ++this->number_of_msg_;
+ }
+ }
+ }
+
+ void
+ Sender_exec_i::read(LatencyTTTest & an_instance,ACE_UINT64 receive_time)
+ {
+ if (an_instance.seq_num == this->seq_num_)
+ {
+ this->record_time( receive_time);
+ this->received_ = true;
+ }
+ }
+
+ void
+ Sender_exec_i::reset_results()
+ {
+ this->count_ = 0;
+ this->duration_times = new ACE_UINT64[this->iterations_];
+ this->tv_total_ = 0L;
+ this->tv_max_ = 0L;
+ this->tv_min_ = 0L;
+ this->number_of_msg_ = 0;
+ this->received_ = false;
+ this->seq_num_ = 0;
+ this->sigma_duration_squared_ = 0;
+ }
+
+ void
+ Sender_exec_i::calc_results()
+ {
+ // Sort all duration times.
+ qsort(this->duration_times,
+ this->count_,
+ sizeof(ACE_UINT64),
+ compare_two_longs);
+
+ // Show latency_50_percentile, latency_90_percentile,
+ // latency_99_percentile and latency_99.99_percentile.
+ // For example duration_times[per50] is the median i.e. 50% of the
+ // samples have a latency time <= duration_times[per50]
+ int const per50 = this->count_ / 2;
+ int const per90 = (int)(this->count_ * 0.90);
+ int const per99 = (int)(this->count_ * 0.990);
+ int const per9999 = (int)(this->count_ * 0.9999);
+
+ double const avg = (double)(this->tv_total_ / this->count_);
+ // Calculate standard deviation.
+ double const roundtrip_time_std = sqrt(
+ (this->sigma_duration_squared_ / (double)this->count_) -
+ (avg * avg));
+
+ // Show values as float, in order to be comparable with RTI performance test.
+ if (this->count_ > 0)
+ {
+ if( this->datalen_idx_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Collecting statistics on %d samples per message size.\n"
+ "This is the roundtrip time, *not* the one-way-latency\n"
+ "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%,"
+ " max us\n"
+ "------,-------,-------,-------,-------,-------,-------,-------,"
+ "-------\n"
+ "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n",
+ this->count_,
+ this->datalen_,
+ roundtrip_time_std,
+ avg,
+ (double)this->tv_min_,
+ (double)this->duration_times[per50-1],
+ (double)this->duration_times[per90-1],
+ (double)this->duration_times[per99-1],
+ (double)this->duration_times[per9999-1],
+ (double)this->tv_max_));
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n",
+ this->datalen_,
+ roundtrip_time_std,
+ avg,
+ (double)this->tv_min_,
+ (double)this->duration_times[per50-1],
+ (double)this->duration_times[per90-1],
+ (double)this->duration_times[per99-1],
+ (double)this->duration_times[per9999-1],
+ (double)this->tv_max_));
+ }
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
+ "No samples received back.\n"));
+ }
+ }
+
+ ::CCM_DDS::CCM_ConnectorStatusListener_ptr
+ Sender_exec_i::get_connector_status (void)
+ {
+ return new ConnectorStatusListener_exec_i (
+ this->matched_,
+ this->number_of_subscribers_,
+ *this);
+ }
+
+ ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener_ptr
+ Sender_exec_i::get_ping_listen_data_listener (void)
+ {
+ return new LatencyTTTest_Listener_exec_i (*this);
+ }
+
+ ::CCM_DDS::CCM_PortStatusListener_ptr
+ Sender_exec_i::get_ping_listen_status (void)
+ {
+ return ::CCM_DDS::CCM_PortStatusListener::_nil ();
+ }
+
+ void
+ Sender_exec_i::start (void)
+ {
+ // This->sleep_ is in ms
+ unsigned int sec = this->sleep_/1000;
+ unsigned int usec = (this->sleep_ % 1000) * 1000;
+ (void) ACE_High_Res_Timer::global_scale_factor ();
+ this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->timer_queue()->gettimeofday (&ACE_High_Res_Timer::gettimeofday_hr);
+ if (this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->schedule_timer(
+ this->ticker_,
+ 0,
+ ACE_Time_Value (5, 0),
+ ACE_Time_Value (sec, usec)) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Sender_exec_i::start : ")
+ ACE_TEXT ("Error scheduling timer")));
+ }
+ this->timer_ = true;
+ }
+
+ void
+ Sender_exec_i::record_time (ACE_UINT64 receive_time)
+ {
+ ACE_UINT64 interval = receive_time - this->start_time_;
+ ACE_UINT64 duration = interval - this->_clock_overhead_;
+ int i = ++this->count_;
+ this->duration_times[i-1] = duration;
+ this->sigma_duration_squared_ += (double)duration * (double)duration;
+ this->tv_total_ += duration;
+ if (duration > this->tv_max_ || (this->tv_max_ == 0L))
+ {
+ this->tv_max_ = duration;
+ }
+ if (duration < this->tv_min_ || (this->tv_min_ == 0L))
+ {
+ this->tv_min_ = duration;
+ }
+ }
+
+ ::CORBA::ULong
+ Sender_exec_i::iterations (void)
+ {
+ return this->iterations_;
+ }
+
+ void
+ Sender_exec_i::iterations (::CORBA::ULong iterations)
+ {
+ if (iterations == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("ERROR: Number of iterations must be greater as '0'\n")));
+ throw ::CORBA::BAD_PARAM ();
+ }
+ else
+ {
+ this->iterations_ = iterations;
+ }
+ }
+
+ ::CORBA::UShort
+ Sender_exec_i::sleep (void)
+ {
+ return this->sleep_;
+ }
+
+ void
+ Sender_exec_i::sleep (::CORBA::UShort sleep)
+ {
+ this->sleep_ = sleep;
+ }
+
+ ::CORBA::UShort
+ Sender_exec_i::number_of_sub (void)
+ {
+ return this->number_of_subscribers_;
+ }
+
+ void
+ Sender_exec_i::number_of_sub (::CORBA::UShort number_of_sub)
+ {
+ if (number_of_sub > 0)
+ {
+ this->number_of_subscribers_ = number_of_sub;
+ }
+ else
+ {
+ this->number_of_subscribers_ = 1;
+ }
+ }
+
+ void
+ Sender_exec_i::calculate_clock_overhead()
+ {
+ int num_of_loops_clock = 320;
+ ACE_UINT64 begin_time;
+ ACE_UINT64 clock_roundtrip_time;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (begin_time);
+ for (int i = 0; i < num_of_loops_clock; ++i)
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (clock_roundtrip_time);
+ }
+ ACE_UINT64 total_time = clock_roundtrip_time - begin_time;
+ this->_clock_overhead_ = (long)(total_time /num_of_loops_clock);
+ }
+
+ void
+ Sender_exec_i::init_values (void)
+ {
+ this->duration_times = new ACE_UINT64[this->iterations_];
+ this->datalen_range_ = new CORBA::Short[this->nr_of_runs_];
+ int start = 16;
+ for(int i = 0; i < this->nr_of_runs_; i++)
+ {
+ this->datalen_range_[i] = start;
+ start = 2 * start;
+ }
+
+ this->datalen_ = this->datalen_range_[0];
+
+ // make instances of Topic
+ this->test_topic_.seq_num = 0;
+ this->test_topic_.data.length (this->datalen_);
+ calculate_clock_overhead();
+ }
+
+ void
+ Sender_exec_i::set_session_context (::Components::SessionContext_ptr ctx)
+ {
+ this->context_ =
+ ::LatencyTT_Test::CCM_Sender_Context::_narrow (ctx);
+
+ if ( ::CORBA::is_nil (this->context_.in ()))
+ {
+ throw ::CORBA::INTERNAL ();
+ }
+ }
+
+ void
+ Sender_exec_i::configuration_complete (void)
+ {
+ }
+
+ void
+ Sender_exec_i::ccm_activate (void)
+ {
+ try
+ {
+ this->writer_ = this->context_->get_connection_info_write_data ();
+ ::CCM_DDS::DataListenerControl_var dlc =
+ this->context_->get_connection_ping_listen_data_control ();
+ dlc->mode (::CCM_DDS::ONE_BY_ONE);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Exception caught\n")));
+ }
+ catch (...)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n")));
+ }
+ this->init_values();
+ }
+
+ void
+ Sender_exec_i::stop (void)
+ {
+ if (this->timer_.value ())
+ {
+ this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
+ delete this->ticker_;
+ }
+ }
+
+ void
+ Sender_exec_i::ccm_passivate (void)
+ {
+ this->stop();
+ }
+
+ void
+ Sender_exec_i::ccm_remove (void)
+ {
+ if ((this->nr_of_runs_ -1) != this->datalen_idx_)
+ {
+ if (this->datalen_idx_ == 0)
+ {
+ ACE_ERROR ((LM_ERROR, "ERROR SENDER: No run has taken place.\n"));
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER : %u of %u runs completed.\n"
+ " Number of messages sent of last run (%u): %u\n",
+ this->datalen_idx_,
+ this->nr_of_runs_,
+ this->datalen_idx_ + 1,
+ this->number_of_msg_));
+ }
+ }
+ else
+ {
+ ACE_UINT64 test_time_usec = this->end_time_test_ -
+ this->start_time_test_;
+
+ double sec = (double)test_time_usec / (1000 * 1000);
+ ACE_DEBUG ((LM_DEBUG, "TEST successful, number of runs (%u) of "
+ "%u messages in %3.3f seconds.\n",
+ this->nr_of_runs_,
+ this->number_of_msg_, sec));
+ }
+ }
+
+ extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr
+ create_LatencyTT_Test_Sender_Impl (void)
+ {
+ ::Components::EnterpriseComponent_ptr retval =
+ ::Components::EnterpriseComponent::_nil ();
+
+ ACE_NEW_NORETURN (
+ retval,
+ Sender_exec_i);
+
+ return retval;
+ }
+}