summaryrefslogtreecommitdiff
path: root/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp')
-rw-r--r--modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp355
1 files changed, 355 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp
new file mode 100644
index 00000000000..a58a38193c6
--- /dev/null
+++ b/modules/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Receiver/Latency_Receiver.cpp
@@ -0,0 +1,355 @@
+// $Id$
+
+#include "ace/Get_Opt.h"
+#include "tao/ORB_Core.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Env_Value_T.h"
+#include "ace/Tokenizer_T.h"
+#include "Latency_Base.h"
+#include "Latency_BaseSupport.h"
+#include "Latency_BasePlugin.h"
+
+#include <ndds/ndds_namespace_cpp.h>
+
+bool shutdown_flag_ = false;
+
+CORBA::LongLong count_ = 0; // total count of all received messages
+
+LatencyTest * instance_ =0;
+LatencyTestDataWriter * test_data_writer_ = 0;
+
+const char * lib_name_ = 0;
+const char * prof_name_ = 0;
+
+CORBA::UShort domain_id_ = 0;
+CORBA::Boolean both_read_write_ = false;
+
+void
+split_qos (const char * qos)
+{
+ char* buf = const_cast <char *> (qos);
+ ACE_Tokenizer_T<char> tok (buf);
+ tok.delimiter_replace ('#', 0);
+ for (char *p = tok.next (); p; p = tok.next ())
+ {
+ if (!lib_name_)
+ {
+ lib_name_ = p;
+ }
+ else if (!prof_name_)
+ {
+ prof_name_ = p;
+ }
+ }
+ ACE_DEBUG ((LM_DEBUG, "Receiver : Found QoS profile %C %C\n",
+ lib_name_,
+ prof_name_));
+}
+
+int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("d:b:q:O"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'b':
+ both_read_write_ = true;
+ break;
+ case 'd':
+ domain_id_ = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'q':
+ {
+ const char * qos = get_opts.opt_arg ();
+ split_qos (qos);
+ }
+ break;
+ case '?':
+ default:
+ printf("c = <%c>\n",c);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage:\n\n"
+ " -d <domain_id>\n"
+ " -q <QoS profile>\n"
+ " -b use both a writer and reader per topic.\n"
+ "\n"),
+ -1);
+ }
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+/* The listener of events and data from the middleware */
+class HelloListener: public ::DDS::DataReaderListener {
+public:
+ void on_data_available(::DDS::DataReader *reader);
+};
+
+/* The dummy listener of events and data from the middleware */
+class DummyListener: public ::DDS::DataReaderListener {
+};
+
+void
+write_back (LatencyTest & an_instance)
+{
+ ++count_;
+
+ try
+ {
+ test_data_writer_->write(an_instance, ::DDS::HANDLE_NIL);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception caught:");
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
+ ACE_TEXT ("while writing ping back.\n")));
+ }
+}
+
+int ACE_TMAIN(int argc, ACE_TCHAR** argv)
+{
+ ::DDS::ReturnCode_t retcode;
+ HelloListener listener;
+ DummyListener dum_listener;
+ ::DDS::DataReader *data_reader = 0;
+ ::DDS::DataReader *dum_data_reader = 0;
+ const char * type_name = 0;
+ ::DDS::Topic * send_topic = 0;
+ ::DDS::Topic * receive_topic = 0;
+ ::DDS::DataWriter * data_writer = 0;
+ ::DDS::DataWriter * dum_data_writer = 0;
+
+ ACE_Env_Value<int> id (ACE_TEXT("DDS4CCM_DEFAULT_DOMAIN_ID"), domain_id_);
+ domain_id_ = id;
+
+ int main_result = 1; /* error by default */
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ /* Create the domain participant on domain ID 0 */
+ ::DDS::DomainParticipant *participant =
+ ::DDS::DomainParticipantFactory::get_instance()->
+ create_participant_with_profile(
+ domain_id_,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!participant)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Receiver : Unable to create domain participant.\n")));
+ goto clean_exit;
+ }
+ /* Register type before creating topic */
+ type_name = LatencyTestTypeSupport::get_type_name ();
+ retcode = LatencyTestTypeSupport::register_type (participant, type_name);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Unable to register topic type.\n")));
+ goto clean_exit;
+ }
+
+ send_topic = participant->create_topic_with_profile (
+ "send",
+ type_name,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!send_topic)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+
+ receive_topic = participant->create_topic_with_profile (
+ "receive",
+ type_name,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!receive_topic)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
+ goto clean_exit;
+ }
+
+ /* Create the data reader using the default publisher */
+ data_reader = participant->create_datareader_with_profile(
+ send_topic,
+ lib_name_,
+ prof_name_,
+ &listener,
+ DDS_DATA_AVAILABLE_STATUS);
+
+ if (!data_reader )
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data reader.\n")));
+ goto clean_exit;
+ }
+
+ /* Create a data writer, which will not be used, but is there for
+ * compatibility with DDS4CCM latency test, where there is always a
+ * reader and a writer per connector
+ */
+ if (both_read_write_)
+ {
+ dum_data_writer = participant->create_datawriter_with_profile(
+ send_topic,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!dum_data_writer)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data writer.\n")));
+ goto clean_exit;
+ }
+ }
+ /* Create the data writer using the default publisher */
+ data_writer = participant->create_datawriter_with_profile(
+ receive_topic,
+ lib_name_,
+ prof_name_,
+ 0,
+ DDS_STATUS_MASK_NONE);
+ if (!data_writer)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data writer.\n")));
+ goto clean_exit;
+ }
+
+ /* Create a data reader, which will not be used, but is there for
+ * compatibility with DDS4CCM latency test, where there is always a
+ * reader and a writer per connector.
+ */
+ if (both_read_write_)
+ {
+ dum_data_reader = participant->create_datareader_with_profile(
+ receive_topic,
+ lib_name_,
+ prof_name_,
+ &dum_listener,
+ DDS_DATA_AVAILABLE_STATUS);
+
+ if (!dum_data_reader )
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data reader.\n")));
+ goto clean_exit;
+ }
+ }
+
+ test_data_writer_ = LatencyTestDataWriter::narrow (data_writer);
+ if (!test_data_writer_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("DDS_StringDataWriter_narrow failed.\n")));
+ goto clean_exit;
+ }
+
+ /* --- Sleep During Asynchronous Reception ---------------------------- */
+
+ /* This thread sleeps forever. When a sample is received, RTI Data
+ * Distribution Service will call the on_data_available_callback function.
+ */
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ready to read data.\n")));
+ for (;;)
+ {
+ ACE_OS::sleep (1);
+ if (shutdown_flag_)
+ {
+ break;
+ }
+ }
+
+ /* --- Clean Up ------------------------------------------------------- */
+
+ main_result = 0;
+clean_exit:
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Exiting.")));
+ if (count_ > 0)
+ {
+ ACE_DEBUG((LM_DEBUG, "SUMMARY RECEIVER:\n "
+ "Number of samples received: %u \n",
+ count_));
+ }
+ else
+ {
+ ACE_DEBUG((LM_DEBUG, "SUMMARY RECEIVER:\n "
+ "No samples received\n "));
+ }
+ if (participant)
+ {
+ retcode = participant->delete_contained_entities ();
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ retcode = ::DDS::DomainParticipantFactory::get_instance ()->
+ delete_participant(participant);
+ if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
+ main_result = 1;
+ }
+ }
+
+ return main_result;
+}
+
+/* This method gets called back by DDS when one or more data samples
+ * have been received.
+ */
+void HelloListener::on_data_available(::DDS::DataReader *reader)
+{
+ /* Perform a safe type-cast from a generic data reader into a
+ * specific data reader for the type "LatencyTestDataReader"
+ */
+ LatencyTestDataReader * test_reader =
+ LatencyTestDataReader::narrow (reader);
+ if (!test_reader)
+ {
+ /* In this specific case, this will never fail */
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("::DDS::StringDataReader::narrow failed.\n")));
+ return;
+ }
+
+ /* Loop until there are messages available in the queue */
+ for(;;)
+ {
+ ::DDS::SampleInfoSeq info;
+ ::LatencyTestRTISeq sample_req;
+ ::DDS::ReturnCode_t const retcode = test_reader->take(sample_req, info);
+ if (retcode == DDS_RETCODE_NO_DATA)
+ {
+ /* No more samples */
+ break;
+ }
+ else if (retcode != DDS_RETCODE_OK)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Unable to take data from data reader,"
+ " error %d.\n"),
+ retcode));
+ return;
+ }
+ for (::DDS_Long i = 0; i < sample_req.length (); ++i)
+ {
+ if (info[i].valid_data)
+ {
+ write_back(sample_req[i]);
+ }
+ }
+ (void) test_reader->return_loan (sample_req, info);
+ }
+}