summaryrefslogtreecommitdiff
path: root/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp')
-rw-r--r--CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp183
1 files changed, 76 insertions, 107 deletions
diff --git a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
index 977950884dc..66ae2f02a67 100644
--- a/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
+++ b/CIAO/connectors/dds4ccm/performance-tests/Latency/Sender/Latency_Test_Sender_exec.cpp
@@ -9,7 +9,6 @@
#include "ace/Reactor.h"
#include "ace/High_Res_Timer.h"
-
namespace CIAO_Latency_Test_Sender_Impl
{
//============================================================
@@ -33,13 +32,13 @@ namespace CIAO_Latency_Test_Sender_Impl
{
ACE_UINT64 receive_time = 0;
- //only interested in messages received with a latency_ping = 0 (messages sent back by receiver)
+ //only interested in messages received with a latency_ping = 0 (messages sent beck by receiver)
if( an_instance.ping == 0)
- {
- ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time);
- this->callback_.read(an_instance, receive_time);
- }
- }
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec ( receive_time);
+ this->callback_.read(an_instance, receive_time);
+ }
+ }
void
LatencyTest_Listener_exec_i::on_many_data (
@@ -124,6 +123,7 @@ namespace CIAO_Latency_Test_Sender_Impl
//============================================================
Sender_exec_i::Sender_exec_i (void)
: iterations_ (1000),
+ keys_ (1),
datalen_(100),
sleep_(10),
matched_(false),
@@ -135,8 +135,7 @@ namespace CIAO_Latency_Test_Sender_Impl
number_of_msg_(0), //number of sent messages
timer_(false),
received_(false),
- seq_num_(0),
- sigma_duration_squared_(0)
+ seq_num_(0)
{
this->ticker_ = new WriteTicker (*this);
}
@@ -145,11 +144,6 @@ namespace CIAO_Latency_Test_Sender_Impl
{
}
- static int compare_two_longs(const void *long1, const void *long2)
- {
- return (*(CORBA::Long*)long1 - *(CORBA::Long*)long2 );
- }
-
void
Sender_exec_i::write_one (void)
{
@@ -158,9 +152,9 @@ namespace CIAO_Latency_Test_Sender_Impl
if( (this->number_of_msg_ == 0) || ( this->received_.value()))
{
// all messages send, stop timer
- if((this->iterations_ != 0) && (this->number_of_msg_ >= this->iterations_ ))
+ if((this->iterations_ != 0) && (this->number_of_msg_ >= (this->iterations_ * this->keys_)))
{
- this->stop();
+ this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
this->timer_ = false;
}
else
@@ -168,20 +162,20 @@ namespace CIAO_Latency_Test_Sender_Impl
try
{
//send messages with indicator (ping = 1L) so that subscriber knows that this message has to sent back.
- this->test_topic_.ping = 1L;
- this->test_topic_.seq_num = this->number_of_msg_;;
+ this->last_key_->second->ping = 1L;
+ this->last_key_->second->seq_num = this->number_of_msg_;
//keep last sent seq_num, in order to control if message is sent back.
this->seq_num_ = this->number_of_msg_;
- this->received_ = false;
- ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_);
- this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL);
- }
+
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_);
+ this->writer_->write_one (this->last_key_->second, ::DDS::HANDLE_NIL);
+ }
catch (const CCM_DDS::InternalError& )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
- ACE_TEXT ("while updating writer info for <%u>.\n"),
- this->test_topic_.seq_num));
+ ACE_TEXT ("while updating writer info for <%C>.\n"),
+ this->last_key_->first.c_str ()));
}
++this->number_of_msg_;
}
@@ -189,7 +183,7 @@ namespace CIAO_Latency_Test_Sender_Impl
}
void
- Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time)
+ Sender_exec_i::read(LatencyTest an_instance,ACE_UINT64 receive_time)
{
if (an_instance.seq_num == this->seq_num_.value())
{
@@ -224,6 +218,19 @@ namespace CIAO_Latency_Test_Sender_Impl
{
ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard,
this->mutex_, CORBA::INTERNAL ());
+ for (CORBA::UShort i = 1; i < this->keys_ + 1; ++i)
+ {
+ char key[7];
+ LatencyTest *new_key = new LatencyTest;
+ ACE_OS::sprintf (key, "KEY_%d", i);
+ new_key->key = CORBA::string_dup(key);
+ new_key->seq_num = 0;
+ new_key->data.allocbuf(this->datalen_);
+ // to do : freebuf, where ?
+ this->samples_[key] = new_key;
+ }
+ this->last_key_ = this->samples_.begin ();
+
//this->sleep_ is in ms
unsigned int sec = this->sleep_/1000;
unsigned int usec = (this->sleep_ % 1000) * 1000;
@@ -244,13 +251,9 @@ namespace CIAO_Latency_Test_Sender_Impl
void
Sender_exec_i::record_time (ACE_UINT64 receive_time)
{
- ACE_UINT64 interval = ( receive_time - this->start_time_);
- ++this->count_;
+ ACE_UINT64 interval = ( receive_time - this->start_time_);
+ ++this->count_;
long duration = static_cast <CORBA::Long>(interval);
- int i = this->count_;
- // keep all duration times for statistics
- this->duration_times[i-1] = duration;
- this->sigma_duration_squared_ += (double)duration * (double)duration;
this->tv_total_ += duration;
if (duration > this->tv_max_.value ()|| (this->tv_max_.value () == 0L))
this->tv_max_ = duration;
@@ -267,16 +270,19 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
void
Sender_exec_i::iterations (::CORBA::ULong iterations)
{
- if (iterations == 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("ERROR: iterations must be greater as '0'\n")));
- throw ::CORBA::BAD_PARAM ();
- }
- else
- {
- this->iterations_ = iterations;
- }
+ this->iterations_ = iterations;
+ }
+
+ ::CORBA::UShort
+ Sender_exec_i::keys (void)
+ {
+ return this->keys_;
+ }
+
+ void
+ Sender_exec_i::keys (::CORBA::UShort keys)
+ {
+ this->keys_ = keys;
}
::CORBA::UShort
@@ -319,7 +325,8 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
void
Sender_exec_i::datalen (::CORBA::UShort datalen)
{
- int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong);
+ // 7 is length of key, has to be removed
+ int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong) + 7;
if((datalen <= overhead_size) || (datalen > MAX_DATA_SEQUENCE_LENGTH))
{
ACE_ERROR ((LM_ERROR,
@@ -327,9 +334,9 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
throw ::CORBA::BAD_PARAM ();
}
this->datalen_ = datalen - overhead_size;
- this->duration_times = new CORBA::Long[this->iterations_];
}
+
void
Sender_exec_i::set_session_context (::Components::SessionContext_ptr ctx)
{
@@ -368,81 +375,43 @@ Sender_exec_i::record_time (ACE_UINT64 receive_time)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n")));
}
- //make instances of Topic
- this->test_topic_.seq_num = 0;
- this->test_topic_.ping = 0;
- this->test_topic_.data.length (this->datalen_);
- }
-
- void
- Sender_exec_i::stop (void)
- {
- if (this->timer_.value ())
- {
- this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender_exec_i::stop : Timer canceled.\n")));
- delete this->ticker_;
- }
}
-
+
void
Sender_exec_i::ccm_passivate (void)
{
- this->stop();
+ if (this->timer_.value ())
+ this->context_->get_CCM_object()->_get_orb ()->orb_core ()->reactor ()->cancel_timer (this->ticker_);
}
void
Sender_exec_i::ccm_remove (void)
{
- ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n",
- (this->number_of_msg_)));
-
- //sort all duration times
- qsort(this->duration_times, this->count_,sizeof(CORBA::Long), compare_two_longs);
- /* Show latency_50_percentile, latency_90_percentile, latency_99_percentile and
- * latency_99.99_percentile.
- * For example duration_times[per50] is the median i.e. 50% of the
- * samples have a latency time <= duration_times[per50]
- */
- int per50 = this->count_/2;
- int per90 = (int)(this->count_ * 0.90);
- int per99 = (int)(this->count_ * 0.990);
- int per999 = (int)(this->count_ * 0.999);
-
- double avg = this->tv_total_.value () / this->count_;
- //calculate standard deviation
- double _roundtrip_time_std = sqrt(
- (this->sigma_duration_squared_ / (double)this->count_) -
- (avg * avg));
-
- int overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULong);
- CORBA::UShort datalen = overhead_size + this->datalen_;
-// if( this->count_.value () > 0)
- if( this->count_ > 0)
- {
- ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n"
- "This is the roundtrip time, *not* the one-way-latency\n"
- "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.9%%, max us\n"
- "------,-------,-------,-------,-------,-------,-------,-------,-------\n"
- "%6d,%7.1f,%7.1f,%7u,%7u,%7u,%7u,%7u,%7u\n",
- this->count_,
- datalen,
- datalen,
- _roundtrip_time_std,
- avg,
- this->tv_min_.value (),
- this->duration_times[per50-1],
- this->duration_times[per90-1],
- this->duration_times[per99-1],
- this->duration_times[per999-1],
- this->tv_max_.value ()));
- }
- else
- {
- ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
+ ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %u\n",
+ (this->number_of_msg_ + 1)));
+
+ if( this->count_.value () > 0)
+ {
+ double avg = this->tv_total_.value () / this->count_.value ();
+ ACE_DEBUG ((LM_DEBUG,"Collecting statistics on %d samples with message size %u.\n"
+ " TO DO: calculate stdev, 50%%,90%%,99%% and 99.99%%\n"
+ "This is the roundtrip time, *not* the one-way-latency\n"
+ "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%, max us\n"
+ "------,-------,-------,-------,-------,-------,-------,-------,-------\n"
+ "%6d,-------,%7.1f,%7u,-------,-------,-------,-------,%7u\n",
+ this->count_.value (),
+ this->datalen_,
+ this->datalen_,
+ avg,
+ this->tv_min_.value (),
+ this->tv_max_.value ()));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
"No samples reveived back.\n"));
- }
+ }
}
extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr