diff options
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Receiver/Throughput_Receiver.cpp')
-rw-r--r-- | modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Receiver/Throughput_Receiver.cpp | 395 |
1 files changed, 395 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Receiver/Throughput_Receiver.cpp b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Receiver/Throughput_Receiver.cpp new file mode 100644 index 00000000000..71fe2a8db1d --- /dev/null +++ b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSThroughput/DDS_Receiver/Throughput_Receiver.cpp @@ -0,0 +1,395 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "tao/ORB_Core.h" +#include "ace/Env_Value_T.h" +#include "ace/High_Res_Timer.h" +#include "Throughput_Base.h" +#include "Throughput_BaseSupport.h" +#include "Throughput_BasePlugin.h" + +#include <ndds/ndds_namespace_cpp.h> + +bool shutdown_flag = false; + +CORBA::LongLong count_ = 0; // total count of all received messages +ACE_UINT64 interval_time_ = 0; +CORBA::LongLong interval_messages_received_ = 0; +CORBA::LongLong interval_bytes_received_ = 0; +CORBA::Long interval_data_length_ = 0; +CORBA::UShort run_ = 0; +ACE_UINT64 first_time_ = 0; +CORBA::LongLong messages_lost_ = 0; +CORBA::Boolean logres = false; +CORBA::ULongLong seq_num_ = 0; +CORBA::LongLong demand_ = 0; + +ThroughputTest *instance =0; +::DDS::Topic *topic = 0; +::DDS::Topic *cmd_topic = 0; + +const char *lib_name = "HelloTest_Library"; +const char *cmd_prof_name = "ThroughputCmdQoS"; +const char *prof_name = "ThroughputQoS"; +const char *part_name = "ThroughputPartQoS"; + +CORBA::UShort domain_id = 0; + + int + parse_args (int argc, ACE_TCHAR *argv[]) + { + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("d:O")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'd': + domain_id = ACE_OS::atoi (get_opts.opt_arg ()); + break; + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-d <domain_id >" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; + } + +/* The listener of events and data from the middleware */ +class HelloListener: public ::DDS::DataReaderListener { +public: + void on_data_available(::DDS::DataReader *reader); +}; + +/* The listener of events and command data from the middleware */ +class CmdListener: public ::DDS::DataReaderListener { +public: + void on_data_available(::DDS::DataReader *reader); +}; + + void + show_results() + { + if ((count_ > 0) && (interval_time_ > 0)) + { + double per_sec = (double)1000000/ interval_time_; + double mbps = (interval_bytes_received_* per_sec)* (8.0/1000.0/1000.0); + + if(run_ == 1) + { + ACE_DEBUG((LM_DEBUG, + " bytes, demand, samples,sample/s, Mbit/s,lost samples\n" + "------,-------,-------,--------,-------,------------\n" + "%6u,%7q,%7q,%7.1f,%7.1f,%7q\n", + interval_data_length_, + demand_, + interval_messages_received_, + interval_messages_received_* per_sec, + mbps, + messages_lost_)); + } + else + { + ACE_DEBUG((LM_DEBUG,"%6u,%7q,%7q,%7.1f,%7.1f,%7q\n", + interval_data_length_, + demand_, + interval_messages_received_, + interval_messages_received_* per_sec, + mbps, + messages_lost_)); + } + } + } + + void + reset_results() + { + interval_messages_received_ = 0; + interval_bytes_received_ = 0; + interval_time_=0; + seq_num_ = 0; + messages_lost_= 0; + } + + void + handle_run(ThroughputCommand & an_instance) + { + if( an_instance.command == THROUGHPUT_COMMAND_START) + { + logres = true; + reset_results(); + interval_data_length_ = an_instance.data_length; + demand_ = an_instance.current_publisher_effort; + ACE_High_Res_Timer::gettimeofday_hr ().to_usec (first_time_); + } + if( an_instance.command == THROUGHPUT_COMMAND_COMPLETE) + { + logres = false; + ACE_UINT64 last_time; + ACE_High_Res_Timer::gettimeofday_hr ().to_usec (last_time); + interval_time_ = (last_time - first_time_); + ++run_; + show_results(); + if(an_instance.current_publisher_effort == + an_instance.final_publisher_effort) + { + shutdown_flag = true; + } + } + } + + void + record_data (ThroughputTest & an_instance) + { + ++count_; // total count of all received messages + if(logres == true) + { + ++interval_messages_received_; + interval_bytes_received_ += interval_data_length_; + if (an_instance.seq_num != seq_num_) + { + ++messages_lost_; + /* Reset sequence number */ + seq_num_ = an_instance.seq_num; + } + ++seq_num_; + } + } + + int ACE_TMAIN(int argc, ACE_TCHAR** argv) + { + ::DDS::ReturnCode_t retcode; + HelloListener listener; + CmdListener cmd_listener; + int main_result = 1; /* error by default */ + ::DDS::DataReader *data_reader = 0; + ::DDS::DataReader *cmd_data_reader = 0; + const char * type_name_cmd = 0; + const char * type_name = 0; + + ACE_Env_Value<int> id (ACE_TEXT("DDS4CCM_DEFAULT_DOMAIN_ID"), domain_id); + domain_id = id; + + if (parse_args (argc, argv) != 0) + return 1; + + /* Create the domain participant on domain ID 0 */ + ::DDS::DomainParticipant *participant = + ::DDS::DomainParticipantFactory::get_instance()-> + create_participant_with_profile( + domain_id, /* Domain ID */ + lib_name, + part_name, /* QoS */ + 0, /* Listener */ + DDS_STATUS_MASK_NONE); + if (!participant) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unable to create domain participant.\n"))); + goto clean_exit; + } + /* Register type before creating topic */ + type_name = ThroughputTestTypeSupport::get_type_name(); + retcode = ThroughputTestTypeSupport::register_type(participant, type_name); + if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unable to register topic type.\n"))); + goto clean_exit; + } + topic = participant->create_topic( + "Test data", /* Topic name*/ + type_name, /* Type name */ + DDS_TOPIC_QOS_DEFAULT, /* Topic QoS */ + 0, /* Listener */ + DDS_STATUS_MASK_NONE); + if (!topic) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n"))); + goto clean_exit; + } + /* Register type before creating command topic */ + type_name_cmd = ThroughputCommandTypeSupport::get_type_name(); + retcode = ThroughputCommandTypeSupport::register_type( + participant, type_name_cmd); + if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unable to register command topic type.\n"))); + goto clean_exit; + } + /* Create the topic "Hello, World" for the String type */ + cmd_topic = participant->create_topic( + "Command data", /* Topic name*/ + type_name_cmd, /* Type name */ + DDS_TOPIC_QOS_DEFAULT, /* Topic QoS */ + 0, /* Listener */ + DDS_STATUS_MASK_NONE); + if (!cmd_topic) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create cmd_topic.\n"))); + goto clean_exit; + } + /* Create the data reader using the default publisher */ + data_reader = participant->create_datareader_with_profile( + topic, + lib_name, + prof_name, /* QoS */ + &listener, /* Listener */ + DDS_DATA_AVAILABLE_STATUS); + /* Create the command data reader using the default publisher */ + cmd_data_reader = participant->create_datareader_with_profile( + cmd_topic, + lib_name, + cmd_prof_name, /* QoS */ + &cmd_listener, /* Listener */ + DDS_DATA_AVAILABLE_STATUS); + if (!data_reader || !cmd_data_reader) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data reader.\n"))); + goto clean_exit; + } + + /* --- Sleep During Asynchronous Reception ---------------------------- */ + + /* This thread sleeps forever. When a sample is received, RTI Data + * Distribution Service will call the on_data_available_callback function. + */ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ready to read data.\n"))); + for (;;) + { + ACE_OS::sleep (1); + if(shutdown_flag) + { + break; + } + } + + /* --- Clean Up ------------------------------------------------------- */ + + main_result = 0; +clean_exit: + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Exiting."))); + if (count_ > 0) + { + ACE_DEBUG((LM_DEBUG, "SUMMARY RECEIVER:\n " + "Number of samples received: %u \n", + count_)); + } + else + { + ACE_DEBUG((LM_DEBUG, "SUMMARY RECEIVER:\n " + "No samples received\n ")); + } + if (participant) + { + retcode = participant->delete_contained_entities(); + if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n"))); + main_result = 1; + } + retcode = ::DDS::DomainParticipantFactory::get_instance()-> + delete_participant(participant); + if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n"))); + main_result = 1; + } + } + + return main_result; + } + + /* This method gets called back by DDS when one or more data samples + * have been received. + */ + void HelloListener::on_data_available(::DDS::DataReader *reader) + { + /* Perform a safe type-cast from a generic data reader into a + * specific data reader for the type "ThroughputTestDataReader" + */ + ThroughputTestDataReader * test_reader = + ThroughputTestDataReader::narrow(reader); + ThroughputTest *instance = new ThroughputTest; + if (!test_reader) + { + /* In this specific case, this will never fail */ + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("::DDS::StringDataReader::narrow failed.\n"))); + return; + } + + /* Loop until there are messages available in the queue */ + for(;;) + { + ::DDS::SampleInfo info; + ::DDS::ReturnCode_t retcode = test_reader->take_next_sample(*instance, + info); + if (retcode == DDS_RETCODE_NO_DATA) + { + /* No more samples */ + break; + } + else if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unable to take data from data reader," + " error %d.\n"), + retcode)); + return; + } + if (info.valid_data) + { + record_data(*instance); + } + } + } + /* This method gets called back by DDS when one or more data samples + * have beenreceived. + */ + void CmdListener::on_data_available(::DDS::DataReader *reader) + { + ThroughputCommandDataReader * cmd_reader = + ThroughputCommandDataReader::narrow(reader); + ThroughputCommand *instance = new ThroughputCommand; + + if (!cmd_reader) + { + /* In this specific case, this will never fail */ + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("::DDS::StringDataReader::narrow failed.\n"))); + return; + } + + /* Loop until there are messages available in the queue */ + for(;;) + { + ::DDS::SampleInfo info; + ::DDS::ReturnCode_t retcode = cmd_reader->take_next_sample( + *instance, + info); + if (retcode == DDS_RETCODE_NO_DATA) + { + /* No more samples */ + break; + } + else if (retcode != DDS_RETCODE_OK) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unable to take data from data reader," + " error %d.\n"), retcode)); + return; + } + if (info.valid_data) + { + handle_run(*instance); + } + } + } |