diff options
Diffstat (limited to 'CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp')
-rw-r--r-- | CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp | 183 |
1 files changed, 107 insertions, 76 deletions
diff --git a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp index 66ae2f02a67..977950884dc 100644 --- a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp @@ -9,6 +9,7 @@ #include "ace/Reactor.h" #include "ace/High_Res_Timer.h" + namespace CIAO_Latency_Test_Sender_Impl { //============================================================ @@ -32,13 +33,13 @@ namespace CIAO_Latency_Test_Sender_Impl { ACE_UINT64 receive_time = 0; - //only interested in messages received with a latency_ping = 0 (messages sent beck by receiver) + //only interested in messages received with a latency_ping = 0 (messages sent back by receiver) if( an_instance.ping == 0) - { - ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time); - this->callback_.read(an_instance, receive_time); - } - } + { + ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time); + this->callback_.read(an_instance, receive_time); + } + } void LatencyTest_Listener_exec_i::on_many_data ( @@ -123,7 +124,6 @@ namespace CIAO_Latency_Test_Sender_Impl //============================================================ Sender_exec_i::Sender_exec_i (void) : iterations_ (1000), - keys_ (1), datalen_(100), sleep_(10), matched_(false), @@ -135,7 +135,8 @@ namespace CIAO_Latency_Test_Sender_Impl number_of_msg_(0), //number of sent messages timer_(false), received_(false), - seq_num_(0) + seq_num_(0), + sigma_duration_squared_(0) { this->ticker_ = new WriteTicker (*this); } @@ -144,6 +145,11 @@ namespace CIAO_Latency_Test_Sender_Impl { } + static int compare_two_longs(const void *long1, const void *long2) + { + return (*(CORBA::Long*)long1 - *(CORBA::Long*)long2 ); + } + void Sender_exec_i::write_one (void) { @@ -152,9 +158,9 @@ namespace CIAO_Latency_Test_Sender_Impl if( (this->number_of_msg_ == 0) || ( this->received_.value())) { // all messages send, stop timer - if((this->iterations_ != 0) && (this->number_of_msg_ >= (this->iterations_ * this->keys_))) + if((this->iterations_ != 0) && (this->number_of_msg_ >= this->iterations_ )) { - this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_); + this->stop(); this->timer_ = false; } else @@ -162,20 +168,20 @@ namespace CIAO_Latency_Test_Sender_Impl try { //send messages with indicator (ping = 1L) so that subscriber knows that this message has to sent back. - this->last_key_->second->ping = 1L; - this->last_key_->second->seq_num = this->number_of_msg_; + this->test_topic_.ping = 1L; + this->test_topic_.seq_num = this->number_of_msg_;; //keep last sent seq_num, in order to control if message is sent back. this->seq_num_ = this->number_of_msg_; - - ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_); - this->writer_->write_one (this->last_key_->second, ::DDS::HANDLE_NIL); - } + this->received_ = false; + ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_); + this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL); + } catch (const CCM_DDS::InternalError& ) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("while updating writer info for <%C>.\n"), - this->last_key_->first.c_str ())); + ACE_TEXT ("while updating writer info for <%u>.\n"), + this->test_topic_.seq_num)); } ++this->number_of_msg_; } @@ -183,7 +189,7 @@ namespace CIAO_Latency_Test_Sender_Impl } void - Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time) + Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time) { if (an_instance.seq_num == this->seq_num_.value()) { @@ -218,19 +224,6 @@ namespace CIAO_Latency_Test_Sender_Impl { ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard, this->mutex_, CORBA::INTERNAL ()); - for (CORBA::UShort i = 1; i < this->keys_ + 1; ++i) - { - char key[7]; - LatencyTest *new_key = new LatencyTest; - ACE_OS::sprintf (key, "KEY_%d", i); - new_key->key = CORBA::string_dup(key); - new_key->seq_num = 0; - new_key->data.allocbuf(this->datalen_); - // to do : freebuf, where ? - this->samples_[key] = new_key; - } - this->last_key_ = this->samples_.begin (); - //this->sleep_ is in ms unsigned int sec = this->sleep_/1000; unsigned int usec = (this->sleep_ % 1000) * 1000; @@ -251,9 +244,13 @@ namespace CIAO_Latency_Test_Sender_Impl void Sender_exec_i::record_time (ACE_UINT64 receive_time) { - ACE_UINT64 interval = ( receive_time - this->start_time_); - ++this->count_; + ACE_UINT64 interval = ( receive_time - this->start_time_); + ++this->count_; long duration = static_cast <CORBA::Long>(interval); + int i = this->count_; + // keep all duration times for statistics + this->duration_times[i-1] = duration; + this->sigma_duration_squared_ += (double)duration * (double)duration; this->tv_total_ += duration; if (duration > this->tv_max_.value ()|| (this->tv_max_.value () == 0L)) this->tv_max_ = duration; @@ -270,19 +267,16 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time) void Sender_exec_i::iterations (::CORBA::ULong iterations) { - this->iterations_ = iterations; - } - - ::CORBA::UShort - Sender_exec_i::keys (void) - { - return this->keys_; - } - - void - Sender_exec_i::keys (::CORBA::UShort keys) - { - this->keys_ = keys; + if (iterations == 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: iterations must be greater as '0'\n"))); + throw ::CORBA::BAD_PARAM (); + } + else + { + this->iterations_ = iterations; + } } ::CORBA::UShort @@ -325,8 +319,7 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time) void Sender_exec_i::datalen (::CORBA::UShort datalen) { - // 7 is length of key, has to be removed - int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong) + 7; + int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong); if((datalen <= overhead_size) || (datalen > MAX_DATA_SEQUENCE_LENGTH)) { ACE_ERROR ((LM_ERROR, @@ -334,9 +327,9 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time) throw ::CORBA::BAD_PARAM (); } this->datalen_ = datalen - overhead_size; + this->duration_times = new CORBA::Long[this->iterations_]; } - void Sender_exec_i::set_session_context (::Components::SessionContext_ptr ctx) { @@ -375,43 +368,81 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time) ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n"))); } + //make instances of Topic + this->test_topic_.seq_num = 0; + this->test_topic_.ping = 0; + this->test_topic_.data.length (this->datalen_); } - + void - Sender_exec_i::ccm_passivate (void) + Sender_exec_i::stop (void) { if (this->timer_.value ()) - this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_); + { + this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender_exec_i::stop : Timer canceled.\n"))); + delete this->ticker_; + } + } + + void + Sender_exec_i::ccm_passivate (void) + { + this->stop(); } void Sender_exec_i::ccm_remove (void) { - ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n", - (this->number_of_msg_ + 1))); - - if( this->count_.value () > 0) - { - double avg = this->tv_total_.value () / this->count_.value (); - ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n" - " TO DO: calculate stdev, 50%%,90%%,99%% and 99.99%%\n" - "This is the roundtrip time, *not* the one-way-latency\n" - "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%, max us\n" - "------,-------,-------,-------,-------,-------,-------,-------,-------\n" - "%6d,-------,%7.1f,%7u,-------,-------,-------,-------,%7u\n", - this->count_.value (), - this->datalen_, - this->datalen_, - avg, - this->tv_min_.value (), - this->tv_max_.value ())); - } - else - { - ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n " + ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n", + (this->number_of_msg_))); + + //sort all duration times + qsort(this->duration_times, this->count_,sizeof(CORBA::Long), compare_two_longs); + /* Show latency_50_percentile, latency_90_percentile, latency_99_percentile and + * latency_99.99_percentile. + * For example duration_times[per50] is the median i.e. 50% of the + * samples have a latency time <= duration_times[per50] + */ + int per50 = this->count_/2; + int per90 = (int)(this->count_ * 0.90); + int per99 = (int)(this->count_ * 0.990); + int per999 = (int)(this->count_ * 0.999); + + double avg = this->tv_total_.value () / this->count_; + //calculate standard deviation + double _roundtrip_time_std = sqrt( + (this->sigma_duration_squared_ / (double)this->count_) - + (avg * avg)); + + int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong); + CORBA::UShort datalen = overhead_size + this->datalen_; +// if( this->count_.value () > 0) + if( this->count_ > 0) + { + ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n" + "This is the roundtrip time, *not* the one-way-latency\n" + "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.9%%, max us\n" + "------,-------,-------,-------,-------,-------,-------,-------,-------\n" + "%6d,%7.1f,%7.1f,%7u,%7u,%7u,%7u,%7u,%7u\n", + this->count_, + datalen, + datalen, + _roundtrip_time_std, + avg, + this->tv_min_.value (), + this->duration_times[per50-1], + this->duration_times[per90-1], + this->duration_times[per99-1], + this->duration_times[per999-1], + this->tv_max_.value ())); + } + else + { + ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n " "No samples reveived back.\n")); - } + } } extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr |