summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Latency
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Latency')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc27
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am121
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/README9
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/control.cpp184
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp347
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/ping.h77
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp360
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/pong.h83
-rwxr-xr-xTAO/orbsvcs/tests/AVStreams/Latency/run_test.pl61
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/svc.conf5
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml9
11 files changed, 1283 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc b/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc
new file mode 100644
index 00000000000..e7f0d165f23
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc
@@ -0,0 +1,27 @@
+// -*- MPC -*-
+// $Id$
+
+project(*ping): avstreamsexe, strategies {
+ exename = ping
+
+ Source_Files {
+ ping.cpp
+ }
+}
+
+project(*pong): avstreamsexe, strategies {
+ exename = pong
+
+ Source_Files {
+ pong.cpp
+ }
+}
+
+project(*cntl): avstreamsexe, strategies {
+ exename = control
+
+ Source_Files {
+ control.cpp
+ }
+}
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am
new file mode 100644
index 00000000000..20caef1661f
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am
@@ -0,0 +1,121 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ../bin/mwc.pl -type automake -noreldefs TAO.mwc
+
+ACE_BUILDDIR = $(top_builddir)/..
+ACE_ROOT = $(top_srcdir)/..
+TAO_BUILDDIR = $(top_builddir)
+TAO_ROOT = $(top_srcdir)
+
+noinst_PROGRAMS =
+
+## Makefile.AVS_Latency_Cntl.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS += control
+
+control_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+control_SOURCES = \
+ control.cpp \
+ ping.h \
+ pong.h
+
+control_LDADD = \
+ $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.AVS_Latency_Ping.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS += ping
+
+ping_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+ping_SOURCES = \
+ ping.cpp \
+ ping.h
+
+ping_LDADD = \
+ $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.AVS_Latency_Pong.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS += pong
+
+pong_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+pong_SOURCES = \
+ pong.cpp \
+ pong.h
+
+pong_LDADD = \
+ $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/README b/TAO/orbsvcs/tests/AVStreams/Latency/README
new file mode 100644
index 00000000000..6a96f8cff60
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/README
@@ -0,0 +1,9 @@
+# $Id$
+
+ A simple latency test for the AVStreams pluggable protocol
+framework. Run as follows:
+
+$ ping -o ping.ior
+$ pong -o pong.ior
+$ control -f file://ping.ior -g file://pong.ior
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp
new file mode 100644
index 00000000000..07ce3c8e804
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp
@@ -0,0 +1,184 @@
+// $Id$
+
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/FlowSpec_Entry.h"
+#include "tao/PortableServer/PortableServer.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "ace/INET_Addr.h"
+
+ACE_RCSID (Latency,
+ ping,
+ "$Id$")
+
+const char *ping_ior = CORBA::string_dup ("file://ping.ior");
+const char *pong_ior = CORBA::string_dup ("file://pong.ior");
+const char *ping_address = CORBA::string_dup ("localhost:12345");
+const char *pong_address = CORBA::string_dup ("localhost:23456");
+const char *protocol = CORBA::string_dup ("UDP");
+
+int milliseconds = 30000;
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:t:p:d");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'f':
+ ping_ior = get_opts.opt_arg ();
+ break;
+
+ case 'g':
+ pong_ior = get_opts.opt_arg ();
+ break;
+
+ case 'r':
+ ping_address = get_opts.opt_arg ();
+ break;
+
+ case 's':
+ pong_address = get_opts.opt_arg ();
+ break;
+
+ case 't':
+ milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'p':
+ protocol = get_opts.opt_arg ();
+ break;
+
+ case 'd':
+ TAO_debug_level++;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-f <ping_ior> "
+ "-g <ping_ior> "
+ "-s <ping_address> "
+ "-r <pong_address> "
+ "-t <milliseconds> "
+ "-p protocols "
+ "\n",
+ argv [0]),
+ -1);
+ }
+
+
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+ parse_args (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = poa->the_POAManager ();
+
+ mgr->activate ();
+
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Connect the two streams and run them...
+ AVStreams::flowSpec flow_spec (2);
+ flow_spec.length (2);
+
+ ACE_INET_Addr ping_addr;
+ ping_addr.set (ping_address);
+ TAO_Forward_FlowSpec_Entry ping ("Ping",
+ "IN",
+ "UNS:ping",
+ "",
+ protocol,
+ &ping_addr);
+ flow_spec[0] = CORBA::string_dup (ping.entry_to_string ());
+
+ ACE_INET_Addr pong_addr;
+ pong_addr.set (pong_address);
+ TAO_Forward_FlowSpec_Entry pong ("Pong",
+ "OUT",
+ "UNS:pong",
+ "",
+ protocol,
+ &pong_addr);
+ flow_spec[1] = CORBA::string_dup (pong.entry_to_string ());
+
+ TAO_StreamCtrl stream_control_impl;
+
+ AVStreams::StreamCtrl_var stream_control =
+ stream_control_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ obj = orb->string_to_object (ping_ior ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ AVStreams::MMDevice_var ping_sender =
+ AVStreams::MMDevice::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ obj = orb->string_to_object (pong_ior ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ AVStreams::MMDevice_var pong_sender =
+ AVStreams::MMDevice::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ AVStreams::streamQoS_var the_qos =
+ new AVStreams::streamQoS;
+
+ stream_control->bind_devs (pong_sender.in (),
+ ping_sender.in (),
+ the_qos.inout (),
+ flow_spec
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ flow_spec.length (0);
+ stream_control->start (flow_spec ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_Time_Value tv (100, 0);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
+ orb->shutdown (1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // flow_spec.length (0);
+ // stream_control->stop (flow_spec ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Caught exception:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp
new file mode 100644
index 00000000000..886394ae9f9
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp
@@ -0,0 +1,347 @@
+// $Id$
+
+#include "ping.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+#include "tao/ORB.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+
+ACE_RCSID (Latency,
+ ping,
+ "$Id$")
+
+const char *ior_output_file = "ping.ior";
+const char *protocol = "RTP/UDP";
+int milliseconds = 100;
+int respond = 1;
+AVStreams::protocolSpec ping_protocols;
+AVStreams::protocolSpec pong_protocols;
+
+Pong_Send_Callback pong_callback;
+
+ACE_hrtime_t recv_base = 0;
+ACE_Throughput_Stats recv_latency;
+
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+
+ case 'r':
+ {
+ CORBA::ULong l = ping_protocols.length ();
+ ping_protocols.length (l + 1);
+ ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 's':
+ {
+ CORBA::ULong l = pong_protocols.length ();
+ pong_protocols.length (l + 1);
+ pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 't':
+ milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'x':
+ respond = 0;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile> "
+ "-r <protocol=addr> "
+ "-s <protocol=addr> "
+ "-t <milliseconds> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+
+
+ // If no protocols are specified use the default...
+ if (ping_protocols.length () == 0)
+ {
+ ping_protocols.length (1);
+ ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345");
+ }
+
+ if (pong_protocols.length () == 0)
+ {
+ pong_protocols.length (1);
+ pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456");
+ }
+
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+
+ parse_args (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = poa->the_POAManager ();
+
+ mgr->activate ();
+
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Register the video mmdevice object with the ORB
+ Reactive_Strategy *reactive_strategy;
+ ACE_NEW_RETURN (reactive_strategy,
+ Reactive_Strategy,
+ 1);
+ reactive_strategy->init (orb.in (), poa.in ());
+ TAO_MMDevice *mmdevice_impl;
+ ACE_NEW_RETURN (mmdevice_impl,
+ TAO_MMDevice (reactive_strategy),
+ 1);
+
+ AVStreams::MMDevice_var mmdevice =
+ mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ()));
+
+ // If the ior_output_file exists, output the ior to it
+ if (ior_output_file != 0)
+ {
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+ }
+
+ Ping_Recv_FDev* ping_fdev_impl;
+ ACE_NEW_RETURN (ping_fdev_impl,
+ Ping_Recv_FDev ("Ping"),
+ 1);
+ Pong_Send_FDev* pong_fdev_impl;
+ ACE_NEW_RETURN (pong_fdev_impl,
+ Pong_Send_FDev ("Pong"),
+ 1);
+
+ AVStreams::FDev_var ping_fdev =
+ ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ AVStreams::FDev_var pong_fdev =
+ pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (respond == 1)
+ {
+ mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ orb->run ();
+ ACE_TRY_CHECK;
+
+
+ ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "done %d \n", gsf));
+
+ recv_latency.dump_results ("Receive", gsf);
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Caught exception:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+// ****************************************************************
+
+Ping_Recv::Ping_Recv (void)
+ : TAO_FlowConsumer ("Ping",
+ ping_protocols,
+ "UNS:ping")
+{
+}
+
+int
+Ping_Recv::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n"));
+ callback = &this->callback_;
+ return 0;
+}
+
+Ping_Recv_Callback::Ping_Recv_Callback (void)
+ : count_ (0)
+{
+}
+
+int
+Ping_Recv_Callback::handle_stop (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::stop"));
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+
+ return 0;
+}
+
+int
+Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ this->count_++;
+
+ ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame %d\n", this->count_));
+
+ if (this->count_ < 10)
+ {
+ for (const ACE_Message_Block *i = frame;
+ i != 0;
+ i = i->cont ())
+ {
+ ACE_hrtime_t stamp;
+
+ if (i->length () < sizeof(stamp))
+ return 0;
+
+ ACE_OS::memcpy (&stamp, i->rd_ptr (), sizeof(stamp));
+
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ if (recv_base == 0)
+ {
+ recv_base = now;
+ }
+ else
+ {
+ recv_latency.sample (now - recv_base,
+ now - stamp);
+ }
+
+ if (respond == 1)
+ pong_callback.send_response (stamp);
+ }
+ }
+ else
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+int
+Ping_Recv_Callback::handle_destroy (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n"));
+ return 0;
+}
+
+// ****************************************************************
+
+Pong_Send::Pong_Send (void)
+ : TAO_FlowProducer ("Pong",
+ pong_protocols,
+ "UNS:pong")
+{
+}
+
+int
+Pong_Send::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n"));
+ callback = &pong_callback;
+ return 0;
+}
+
+void
+Pong_Send_Callback::get_timeout (ACE_Time_Value *&tv,
+ void *&)
+{
+ // @@ ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000));
+ ACE_DEBUG ((LM_DEBUG,"Pong_Send_Callback::get_timeout\n"));
+ tv = 0;
+}
+
+int
+Pong_Send_Callback::handle_timeout (void *)
+{
+ // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n"));
+ return 0;
+}
+
+int
+Pong_Send_Callback::handle_end_stream (void)
+{
+ return 0;
+}
+
+int
+Pong_Send_Callback::send_response (ACE_hrtime_t stamp)
+{
+ ACE_DEBUG ((LM_DEBUG, "pong send response)\n"));
+
+ ACE_hrtime_t buf[2];
+
+ ACE_Message_Block mb (reinterpret_cast<char*> (buf),
+ sizeof(buf));
+
+ buf[0] = stamp;
+ buf[1] = ACE_OS::gethrtime ();
+ mb.wr_ptr (sizeof(buf));
+
+ int result = this->protocol_object_->send_frame (&mb);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Pong_Send_Callback::send - %p\n",
+ ""),
+ -1);
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.h b/TAO/orbsvcs/tests/AVStreams/Latency/ping.h
new file mode 100644
index 00000000000..88e6fc31b97
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.h
@@ -0,0 +1,77 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Latency
+//
+// = FILENAME
+// ping.h
+//
+// = AUTHOR
+// Carlos O'Ryan
+//
+// ============================================================================
+
+#ifndef TAO_PING_H
+#define TAO_PING_H
+
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Policy.h"
+#include "orbsvcs/AV/Flows_T.h"
+#include "ace/OS_NS_time.h"
+
+class Ping_Recv_Callback : public TAO_AV_Callback
+{
+public:
+ Ping_Recv_Callback (void);
+ virtual int handle_stop (void);
+ virtual int receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info = 0,
+ const ACE_Addr &peer_address = ACE_Addr::sap_any);
+ virtual int handle_destroy (void);
+ protected:
+ int count_;
+};
+
+class Ping_Recv : public TAO_FlowConsumer
+{
+public:
+ Ping_Recv (void);
+
+ virtual int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+
+private:
+ Ping_Recv_Callback callback_;
+ // The callback object...
+};
+
+class Pong_Send_Callback : public TAO_AV_Callback
+{
+public:
+
+ int send_response (ACE_hrtime_t stamp);
+ // Ad-hoc method to send a response outside the context of a
+ // handle_timeout.
+
+ virtual int handle_timeout (void *arg);
+ virtual int handle_end_stream (void);
+ virtual void get_timeout (ACE_Time_Value *&tv,
+ void *&arg);
+};
+
+class Pong_Send : public TAO_FlowProducer
+{
+public:
+ Pong_Send (void);
+ virtual int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+};
+
+typedef TAO_AV_Endpoint_Reactive_Strategy_B <TAO_StreamEndPoint_B,TAO_VDev,AV_Null_MediaCtrl> Reactive_Strategy;
+
+typedef TAO_FDev<TAO_FlowProducer,Ping_Recv> Ping_Recv_FDev;
+typedef TAO_FDev<Pong_Send,TAO_FlowConsumer> Pong_Send_FDev;
+
+#endif /* TAO_PING_H */
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
new file mode 100644
index 00000000000..42979b096e9
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
@@ -0,0 +1,360 @@
+// $Id$
+
+#include "pong.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+#include "tao/PortableServer/PortableServer.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/ORB.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+
+ACE_RCSID (Latency,
+ ping,
+ "$Id$")
+
+const char *ior_output_file = "pong.ior";
+const char *protocol = "RTP/UDP";
+int milliseconds = 100;
+size_t message_size = 64;
+int respond = 1;
+AVStreams::protocolSpec pong_protocols;
+AVStreams::protocolSpec ping_protocols;
+
+ACE_hrtime_t recv_throughput_base = 0;
+ACE_Throughput_Stats recv_latency;
+
+ACE_hrtime_t send_throughput_base = 0;
+ACE_Throughput_Stats send_latency;
+
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:d");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+
+ case 'r':
+ {
+ CORBA::ULong l = ping_protocols.length ();
+ ping_protocols.length (l + 1);
+ ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 's':
+ {
+ CORBA::ULong l = pong_protocols.length ();
+ pong_protocols.length (l + 1);
+ pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 't':
+ milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'b':
+ message_size = ACE_OS::atoi (get_opts.opt_arg ());
+ if (message_size < sizeof(ACE_hrtime_t))
+ {
+ ACE_DEBUG ((LM_DEBUG, "Invalid message size\n"));
+ message_size = 64;
+ }
+ break;
+
+ case 'x':
+ respond = 0;
+ break;
+ case 'd':
+ TAO_debug_level++;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile> "
+ "-r <protocol=addr> "
+ "-s <protocol=addr> "
+ "-t <milliseconds> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+
+
+ // If no protocols are specified use the default...
+ if (pong_protocols.length () == 0)
+ {
+ pong_protocols.length (1);
+ pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456");
+ }
+
+ if (ping_protocols.length () == 0)
+ {
+ ping_protocols.length (1);
+ ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345");
+ }
+
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+
+
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+
+ parse_args (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = poa->the_POAManager ();
+
+ mgr->activate ();
+
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ Reactive_Strategy *reactive_strategy;
+ ACE_NEW_RETURN (reactive_strategy,
+ Reactive_Strategy,
+ 1);
+ reactive_strategy->init (orb.in (), poa.in ());
+ TAO_MMDevice *mmdevice_impl;
+ ACE_NEW_RETURN (mmdevice_impl,
+ TAO_MMDevice (reactive_strategy),
+ 1);
+
+ AVStreams::MMDevice_var mmdevice =
+ mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ()));
+
+ // If the ior_output_file exists, output the ior to it
+ if (ior_output_file != 0)
+ {
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+ }
+
+ Pong_Recv_FDev* pong_fdev_impl;
+ ACE_NEW_RETURN (pong_fdev_impl,
+ Pong_Recv_FDev ("Pong"),
+ 1);
+ Ping_Send_FDev* ping_fdev_impl;
+ ACE_NEW_RETURN (ping_fdev_impl,
+ Ping_Send_FDev ("Ping"),
+ 1);
+
+ AVStreams::FDev_var ping_fdev =
+ ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ AVStreams::FDev_var pong_fdev =
+ pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ if (respond == 1)
+ {
+ mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ orb->run ( ACE_ENV_SINGLE_ARG_PARAMETER );
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "done\n"));
+
+ recv_latency.dump_results ("Receive", gsf);
+
+ send_latency.dump_results ("Send", gsf);
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Caught exception:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+// ****************************************************************
+
+Pong_Recv::Pong_Recv (void)
+ : TAO_FlowConsumer ("Pong",
+ pong_protocols,
+ "UNS:pong")
+{
+}
+
+int
+Pong_Recv::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Pong_Recv_Callback::handle_stop (void)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+int
+Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
+
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ for (const ACE_Message_Block *i = frame;
+ i != 0;
+ i = frame->cont ())
+ {
+ ACE_hrtime_t buf[2];
+
+ if (frame->length () < sizeof(buf))
+ {
+ ACE_DEBUG ((LM_DEBUG, "Unexpected message size\n"));
+ return 0;
+ }
+
+ ACE_OS::memcpy (buf, i->rd_ptr (), sizeof(buf));
+
+ if (recv_throughput_base == 0)
+ {
+ recv_throughput_base = now;
+ }
+ recv_latency.sample (now - recv_throughput_base,
+ now - buf[0]);
+ }
+ return 0;
+}
+
+int
+Pong_Recv_Callback::handle_destroy (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::destroy\n"));
+ return 0;
+}
+
+// ****************************************************************
+
+Ping_Send::Ping_Send (void)
+ : TAO_FlowProducer ("Ping",
+ ping_protocols,
+ "UNS:ping")
+{
+}
+
+int
+Ping_Send::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
+ callback = &this->callback_;
+ return 0;
+}
+
+Ping_Send_Callback::Ping_Send_Callback (void)
+ :count_ (0)
+{
+ this->timeout_ = ACE_Time_Value (2);
+
+ this->frame_.size (message_size);
+ this->frame_.wr_ptr (message_size);
+}
+
+void
+Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv,
+ void *&)
+{
+ tv = &this->timeout_;
+}
+
+int
+Ping_Send_Callback::handle_timeout (void *)
+{
+
+ this->count_++;
+
+ ACE_DEBUG ((LM_DEBUG, "Ping timeout frame %d\n", this->count_));
+
+ if (this->count_ > 10)
+ {
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+ }
+
+ ACE_hrtime_t stamp = ACE_OS::gethrtime ();
+ ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp));
+
+ int result = this->protocol_object_->send_frame (&this->frame_);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Ping_Send_Callback::handle_timeout - send_frame - %p\n",
+ ""),
+ -1);
+
+ if (send_throughput_base == 0)
+ {
+ send_throughput_base = stamp;
+ }
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ send_latency.sample (now - send_throughput_base,
+ now - stamp);
+
+ return 0;
+}
+
+int
+Ping_Send_Callback::handle_end_stream (void)
+{
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h
new file mode 100644
index 00000000000..92d90adaad4
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h
@@ -0,0 +1,83 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Latency
+//
+// = FILENAME
+// ping.h
+//
+// = AUTHOR
+// Carlos O'Ryan
+//
+// ============================================================================
+
+#ifndef TAO_PONG_H
+#define TAO_PONG_H
+
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Policy.h"
+#include "orbsvcs/AV/Flows_T.h"
+
+class Pong_Recv_Callback : public TAO_AV_Callback
+{
+public:
+ virtual int handle_stop (void);
+ virtual int receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info = 0,
+ const ACE_Addr &peer_address = ACE_Addr::sap_any);
+ virtual int handle_destroy (void);
+};
+
+class Pong_Recv : public TAO_FlowConsumer
+{
+public:
+ Pong_Recv (void);
+
+ virtual int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+
+private:
+ Pong_Recv_Callback callback_;
+ // The callback object...
+};
+
+class Ping_Send_Callback : public TAO_AV_Callback
+{
+public:
+ Ping_Send_Callback (void);
+
+ virtual int handle_timeout (void *arg);
+ virtual int handle_end_stream (void);
+ virtual void get_timeout (ACE_Time_Value *&tv,
+ void *&arg);
+
+private:
+ ACE_Time_Value timeout_;
+ // the timeout value
+
+ ACE_Message_Block frame_;
+ // Pre-allocate the message block to send...
+ int count_;
+
+};
+
+class Ping_Send : public TAO_FlowProducer
+{
+public:
+ Ping_Send (void);
+ virtual int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+
+private:
+ Ping_Send_Callback callback_;
+ // The callback object...
+};
+
+typedef TAO_AV_Endpoint_Reactive_Strategy_A <TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> Reactive_Strategy;
+
+typedef TAO_FDev<TAO_FlowProducer,Pong_Recv> Pong_Recv_FDev;
+typedef TAO_FDev<Ping_Send,TAO_FlowConsumer> Ping_Send_FDev;
+
+#endif /* TAO_PONG_H */
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl
new file mode 100755
index 00000000000..7923bb727d8
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl
@@ -0,0 +1,61 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../../../bin';
+use PerlACE::Run_Test;
+
+# amount of delay between running the servers
+
+$status = 0;
+
+$pingior = PerlACE::LocalFile ("ping.ior");
+$pongior = PerlACE::LocalFile ("pong.ior");
+
+unlink $pingior, $pongior;
+
+$PING = new PerlACE::Process ("ping", "-o $pingior");
+$PONG = new PerlACE::Process ("pong", "-o $pongior");
+$CTRL = new PerlACE::Process ("control", "-f file://$pingior -g file://$pongior");
+
+print STDERR "Starting Ping\n";
+
+$PING->Spawn ();
+
+if (PerlACE::waitforfile_timed ($pingior, 20) == -1) {
+ print STDERR "ERROR: cannot find file <$pingior>\n";
+ $PING->Kill ();
+ exit 1;
+}
+
+print STDERR "Starting Pong\n";
+
+$PONG->Spawn ();
+if (PerlACE::waitforfile_timed ($pongior, 20) == -1) {
+ print STDERR "ERROR: cannot find file <$pongior>\n";
+ $PING->Kill ();
+ $PONG->Kill ();
+ exit 1;
+}
+
+print STDERR "Starting Control\n";
+
+$CTRL->Spawn();
+
+$PING->WaitKill(100);
+
+$PONG->WaitKill(100);
+
+$control = $CTRL->TerminateWaitKill (5);
+
+if ($control != 0) {
+ print STDERR "ERROR: control returned $control\n";
+ $status = 1;
+}
+
+unlink $pingior, $pongior;
+
+exit $status;
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf
new file mode 100644
index 00000000000..55dab11ac36
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf
@@ -0,0 +1,5 @@
+# $Id$
+#
+static Advanced_Resource_Factory "-ORBReactorType select_st -ORBInputCDRAllocator null -ORBConnectionCacheLock null"
+static Server_Strategy_Factory "-ORBPOALock null -ORBAllowReactivationOfSystemids 0"
+static Client_Strategy_Factory "-ORBProfileLock null -ORBClientConnectionHandler ST"
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml
new file mode 100644
index 00000000000..5f7c667443f
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml
@@ -0,0 +1,9 @@
+<?xml version='1.0'?>
+<!-- Converted from ./orbsvcs/tests/AVStreams/Latency/svc.conf by svcconf-convert.pl -->
+<ACE_Svc_Conf>
+ <!-- $Id$ -->
+ <!-- -->
+ <static id="Advanced_Resource_Factory" params="-ORBReactorType select_st -ORBInputCDRAllocator null -ORBConnectionCacheLock null"/>
+ <static id="Server_Strategy_Factory" params="-ORBPOALock null -ORBAllowReactivationOfSystemids 0"/>
+ <static id="Client_Strategy_Factory" params="-ORBProfileLock null -ORBClientConnectionHandler ST"/>
+</ACE_Svc_Conf>