summaryrefslogtreecommitdiff
path: root/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp')
-rw-r--r--modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp656
1 files changed, 656 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp
new file mode 100644
index 00000000000..eead5a75c95
--- /dev/null
+++ b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/Latency_Sender.cpp
@@ -0,0 +1,656 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "tao/ORB_Core.h"
+#include "ace/Timer_Queue.h"
+#include "ace/Reactor.h"
+#include "ace/Env_Value_T.h"
+#include "Latency_Base.h"
+#include "Latency_BaseSupport.h"
+#include "Latency_BasePlugin.h"
+#include "ace/Tokenizer_T.h"
+
+#include <ndds/ndds_namespace_cpp.h>
+
+// Forward declarations
+class WriteTicker;
+
+// Global variables
+CORBA::UShort iterations_ = 1000;
+CORBA::UShort datalen_ = 100;
+CORBA::UShort datalen_idx_ = 0;
+CORBA::UShort nr_of_runs_ = 10;
+CORBA::UShort sleep_ = 2;
+ACE_UINT64 tv_total_ = 0;
+ACE_UINT64 tv_max_ = 0;
+ACE_UINT64 tv_min_ = 0;
+CORBA::UShort count_ = 0;
+CORBA::UShort number_of_msg_ = 0;
+bool received_ = false;
+CORBA::Long seq_num_ = 0;
+CORBA::Double sigma_duration_squared_;
+ACE_UINT64 start_time_ = 0;
+ACE_UINT64 start_time_test_ = 0;
+ACE_UINT64 end_time_test_ = 0;
+
+ACE_UINT64 * duration_times_;
+CORBA::Short * datalen_range_;
+ACE_UINT64 clock_overhead_;
+
+LatencyTest * instance_ = 0;
+
+LatencyTestDataWriter * test_data_writer_ = 0;
+
+const char * lib_name_ = 0;
+const char * prof_name_ = 0;
+
+CORBA::UShort domain_id_ = 0;
+CORBA::Boolean both_read_write_ = false;
+
+WriteTicker * ticker_ = 0;
+
+/* The listener of events and data from the middleware */
+class HelloListener: public DDSDataReaderListener
+{
+public:
+ void on_data_available(DDSDataReader *reader);
+};
+
+/* The dummy listener of events and data from the middleware */
+class DummyListener: public DDSDataReaderListener
+{
+};
+
+class WriteTicker :public ACE_Event_Handler
+{
+ public:
+ WriteTicker (void);
+ int handle_timeout (const ACE_Time_Value &, const void *);
+};
+
+void
+split_qos (const char * qos)
+{
+ char* buf = const_cast <char *> (qos);
+ ACE_Tokenizer_T<char> tok (buf);
+ tok.delimiter_replace ('#', 0);
+ for (char *p = tok.next (); p; p = tok.next ())
+ {
+ if (!lib_name_)
+ {
+ lib_name_ = p;
+ }
+ else if (!prof_name_)
+ {
+ prof_name_ = p;
+ }
+ }
+ ACE_DEBUG ((LM_DEBUG, "Sender : Found QoS profile %C %C\n",
+ lib_name_,
+ prof_name_));
+}
+
+int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("b:d:i:s:q:O"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'd':
+ domain_id_ = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'i':
+ iterations_ = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 's':
+ sleep_ = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'b':
+ both_read_write_ = true;
+ break;
+ case 'q':
+ {
+ const char * qos = get_opts.opt_arg ();
+ split_qos (qos);
+ }
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage:\n\n"
+ " -d <domain_id >\n"
+ " -i <iterations >\n"
+ " -s <sleep>\n"
+ " -q <QoS profile>\n"
+ " -b "
+ "\n"),
+ -1);
+ }
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+void
+calculate_clock_overhead (void)
+{
+ int num_of_loops_clock = 320;
+ ACE_UINT64 begin_time;
+ ACE_UINT64 clock_roundtrip_time;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (begin_time);
+ for (int i = 0; i < num_of_loops_clock; ++i)
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (clock_roundtrip_time);
+ }
+ ACE_UINT64 const total_time = clock_roundtrip_time - begin_time;
+ clock_overhead_ = (ACE_UINT64) (total_time / num_of_loops_clock);
+}
+
+void
+stop (void)
+{
+ if (ticker_)
+ {
+ ACE_Reactor::instance ()->cancel_timer (ticker_);
+ delete ticker_;
+ ticker_ = 0;
+ }
+}
+
+void
+init_values (void)
+{
+ duration_times_ = new ACE_UINT64[iterations_];
+ datalen_range_ = new CORBA::Short[nr_of_runs_];
+ int start = 16;
+ for(int i = 0; i < nr_of_runs_; i++)
+ {
+ datalen_range_[i] = start;
+ start = 2 * start;
+ }
+
+ datalen_ = datalen_range_[0];
+
+ // make instances of Topic
+ instance_->seq_num = 0;
+ instance_->data.length (datalen_);
+ calculate_clock_overhead ();
+}
+
+void
+record_time (ACE_UINT64 receive_time)
+{
+ ++count_;
+ ACE_UINT64 const interval = receive_time - start_time_;
+ ACE_UINT64 const duration = interval - clock_overhead_;
+ if (count_ > iterations_)
+ {
+ ACE_ERROR ((LM_ERROR, "ERROR: Internal error while getting more "
+ "messages back as expected.\n"));
+ }
+ else
+ {
+ duration_times_[count_-1] = duration;
+ sigma_duration_squared_ += (double)duration * (double)duration;
+ tv_total_ += duration;
+ if (duration > tv_max_ || (tv_max_ == 0L))
+ {
+ tv_max_ = duration;
+ }
+ if (duration < tv_min_ || (tv_min_ == 0L))
+ {
+ tv_min_ = duration;
+ }
+ }
+}
+
+void
+reset_results (void)
+{
+ count_ = 0;
+ duration_times_ = new ACE_UINT64[iterations_];
+ tv_total_ = 0L;
+ tv_max_ = 0L;
+ tv_min_ = 0L;
+ number_of_msg_ = 0;
+ received_ = false;
+ seq_num_ = 0;
+ sigma_duration_squared_ = 0;
+}
+
+static int compare_two_longs (const void * long1, const void * long2)
+{
+ return (int)((*(ACE_UINT64*)long1 - *(ACE_UINT64*)long2));
+}
+
+void
+calc_results()
+{
+ // Sort all duration times.
+ qsort(duration_times_,
+ count_,
+ sizeof(ACE_UINT64),
+ compare_two_longs);
+
+ // Show latency_50_percentile, latency_90_percentile,
+ // latency_99_percentile and latency_99.99_percentile.
+ // For example duration_times[per50] is the median i.e. 50% of the
+ // samples have a latency time <= duration_times[per50]
+ int per50 = count_/2;
+ int per90 = (int)(count_ * 0.90);
+ int per99 = (int)(count_ * 0.990);
+ int per9999 = (int)(count_ * 0.9999);
+
+ double avg = 0;
+ double roundtrip_time_std = 0;
+ if (count_ > 0)
+ {
+ avg = (double)(tv_total_ / count_);
+ // Calculate standard deviation.
+ roundtrip_time_std = sqrt(
+ (sigma_duration_squared_ / (double)count_) -
+ (avg * avg));
+ }
+
+ // Show values as float, in order to be comparable with RTI performance test.
+ if (count_ > 0)
+ {
+ if (datalen_idx_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Collecting statistics on %d samples per message size.\n"
+ "This is the roundtrip time, *not* the one-way-latency\n"
+ "Clock overhead %d\n"
+ "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%,"
+ " max us\n"
+ "------,-------,-------,-------,-------,-------,-------,-------,"
+ "-------\n", count_, clock_overhead_));
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n",
+ datalen_,
+ roundtrip_time_std,
+ avg,
+ (double)tv_min_,
+ (double)duration_times_[per50-1],
+ (double)duration_times_[per90-1],
+ (double)duration_times_[per99-1],
+ (double)duration_times_[per9999-1],
+ (double)tv_max_));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
+ "No samples reveived back.\n"));
+ }
+}
+
+
+void
+write_one (void)
+{
+ if ((number_of_msg_ == 0) && (datalen_idx_ == 0))
+ {
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (start_time_test_);
+ }
+ // First message sent always, next messages only as previous sent message
+ // is received back.
+ if ((number_of_msg_ == 0) || received_)
+ {
+ // All messages send, stop timer.
+ if ((iterations_ != 0) &&
+ (number_of_msg_ >= iterations_ ))
+ {
+ if (datalen_idx_ >= (nr_of_runs_ - 1))
+ {
+ stop();
+ calc_results();
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (end_time_test_);
+ ACE_Reactor::instance ()->end_reactor_event_loop ();
+ }
+ else
+ {
+ calc_results();
+ reset_results();
+ ++datalen_idx_;
+ datalen_ = datalen_range_[datalen_idx_];
+ instance_->data.length (datalen_);
+ }
+ }
+ else
+ {
+ try
+ {
+ instance_->seq_num = number_of_msg_;
+ // Keep last sent seq_num, to control if message is sent back.
+ seq_num_ = number_of_msg_;
+ received_ = false;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (start_time_);
+ test_data_writer_->write (*instance_, DDS_HANDLE_NIL);
+ }
+ catch (const CORBA::Exception& )
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
+ ACE_TEXT ("while writing sample with sequence_number <%u>.\n"),
+ instance_->seq_num));
+ }
+ ++number_of_msg_;
+ }
+ }
+}
+
+void start (void)
+{
+ ticker_ = new WriteTicker();
+
+ // This->sleep_ is in ms
+ unsigned int sec = sleep_/1000;
+ unsigned int usec = (sleep_ % 1000) * 1000;
+ if (ACE_Reactor::instance ()->schedule_timer (
+ ticker_,
+ 0,
+ ACE_Time_Value (5, 0),
+ ACE_Time_Value (sec, usec)) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("start : ")
+ ACE_TEXT ("Error scheduling timer")));
+ }
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+}
+
+void
+read (LatencyTest & an_instance, ACE_UINT64 receive_time)
+{
+ if (an_instance.seq_num == seq_num_)
+ {
+ record_time (receive_time);
+ received_ = true;
+ }
+}
+
+int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
+{
+ DDS_ReturnCode_t retcode;
+ ::DDS::DataReader * data_reader = 0;
+ ::DDS::DataReader * dum_data_reader = 0;
+
+ HelloListener listener;
+ DummyListener dum_listener;
+ const char * type_name = 0;
+ int main_result = 1; /* error by default */
+
+ ::DDS::Topic * receive_topic = 0;
+ ::DDS::Topic * send_topic = 0;
+ ::DDS::DataWriter * data_writer = 0;
+ ::DDS::DataWriter * dum_data_writer = 0;
+
+ ACE_Env_Value<int> id (ACE_TEXT("DDS4CCM_DEFAULT_DOMAIN_ID"), domain_id_);
+ domain_id_ = id;
+
+ if (parse_args (argc, argv) != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Error arguments.\n")));
+ return 1;
+ }
+
+ (void) ACE_High_Res_Timer::global_scale_factor ();
+ ACE_Reactor::instance ()->timer_queue()->gettimeofday (&ACE_High_Res_Timer::gettimeofday_hr);
+
+ /* Create the domain participant */
+ DDSDomainParticipant * participant =
+ DDSDomainParticipantFactory::get_instance()->
+ create_participant_with_profile(
+ domain_id_,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!participant)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Sender : Unable to create domain participant.\n")));
+ goto clean_exit;
+ }
+
+ /* Register type before creating topic */
+ type_name = LatencyTestTypeSupport::get_type_name();
+ retcode = LatencyTestTypeSupport::register_type (participant,
+ type_name);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ goto clean_exit;
+ }
+
+ send_topic = participant->create_topic_with_profile (
+ "send",
+ type_name,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!send_topic)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+
+ receive_topic = participant->create_topic_with_profile (
+ "receive",
+ type_name,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!receive_topic) {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+ /* Create the data writer using the default publisher */
+ data_writer = participant->create_datawriter_with_profile(
+ send_topic,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!data_writer)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data writer.\n")));
+ goto clean_exit;
+ }
+
+ /* Create a data reader, which will not be used, but is there for
+ * compatibility with DDS4CCM latency test, where there is always a
+ * reader and a writer per connector.
+ */
+ if (both_read_write_)
+ {
+ dum_data_reader = participant->create_datareader_with_profile(
+ send_topic,
+ lib_name_,
+ prof_name_,
+ &dum_listener,
+ DDS_DATA_AVAILABLE_STATUS);
+
+ if (!dum_data_reader )
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create dummy data reader.\n")));
+ goto clean_exit;
+ }
+ }
+
+ data_reader = participant->create_datareader_with_profile(
+ receive_topic,
+ lib_name_,
+ prof_name_,
+ &listener,
+ DDS_DATA_AVAILABLE_STATUS);
+ if (!data_reader)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data reader.\n")));
+ goto clean_exit;
+ }
+
+ /* Create a data writer, which will not be used, but is there for
+ * compatibility with DDS4CCM latency test, where there is always a
+ * reader and a writer per connector
+ */
+ if (both_read_write_)
+ {
+ dum_data_writer = participant->create_datawriter_with_profile(
+ receive_topic,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!dum_data_writer)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create dummy data writer.\n")));
+ goto clean_exit;
+ }
+ }
+
+ /* Create data sample for writing */
+ instance_ = LatencyTestTypeSupport::create_data ();
+ if (instance_ == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data sample.\n")));
+ goto clean_exit;
+ }
+
+ init_values();
+
+ test_data_writer_ = LatencyTestDataWriter::narrow (data_writer);
+ if (!test_data_writer_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("LatencyTestDataWriter_narrow failed.\n")));
+ goto clean_exit;
+ }
+
+ // Sleep a couple seconds to allow discovery to happen
+ ACE_OS::sleep (5);
+
+ // handle writing of messages
+ start();
+
+ /* --- Clean Up --- */
+ ACE_OS::sleep (5);
+ main_result = 0;
+
+clean_exit:
+ const char * read_write_str;
+ if (both_read_write_)
+ {
+ read_write_str = "Used a extra dummy reader and writer per topic.";
+ }
+ else
+ {
+ read_write_str = "Used a reader for one topic and a writer for other topic.";
+ }
+
+ if((nr_of_runs_ -1) != datalen_idx_)
+ {
+ ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER : %u of %u runs completed.\n"
+ " Number of messages sent of last run (%u): %u\n"
+ "%C\n\n",
+ datalen_idx_,
+ nr_of_runs_,
+ datalen_idx_ + 1,
+ number_of_msg_,
+ read_write_str));
+ }
+ else
+ {
+ ACE_UINT64 test_time_usec = end_time_test_ - start_time_test_;
+
+ double sec = (double)test_time_usec / (1000 * 1000);
+ ACE_DEBUG ((LM_DEBUG, "TEST successful, number of runs (%u) of "
+ "%u messages in %3.3f seconds.\n"
+ "%C\n\n",
+ nr_of_runs_,
+ number_of_msg_,
+ sec,
+ read_write_str));
+ }
+ if (participant)
+ {
+ retcode = participant->delete_contained_entities ();
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ retcode = DDSDomainParticipantFactory::get_instance()->
+ delete_participant (participant);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ }
+ return main_result;
+}
+
+void HelloListener::on_data_available(DDSDataReader *reader)
+{
+ LatencyTestDataReader * test_reader =
+ LatencyTestDataReader::narrow (reader);
+ if (!test_reader)
+ {
+ /* In this specific case, this will never fail */
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("LatencyTestDataReader::narrow failed.\n")));
+ return;
+ }
+
+ /* Loop until there are messages available in the queue */
+ for(;;)
+ {
+ ::DDS::SampleInfoSeq info;
+ ::LatencyTestRTISeq sample_req;
+ ::DDS::ReturnCode_t const retcode = test_reader->take(sample_req, info);
+ if (retcode == DDS_RETCODE_NO_DATA)
+ {
+ /* No more samples */
+ break;
+ }
+ else if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Unable to take data from data reader,"
+ " error %d.\n"),
+ retcode));
+ return;
+ }
+ for (::DDS_Long i = 0; i < sample_req.length (); ++i)
+ {
+ if (info[i].valid_data)
+ {
+ ACE_UINT64 receive_time = 0;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (receive_time);
+ read(sample_req[i], receive_time);
+ }
+ }
+ (void) test_reader->return_loan (sample_req, info);
+ }
+}
+
+//============================================================
+// WriteTickerHandler
+//============================================================
+WriteTicker::WriteTicker ()
+{
+}
+
+int
+WriteTicker::handle_timeout (const ACE_Time_Value &, const void *)
+{
+ write_one();
+ return 0;
+}