summaryrefslogtreecommitdiff
path: root/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp')
-rw-r--r--modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp319
1 files changed, 319 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp
new file mode 100644
index 00000000000..8b1eac4715f
--- /dev/null
+++ b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Sender/Throughput_Sender.cpp
@@ -0,0 +1,319 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Env_Value_T.h"
+#include "tao/ORB_Core.h"
+#include "Throughput_Base.h"
+#include "Throughput_BaseSupport.h"
+#include "Throughput_BasePlugin.h"
+
+#include <ndds/ndds_namespace_cpp.h>
+
+CORBA::UShort datalen = 1024;
+CORBA::UShort recover_time = 1; // in sec
+CORBA::ULong start_load = 1000;
+CORBA::ULong incr_load = 1000;
+CORBA::ULongLong max_load = 9000;
+CORBA::ULongLong number_of_msg = 0;
+ACE_UINT64 duration_run_ = 10; // in sec
+
+CORBA::ULongLong load = 0;
+ACE_UINT64 start_time = 0;
+
+ThroughputTest *instance = 0;
+ThroughputCommand *instance_cmd = 0;
+::DDS::Topic *topic = 0;
+::DDS::Topic *cmd_topic = 0;
+::DDS::DataWriter *data_writer = 0;
+::DDS::DataWriter *cmd_writer = 0;
+ThroughputTestDataWriter *test_data_writer = 0;
+ThroughputCommandDataWriter *cmd_data_writer = 0;
+
+const char *lib_name = "HelloTest_Library";
+const char *cmd_prof_name = "ThroughputCmdQoS";
+const char *prof_name = "ThroughputQoS";
+const char *part_name = "ThroughputPartQoS";
+
+CORBA::UShort domain_id = 0;
+
+ int
+ parse_args (int argc, ACE_TCHAR *argv[])
+ {
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("d:l:r:s:m:i:O"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'd':
+ domain_id = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'l':
+ datalen = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'r':
+ recover_time = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 's':
+ start_load = ACE_OS::atol (get_opts.opt_arg ());
+ break;
+ case 'i':
+ incr_load = ACE_OS::atol (get_opts.opt_arg ());
+ break;
+ case 'm':
+ max_load = ACE_OS::atol (get_opts.opt_arg ());
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-d <domain_id >"
+ "-l <datalen >"
+ "-r <recover_times>"
+ "-s <start_load>"
+ "-i <incr_load>"
+ "-m <max_load>"
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+ }
+
+ bool
+ write(void)
+ {
+ CORBA::Boolean test_complete = false;
+ load += incr_load;
+ if ( load > max_load)
+ {
+ return false;
+ }
+ else
+ {
+ instance_cmd->command = THROUGHPUT_COMMAND_START;
+ instance_cmd->data_length = datalen;
+ instance_cmd->current_publisher_effort = load;
+ instance_cmd->final_publisher_effort = max_load;
+ try
+ {
+ cmd_data_writer->write(*instance_cmd,::DDS::HANDLE_NIL);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
+ ACE_TEXT ("while writing command.\n")));
+ }
+ test_complete = false;
+ // get start time
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (start_time);
+ while (!test_complete)
+ {
+ for (CORBA::ULongLong current_load = 0;
+ current_load < load && !test_complete;
+ ++current_load, ++instance->seq_num)
+ {
+ try
+ {
+ test_data_writer->write(*instance,::DDS::HANDLE_NIL);
+ ++number_of_msg;
+ }
+ catch (const CORBA::Exception&)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("ERROR: Internal Error ")
+ ACE_TEXT ("while updating writer "
+ "info for <%q>.\n"),
+ number_of_msg));
+ test_complete= true;
+ }
+ }
+ ACE_UINT64 end_time;
+ ACE_High_Res_Timer::gettimeofday_hr ().to_usec (end_time);
+ ACE_UINT64 interval = end_time - start_time;
+ if(interval > (duration_run_ * 1000 * 1000))
+ {
+ test_complete = true;
+ instance_cmd->command = THROUGHPUT_COMMAND_COMPLETE;
+ cmd_data_writer->write ( *instance_cmd, ::DDS::HANDLE_NIL);
+ }
+ if (!test_complete)
+ {
+ ACE_OS::sleep (recover_time);
+ }
+ }
+ return true;
+ }
+ }
+
+ int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
+ {
+ ::DDS::ReturnCode_t retcode;
+ long overhead_size = 0;
+ const char * type_name_cmd = 0;
+ const char * type_name = 0;
+
+ int main_result = 1; /* error by default */
+
+ ACE_Env_Value<int> id (ACE_TEXT("DDS4CCM_DEFAULT_DOMAIN_ID"), domain_id);
+ domain_id = id;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ /* Create the domain participant */
+ ::DDS::DomainParticipant *participant =
+ ::DDS::DomainParticipantFactory::get_instance()->
+ create_participant_with_profile(
+ domain_id, /* Domain ID */
+ lib_name,
+ part_name, /* QoS */
+ 0, /* Listener */
+ DDS_STATUS_MASK_NONE);
+ if (!participant) {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Unable to create domain participant.\n")));
+ goto clean_exit;
+ }
+
+ /* Register type before creating topic */
+ type_name = ThroughputTestTypeSupport::get_type_name();
+ retcode = ThroughputTestTypeSupport::register_type(
+ participant, type_name);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ goto clean_exit;
+ }
+ topic = participant->create_topic(
+ "Test data", /* Topic name*/
+ type_name, /* Type name */
+ DDS_TOPIC_QOS_DEFAULT, /* Topic QoS */
+ 0, /* Listener */
+ DDS_STATUS_MASK_NONE);
+
+
+ if (!topic) {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+
+ /* Register type before creating topic */
+ type_name_cmd = ThroughputCommandTypeSupport::get_type_name();
+ retcode = ThroughputCommandTypeSupport::register_type(
+ participant, type_name_cmd);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ goto clean_exit;
+ }
+ /* Create the topic "Command World" for the String type */
+ cmd_topic = participant->create_topic("Command data", /* Topic name*/
+ type_name_cmd , /* Type name */
+ DDS_TOPIC_QOS_DEFAULT,/* Topic QoS */
+ 0, /* Listener */
+ DDS_STATUS_MASK_NONE);
+ if (!topic)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+
+ /* Create the command writer using the default publisher */
+ cmd_writer = participant->create_datawriter_with_profile(
+ cmd_topic,
+ lib_name,
+ cmd_prof_name, /* QoS */
+ 0, /* Listener */
+ DDS_STATUS_MASK_NONE);
+ if (!cmd_writer)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Unable to create cmd data writer.\n")));
+ goto clean_exit;
+ }
+
+ /* Create the data writer using the default publisher */
+ data_writer = participant->create_datawriter_with_profile(
+ topic,
+ lib_name,
+ prof_name, /* QoS */
+ 0, /* Listener */
+ DDS_STATUS_MASK_NONE);
+ if (!data_writer)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data writer.\n")));
+ goto clean_exit;
+ }
+
+ /* Create data sample for writing */
+ instance = ThroughputTestTypeSupport::create_data();
+ if (instance == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data sample.\n")));
+ goto clean_exit;
+ }
+ overhead_size = sizeof(CORBA::ULong) + sizeof(CORBA::ULongLong);
+ instance->key = 1;
+ instance->seq_num = 0;
+ //instance->data.maximum (MAX_DATA_SEQUENCE_LENGTH);
+ instance->data.length(datalen - overhead_size);
+
+ /* Create data sample for writing */
+ instance_cmd = ThroughputCommandTypeSupport::create_data();
+ if (instance_cmd == 0)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create command sample.\n")));
+ goto clean_exit;
+ }
+
+ /* Perform a safe type-cast from a generic data writer into a
+ * specific data writer for the types "ThroughputTestDataWriter"
+ * and "ThroughputCommandDataWriter"
+ */
+ test_data_writer = ThroughputTestDataWriter::narrow(data_writer);
+ cmd_data_writer = ThroughputCommandDataWriter::narrow(cmd_writer);
+ if (!test_data_writer || !cmd_data_writer)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("DDS_StringDataWriter_narrow failed.\n")));
+ goto clean_exit;
+ }
+
+ // Sleep a couple seconds to allow discovery to happen
+ ACE_OS::sleep (1);
+
+ /* --- Write Data ----------------------------------------------------- */
+ for(CORBA::ULong i = start_load; i < (max_load + incr_load); i+= incr_load)
+ {
+ write();
+ ACE_OS::sleep (5);
+ }
+
+ /* --- Clean Up ------------------------------------------------------- */
+ ACE_OS::sleep (5);
+
+ main_result = 0;
+clean_exit:
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Exiting.")));
+ ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER number of messages sent: %Q\n",
+ (number_of_msg)));
+ if (participant)
+ {
+ retcode = participant->delete_contained_entities();
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ retcode = ::DDS::DomainParticipantFactory::get_instance()->
+ delete_participant(participant);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ }
+ return main_result;
+}