summaryrefslogtreecommitdiff
path: root/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp')
-rw-r--r--CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp183
1 files changed, 107 insertions, 76 deletions
diff --git a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
index 66ae2f02a67..977950884dc 100644
--- a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
+++ b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
@@ -9,6 +9,7 @@
#include "ace/Reactor.h"
#include "ace/High_Res_Timer.h"
+
namespace CIAO_Latency_Test_Sender_Impl
{
//============================================================
@@ -32,13 +33,13 @@ namespace CIAO_Latency_Test_Sender_Impl
{
ACE_UINT64 receive_time = 0;
- //only interested in messages received with a latency_ping = 0 (messages sent beck by receiver)
+ //only interested in messages received with a latency_ping = 0 (messages sent back by receiver)
if( an_instance.ping == 0)
- {
- ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time);
- this->callback_.read(an_instance, receive_time);
- }
- }
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time);
+ this->callback_.read(an_instance, receive_time);
+ }
+ }
void
LatencyTest_Listener_exec_i::on_many_data (
@@ -123,7 +124,6 @@ namespace CIAO_Latency_Test_Sender_Impl
//============================================================
Sender_exec_i::Sender_exec_i (void)
: iterations_ (1000),
- keys_ (1),
datalen_(100),
sleep_(10),
matched_(false),
@@ -135,7 +135,8 @@ namespace CIAO_Latency_Test_Sender_Impl
number_of_msg_(0), //number of sent messages
timer_(false),
received_(false),
- seq_num_(0)
+ seq_num_(0),
+ sigma_duration_squared_(0)
{
this->ticker_ = new WriteTicker (*this);
}
@@ -144,6 +145,11 @@ namespace CIAO_Latency_Test_Sender_Impl
{
}
+ static int compare_two_longs(const void *long1, const void *long2)
+ {
+ return (*(CORBA::Long*)long1 - *(CORBA::Long*)long2 );
+ }
+
void
Sender_exec_i::write_one (void)
{
@@ -152,9 +158,9 @@ namespace CIAO_Latency_Test_Sender_Impl
if( (this->number_of_msg_ == 0) || ( this->received_.value()))
{
// all messages send, stop timer
- if((this->iterations_ != 0) && (this->number_of_msg_ >= (this->iterations_ * this->keys_)))
+ if((this->iterations_ != 0) && (this->number_of_msg_ >= this->iterations_ ))
{
- this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
+ this->stop();
this->timer_ = false;
}
else
@@ -162,20 +168,20 @@ namespace CIAO_Latency_Test_Sender_Impl
try
{
//send messages with indicator (ping = 1L) so that subscriber knows that this message has to sent back.
- this->last_key_->second->ping = 1L;
- this->last_key_->second->seq_num = this->number_of_msg_;
+ this->test_topic_.ping = 1L;
+ this->test_topic_.seq_num = this->number_of_msg_;;
//keep last sent seq_num, in order to control if message is sent back.
this->seq_num_ = this->number_of_msg_;
-
- ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_);
- this->writer_->write_one (this->last_key_->second, ::DDS::HANDLE_NIL);
- }
+ this->received_ = false;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_);
+ this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL);
+ }
catch (const CCM_DDS::InternalError& )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
- ACE_TEXT ("while updating writer info for <%C>.\n"),
- this->last_key_->first.c_str ()));
+ ACE_TEXT ("while updating writer info for <%u>.\n"),
+ this->test_topic_.seq_num));
}
++this->number_of_msg_;
}
@@ -183,7 +189,7 @@ namespace CIAO_Latency_Test_Sender_Impl
}
void
- Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time)
+ Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time)
{
if (an_instance.seq_num == this->seq_num_.value())
{
@@ -218,19 +224,6 @@ namespace CIAO_Latency_Test_Sender_Impl
{
ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard,
this->mutex_, CORBA::INTERNAL ());
- for (CORBA::UShort i = 1; i < this->keys_ + 1; ++i)
- {
- char key[7];
- LatencyTest *new_key = new LatencyTest;
- ACE_OS::sprintf (key, "KEY_%d", i);
- new_key->key = CORBA::string_dup(key);
- new_key->seq_num = 0;
- new_key->data.allocbuf(this->datalen_);
- // to do : freebuf, where ?
- this->samples_[key] = new_key;
- }
- this->last_key_ = this->samples_.begin ();
-
//this->sleep_ is in ms
unsigned int sec = this->sleep_/1000;
unsigned int usec = (this->sleep_ % 1000) * 1000;
@@ -251,9 +244,13 @@ namespace CIAO_Latency_Test_Sender_Impl
void
Sender_exec_i::record_time (ACE_UINT64 receive_time)
{
- ACE_UINT64 interval = ( receive_time - this->start_time_);
- ++this->count_;
+ ACE_UINT64 interval = ( receive_time - this->start_time_);
+ ++this->count_;
long duration = static_cast <CORBA::Long>(interval);
+ int i = this->count_;
+ // keep all duration times for statistics
+ this->duration_times[i-1] = duration;
+ this->sigma_duration_squared_ += (double)duration * (double)duration;
this->tv_total_ += duration;
if (duration > this->tv_max_.value ()|| (this->tv_max_.value () == 0L))
this->tv_max_ = duration;
@@ -270,19 +267,16 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
void
Sender_exec_i::iterations (::CORBA::ULong iterations)
{
- this->iterations_ = iterations;
- }
-
- ::CORBA::UShort
- Sender_exec_i::keys (void)
- {
- return this->keys_;
- }
-
- void
- Sender_exec_i::keys (::CORBA::UShort keys)
- {
- this->keys_ = keys;
+ if (iterations == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("ERROR: iterations must be greater as '0'\n")));
+ throw ::CORBA::BAD_PARAM ();
+ }
+ else
+ {
+ this->iterations_ = iterations;
+ }
}
::CORBA::UShort
@@ -325,8 +319,7 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
void
Sender_exec_i::datalen (::CORBA::UShort datalen)
{
- // 7 is length of key, has to be removed
- int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong) + 7;
+ int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong);
if((datalen <= overhead_size) || (datalen > MAX_DATA_SEQUENCE_LENGTH))
{
ACE_ERROR ((LM_ERROR,
@@ -334,9 +327,9 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
throw ::CORBA::BAD_PARAM ();
}
this->datalen_ = datalen - overhead_size;
+ this->duration_times = new CORBA::Long[this->iterations_];
}
-
void
Sender_exec_i::set_session_context (::Components::SessionContext_ptr ctx)
{
@@ -375,43 +368,81 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n")));
}
+ //make instances of Topic
+ this->test_topic_.seq_num = 0;
+ this->test_topic_.ping = 0;
+ this->test_topic_.data.length (this->datalen_);
}
-
+
void
- Sender_exec_i::ccm_passivate (void)
+ Sender_exec_i::stop (void)
{
if (this->timer_.value ())
- this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
+ {
+ this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender_exec_i::stop : Timer canceled.\n")));
+ delete this->ticker_;
+ }
+ }
+
+ void
+ Sender_exec_i::ccm_passivate (void)
+ {
+ this->stop();
}
void
Sender_exec_i::ccm_remove (void)
{
- ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n",
- (this->number_of_msg_ + 1)));
-
- if( this->count_.value () > 0)
- {
- double avg = this->tv_total_.value () / this->count_.value ();
- ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n"
- " TO DO: calculate stdev, 50%%,90%%,99%% and 99.99%%\n"
- "This is the roundtrip time, *not* the one-way-latency\n"
- "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%, max us\n"
- "------,-------,-------,-------,-------,-------,-------,-------,-------\n"
- "%6d,-------,%7.1f,%7u,-------,-------,-------,-------,%7u\n",
- this->count_.value (),
- this->datalen_,
- this->datalen_,
- avg,
- this->tv_min_.value (),
- this->tv_max_.value ()));
- }
- else
- {
- ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
+ ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n",
+ (this->number_of_msg_)));
+
+ //sort all duration times
+ qsort(this->duration_times, this->count_,sizeof(CORBA::Long), compare_two_longs);
+ /* Show latency_50_percentile, latency_90_percentile, latency_99_percentile and
+ * latency_99.99_percentile.
+ * For example duration_times[per50] is the median i.e. 50% of the
+ * samples have a latency time <= duration_times[per50]
+ */
+ int per50 = this->count_/2;
+ int per90 = (int)(this->count_ * 0.90);
+ int per99 = (int)(this->count_ * 0.990);
+ int per999 = (int)(this->count_ * 0.999);
+
+ double avg = this->tv_total_.value () / this->count_;
+ //calculate standard deviation
+ double _roundtrip_time_std = sqrt(
+ (this->sigma_duration_squared_ / (double)this->count_) -
+ (avg * avg));
+
+ int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong);
+ CORBA::UShort datalen = overhead_size + this->datalen_;
+// if( this->count_.value () > 0)
+ if( this->count_ > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n"
+ "This is the roundtrip time, *not* the one-way-latency\n"
+ "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.9%%, max us\n"
+ "------,-------,-------,-------,-------,-------,-------,-------,-------\n"
+ "%6d,%7.1f,%7.1f,%7u,%7u,%7u,%7u,%7u,%7u\n",
+ this->count_,
+ datalen,
+ datalen,
+ _roundtrip_time_std,
+ avg,
+ this->tv_min_.value (),
+ this->duration_times[per50-1],
+ this->duration_times[per90-1],
+ this->duration_times[per99-1],
+ this->duration_times[per999-1],
+ this->tv_max_.value ()));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
"No samples reveived back.\n"));
- }
+ }
}
extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr