diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Latency')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc | 27 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am | 121 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/README | 9 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/control.cpp | 184 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp | 347 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/ping.h | 77 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp | 360 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/pong.h | 83 | ||||
-rwxr-xr-x | TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl | 61 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/svc.conf | 5 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml | 9 |
11 files changed, 1283 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc b/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc new file mode 100644 index 00000000000..e7f0d165f23 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/AVS_Latency.mpc @@ -0,0 +1,27 @@ +// -*- MPC -*- +// $Id$ + +project(*ping): avstreamsexe, strategies { + exename = ping + + Source_Files { + ping.cpp + } +} + +project(*pong): avstreamsexe, strategies { + exename = pong + + Source_Files { + pong.cpp + } +} + +project(*cntl): avstreamsexe, strategies { + exename = control + + Source_Files { + control.cpp + } +} + diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am new file mode 100644 index 00000000000..20caef1661f --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile.am @@ -0,0 +1,121 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.AVS_Latency_Cntl.am + +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += control + +control_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +control_SOURCES = \ + control.cpp \ + ping.h \ + pong.h + +control_LDADD = \ + $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.AVS_Latency_Ping.am + +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += ping + +ping_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +ping_SOURCES = \ + ping.cpp \ + ping.h + +ping_LDADD = \ + $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.AVS_Latency_Pong.am + +if !BUILD_ACE_FOR_TAO + +noinst_PROGRAMS += pong + +pong_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs + +pong_SOURCES = \ + pong.cpp \ + pong.h + +pong_LDADD = \ + $(TAO_BUILDDIR)/tao/libTAO_Strategies.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/README b/TAO/orbsvcs/tests/AVStreams/Latency/README new file mode 100644 index 00000000000..6a96f8cff60 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/README @@ -0,0 +1,9 @@ +# $Id$ + + A simple latency test for the AVStreams pluggable protocol +framework. Run as follows: + +$ ping -o ping.ior +$ pong -o pong.ior +$ control -f file://ping.ior -g file://pong.ior + diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp new file mode 100644 index 00000000000..07ce3c8e804 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp @@ -0,0 +1,184 @@ +// $Id$ + +#include "orbsvcs/AV/AVStreams_i.h" +#include "orbsvcs/AV/FlowSpec_Entry.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/Strategies/advanced_resource.h" +#include "tao/debug.h" +#include "ace/Get_Opt.h" +#include "ace/INET_Addr.h" + +ACE_RCSID (Latency, + ping, + "$Id$") + +const char *ping_ior = CORBA::string_dup ("file://ping.ior"); +const char *pong_ior = CORBA::string_dup ("file://pong.ior"); +const char *ping_address = CORBA::string_dup ("localhost:12345"); +const char *pong_address = CORBA::string_dup ("localhost:23456"); +const char *protocol = CORBA::string_dup ("UDP"); + +int milliseconds = 30000; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:t:p:d"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'f': + ping_ior = get_opts.opt_arg (); + break; + + case 'g': + pong_ior = get_opts.opt_arg (); + break; + + case 'r': + ping_address = get_opts.opt_arg (); + break; + + case 's': + pong_address = get_opts.opt_arg (); + break; + + case 't': + milliseconds = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'p': + protocol = get_opts.opt_arg (); + break; + + case 'd': + TAO_debug_level++; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-f <ping_ior> " + "-g <ping_ior> " + "-s <ping_address> " + "-r <pong_address> " + "-t <milliseconds> " + "-p protocols " + "\n", + argv [0]), + -1); + } + + + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv); + parse_args (argc, argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var poa + = PortableServer::POA::_narrow (obj.in ()); + + PortableServer::POAManager_var mgr + = poa->the_POAManager (); + + mgr->activate (); + + TAO_AV_CORE::instance ()->init (orb.in (), + poa.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Connect the two streams and run them... + AVStreams::flowSpec flow_spec (2); + flow_spec.length (2); + + ACE_INET_Addr ping_addr; + ping_addr.set (ping_address); + TAO_Forward_FlowSpec_Entry ping ("Ping", + "IN", + "UNS:ping", + "", + protocol, + &ping_addr); + flow_spec[0] = CORBA::string_dup (ping.entry_to_string ()); + + ACE_INET_Addr pong_addr; + pong_addr.set (pong_address); + TAO_Forward_FlowSpec_Entry pong ("Pong", + "OUT", + "UNS:pong", + "", + protocol, + &pong_addr); + flow_spec[1] = CORBA::string_dup (pong.entry_to_string ()); + + TAO_StreamCtrl stream_control_impl; + + AVStreams::StreamCtrl_var stream_control = + stream_control_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + obj = orb->string_to_object (ping_ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + AVStreams::MMDevice_var ping_sender = + AVStreams::MMDevice::_narrow (obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + obj = orb->string_to_object (pong_ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + AVStreams::MMDevice_var pong_sender = + AVStreams::MMDevice::_narrow (obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + AVStreams::streamQoS_var the_qos = + new AVStreams::streamQoS; + + stream_control->bind_devs (pong_sender.in (), + ping_sender.in (), + the_qos.inout (), + flow_spec + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + flow_spec.length (0); + stream_control->start (flow_spec ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_Time_Value tv (100, 0); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); + orb->shutdown (1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // flow_spec.length (0); + // stream_control->stop (flow_spec ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Caught exception:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp new file mode 100644 index 00000000000..886394ae9f9 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp @@ -0,0 +1,347 @@ +// $Id$ + +#include "ping.h" +#include "orbsvcs/AV/Protocol_Factory.h" +#include "tao/ORB.h" +#include "tao/Strategies/advanced_resource.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" + +ACE_RCSID (Latency, + ping, + "$Id$") + +const char *ior_output_file = "ping.ior"; +const char *protocol = "RTP/UDP"; +int milliseconds = 100; +int respond = 1; +AVStreams::protocolSpec ping_protocols; +AVStreams::protocolSpec pong_protocols; + +Pong_Send_Callback pong_callback; + +ACE_hrtime_t recv_base = 0; +ACE_Throughput_Stats recv_latency; + + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case 'r': + { + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 's': + { + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 't': + milliseconds = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'x': + respond = 0; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> " + "-r <protocol=addr> " + "-s <protocol=addr> " + "-t <milliseconds> " + "\n", + argv [0]), + -1); + } + + + // If no protocols are specified use the default... + if (ping_protocols.length () == 0) + { + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345"); + } + + if (pong_protocols.length () == 0) + { + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456"); + } + + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv); + + parse_args (argc, argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var poa + = PortableServer::POA::_narrow (obj.in ()); + + PortableServer::POAManager_var mgr + = poa->the_POAManager (); + + mgr->activate (); + + TAO_AV_CORE::instance ()->init (orb.in (), + poa.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Register the video mmdevice object with the ORB + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy, + 1); + reactive_strategy->init (orb.in (), poa.in ()); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + + AVStreams::MMDevice_var mmdevice = + mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); + + // If the ior_output_file exists, output the ior to it + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + Ping_Recv_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Recv_FDev ("Ping"), + 1); + Pong_Send_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Send_FDev ("Pong"), + 1); + + AVStreams::FDev_var ping_fdev = + ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + AVStreams::FDev_var pong_fdev = + pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + orb->run (); + ACE_TRY_CHECK; + + + ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "done %d \n", gsf)); + + recv_latency.dump_results ("Receive", gsf); + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Caught exception:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +// **************************************************************** + +Ping_Recv::Ping_Recv (void) + : TAO_FlowConsumer ("Ping", + ping_protocols, + "UNS:ping") +{ +} + +int +Ping_Recv::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n")); + callback = &this->callback_; + return 0; +} + +Ping_Recv_Callback::Ping_Recv_Callback (void) + : count_ (0) +{ +} + +int +Ping_Recv_Callback::handle_stop (void) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::stop")); + TAO_AV_CORE::instance ()->orb ()->shutdown (); + + return 0; +} + +int +Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *, + const ACE_Addr &) +{ + this->count_++; + + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame %d\n", this->count_)); + + if (this->count_ < 10) + { + for (const ACE_Message_Block *i = frame; + i != 0; + i = i->cont ()) + { + ACE_hrtime_t stamp; + + if (i->length () < sizeof(stamp)) + return 0; + + ACE_OS::memcpy (&stamp, i->rd_ptr (), sizeof(stamp)); + + ACE_hrtime_t now = ACE_OS::gethrtime (); + if (recv_base == 0) + { + recv_base = now; + } + else + { + recv_latency.sample (now - recv_base, + now - stamp); + } + + if (respond == 1) + pong_callback.send_response (stamp); + } + } + else + TAO_AV_CORE::instance ()->orb ()->shutdown (); + return 0; +} + +int +Ping_Recv_Callback::handle_destroy (void) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n")); + return 0; +} + +// **************************************************************** + +Pong_Send::Pong_Send (void) + : TAO_FlowProducer ("Pong", + pong_protocols, + "UNS:pong") +{ +} + +int +Pong_Send::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n")); + callback = &pong_callback; + return 0; +} + +void +Pong_Send_Callback::get_timeout (ACE_Time_Value *&tv, + void *&) +{ + // @@ ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000)); + ACE_DEBUG ((LM_DEBUG,"Pong_Send_Callback::get_timeout\n")); + tv = 0; +} + +int +Pong_Send_Callback::handle_timeout (void *) +{ + // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n")); + return 0; +} + +int +Pong_Send_Callback::handle_end_stream (void) +{ + return 0; +} + +int +Pong_Send_Callback::send_response (ACE_hrtime_t stamp) +{ + ACE_DEBUG ((LM_DEBUG, "pong send response)\n")); + + ACE_hrtime_t buf[2]; + + ACE_Message_Block mb (reinterpret_cast<char*> (buf), + sizeof(buf)); + + buf[0] = stamp; + buf[1] = ACE_OS::gethrtime (); + mb.wr_ptr (sizeof(buf)); + + int result = this->protocol_object_->send_frame (&mb); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Pong_Send_Callback::send - %p\n", + ""), + -1); + + return 0; +} diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.h b/TAO/orbsvcs/tests/AVStreams/Latency/ping.h new file mode 100644 index 00000000000..88e6fc31b97 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.h @@ -0,0 +1,77 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// TAO/orbsvcs/tests/AVStreams/Latency +// +// = FILENAME +// ping.h +// +// = AUTHOR +// Carlos O'Ryan +// +// ============================================================================ + +#ifndef TAO_PING_H +#define TAO_PING_H + +#include "orbsvcs/AV/AVStreams_i.h" +#include "orbsvcs/AV/Policy.h" +#include "orbsvcs/AV/Flows_T.h" +#include "ace/OS_NS_time.h" + +class Ping_Recv_Callback : public TAO_AV_Callback +{ +public: + Ping_Recv_Callback (void); + virtual int handle_stop (void); + virtual int receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *frame_info = 0, + const ACE_Addr &peer_address = ACE_Addr::sap_any); + virtual int handle_destroy (void); + protected: + int count_; +}; + +class Ping_Recv : public TAO_FlowConsumer +{ +public: + Ping_Recv (void); + + virtual int get_callback (const char *flowname, + TAO_AV_Callback *&callback); + +private: + Ping_Recv_Callback callback_; + // The callback object... +}; + +class Pong_Send_Callback : public TAO_AV_Callback +{ +public: + + int send_response (ACE_hrtime_t stamp); + // Ad-hoc method to send a response outside the context of a + // handle_timeout. + + virtual int handle_timeout (void *arg); + virtual int handle_end_stream (void); + virtual void get_timeout (ACE_Time_Value *&tv, + void *&arg); +}; + +class Pong_Send : public TAO_FlowProducer +{ +public: + Pong_Send (void); + virtual int get_callback (const char *flowname, + TAO_AV_Callback *&callback); +}; + +typedef TAO_AV_Endpoint_Reactive_Strategy_B <TAO_StreamEndPoint_B,TAO_VDev,AV_Null_MediaCtrl> Reactive_Strategy; + +typedef TAO_FDev<TAO_FlowProducer,Ping_Recv> Ping_Recv_FDev; +typedef TAO_FDev<Pong_Send,TAO_FlowConsumer> Pong_Send_FDev; + +#endif /* TAO_PING_H */ diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp new file mode 100644 index 00000000000..42979b096e9 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp @@ -0,0 +1,360 @@ +// $Id$ + +#include "pong.h" +#include "orbsvcs/AV/Protocol_Factory.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/Strategies/advanced_resource.h" +#include "tao/ORB.h" +#include "tao/debug.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" + +ACE_RCSID (Latency, + ping, + "$Id$") + +const char *ior_output_file = "pong.ior"; +const char *protocol = "RTP/UDP"; +int milliseconds = 100; +size_t message_size = 64; +int respond = 1; +AVStreams::protocolSpec pong_protocols; +AVStreams::protocolSpec ping_protocols; + +ACE_hrtime_t recv_throughput_base = 0; +ACE_Throughput_Stats recv_latency; + +ACE_hrtime_t send_throughput_base = 0; +ACE_Throughput_Stats send_latency; + + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:d"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case 'r': + { + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 's': + { + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 't': + milliseconds = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'b': + message_size = ACE_OS::atoi (get_opts.opt_arg ()); + if (message_size < sizeof(ACE_hrtime_t)) + { + ACE_DEBUG ((LM_DEBUG, "Invalid message size\n")); + message_size = 64; + } + break; + + case 'x': + respond = 0; + break; + case 'd': + TAO_debug_level++; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> " + "-r <protocol=addr> " + "-s <protocol=addr> " + "-t <milliseconds> " + "\n", + argv [0]), + -1); + } + + + // If no protocols are specified use the default... + if (pong_protocols.length () == 0) + { + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456"); + } + + if (ping_protocols.length () == 0) + { + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345"); + } + + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + + + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv); + + parse_args (argc, argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var poa + = PortableServer::POA::_narrow (obj.in ()); + + PortableServer::POAManager_var mgr + = poa->the_POAManager (); + + mgr->activate (); + + TAO_AV_CORE::instance ()->init (orb.in (), + poa.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy, + 1); + reactive_strategy->init (orb.in (), poa.in ()); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + + AVStreams::MMDevice_var mmdevice = + mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); + + // If the ior_output_file exists, output the ior to it + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + Pong_Recv_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Recv_FDev ("Pong"), + 1); + Ping_Send_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Send_FDev ("Ping"), + 1); + + AVStreams::FDev_var ping_fdev = + ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + AVStreams::FDev_var pong_fdev = + pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + orb->run ( ACE_ENV_SINGLE_ARG_PARAMETER ); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); + + ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "done\n")); + + recv_latency.dump_results ("Receive", gsf); + + send_latency.dump_results ("Send", gsf); + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Caught exception:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +// **************************************************************** + +Pong_Recv::Pong_Recv (void) + : TAO_FlowConsumer ("Pong", + pong_protocols, + "UNS:pong") +{ +} + +int +Pong_Recv::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n")); + callback = &this->callback_; + return 0; +} + +int +Pong_Recv_Callback::handle_stop (void) +{ + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop")); + TAO_AV_CORE::instance ()->orb ()->shutdown (); + return 0; +} + +int +Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *, + const ACE_Addr &) +{ + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n")); + + ACE_hrtime_t now = ACE_OS::gethrtime (); + for (const ACE_Message_Block *i = frame; + i != 0; + i = frame->cont ()) + { + ACE_hrtime_t buf[2]; + + if (frame->length () < sizeof(buf)) + { + ACE_DEBUG ((LM_DEBUG, "Unexpected message size\n")); + return 0; + } + + ACE_OS::memcpy (buf, i->rd_ptr (), sizeof(buf)); + + if (recv_throughput_base == 0) + { + recv_throughput_base = now; + } + recv_latency.sample (now - recv_throughput_base, + now - buf[0]); + } + return 0; +} + +int +Pong_Recv_Callback::handle_destroy (void) +{ + ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::destroy\n")); + return 0; +} + +// **************************************************************** + +Ping_Send::Ping_Send (void) + : TAO_FlowProducer ("Ping", + ping_protocols, + "UNS:ping") +{ +} + +int +Ping_Send::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n")); + callback = &this->callback_; + return 0; +} + +Ping_Send_Callback::Ping_Send_Callback (void) + :count_ (0) +{ + this->timeout_ = ACE_Time_Value (2); + + this->frame_.size (message_size); + this->frame_.wr_ptr (message_size); +} + +void +Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv, + void *&) +{ + tv = &this->timeout_; +} + +int +Ping_Send_Callback::handle_timeout (void *) +{ + + this->count_++; + + ACE_DEBUG ((LM_DEBUG, "Ping timeout frame %d\n", this->count_)); + + if (this->count_ > 10) + { + TAO_AV_CORE::instance ()->orb ()->shutdown (); + return 0; + } + + ACE_hrtime_t stamp = ACE_OS::gethrtime (); + ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp)); + + int result = this->protocol_object_->send_frame (&this->frame_); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Ping_Send_Callback::handle_timeout - send_frame - %p\n", + ""), + -1); + + if (send_throughput_base == 0) + { + send_throughput_base = stamp; + } + ACE_hrtime_t now = ACE_OS::gethrtime (); + send_latency.sample (now - send_throughput_base, + now - stamp); + + return 0; +} + +int +Ping_Send_Callback::handle_end_stream (void) +{ + return 0; +} diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h new file mode 100644 index 00000000000..92d90adaad4 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h @@ -0,0 +1,83 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// TAO/orbsvcs/tests/AVStreams/Latency +// +// = FILENAME +// ping.h +// +// = AUTHOR +// Carlos O'Ryan +// +// ============================================================================ + +#ifndef TAO_PONG_H +#define TAO_PONG_H + +#include "orbsvcs/AV/AVStreams_i.h" +#include "orbsvcs/AV/Policy.h" +#include "orbsvcs/AV/Flows_T.h" + +class Pong_Recv_Callback : public TAO_AV_Callback +{ +public: + virtual int handle_stop (void); + virtual int receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *frame_info = 0, + const ACE_Addr &peer_address = ACE_Addr::sap_any); + virtual int handle_destroy (void); +}; + +class Pong_Recv : public TAO_FlowConsumer +{ +public: + Pong_Recv (void); + + virtual int get_callback (const char *flowname, + TAO_AV_Callback *&callback); + +private: + Pong_Recv_Callback callback_; + // The callback object... +}; + +class Ping_Send_Callback : public TAO_AV_Callback +{ +public: + Ping_Send_Callback (void); + + virtual int handle_timeout (void *arg); + virtual int handle_end_stream (void); + virtual void get_timeout (ACE_Time_Value *&tv, + void *&arg); + +private: + ACE_Time_Value timeout_; + // the timeout value + + ACE_Message_Block frame_; + // Pre-allocate the message block to send... + int count_; + +}; + +class Ping_Send : public TAO_FlowProducer +{ +public: + Ping_Send (void); + virtual int get_callback (const char *flowname, + TAO_AV_Callback *&callback); + +private: + Ping_Send_Callback callback_; + // The callback object... +}; + +typedef TAO_AV_Endpoint_Reactive_Strategy_A <TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> Reactive_Strategy; + +typedef TAO_FDev<TAO_FlowProducer,Pong_Recv> Pong_Recv_FDev; +typedef TAO_FDev<Ping_Send,TAO_FlowConsumer> Ping_Send_FDev; + +#endif /* TAO_PONG_H */ diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl new file mode 100755 index 00000000000..7923bb727d8 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/run_test.pl @@ -0,0 +1,61 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../../../bin'; +use PerlACE::Run_Test; + +# amount of delay between running the servers + +$status = 0; + +$pingior = PerlACE::LocalFile ("ping.ior"); +$pongior = PerlACE::LocalFile ("pong.ior"); + +unlink $pingior, $pongior; + +$PING = new PerlACE::Process ("ping", "-o $pingior"); +$PONG = new PerlACE::Process ("pong", "-o $pongior"); +$CTRL = new PerlACE::Process ("control", "-f file://$pingior -g file://$pongior"); + +print STDERR "Starting Ping\n"; + +$PING->Spawn (); + +if (PerlACE::waitforfile_timed ($pingior, 20) == -1) { + print STDERR "ERROR: cannot find file <$pingior>\n"; + $PING->Kill (); + exit 1; +} + +print STDERR "Starting Pong\n"; + +$PONG->Spawn (); +if (PerlACE::waitforfile_timed ($pongior, 20) == -1) { + print STDERR "ERROR: cannot find file <$pongior>\n"; + $PING->Kill (); + $PONG->Kill (); + exit 1; +} + +print STDERR "Starting Control\n"; + +$CTRL->Spawn(); + +$PING->WaitKill(100); + +$PONG->WaitKill(100); + +$control = $CTRL->TerminateWaitKill (5); + +if ($control != 0) { + print STDERR "ERROR: control returned $control\n"; + $status = 1; +} + +unlink $pingior, $pongior; + +exit $status; diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf new file mode 100644 index 00000000000..55dab11ac36 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf @@ -0,0 +1,5 @@ +# $Id$ +# +static Advanced_Resource_Factory "-ORBReactorType select_st -ORBInputCDRAllocator null -ORBConnectionCacheLock null" +static Server_Strategy_Factory "-ORBPOALock null -ORBAllowReactivationOfSystemids 0" +static Client_Strategy_Factory "-ORBProfileLock null -ORBClientConnectionHandler ST" diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml new file mode 100644 index 00000000000..5f7c667443f --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/svc.conf.xml @@ -0,0 +1,9 @@ +<?xml version='1.0'?> +<!-- Converted from ./orbsvcs/tests/AVStreams/Latency/svc.conf by svcconf-convert.pl --> +<ACE_Svc_Conf> + <!-- $Id$ --> + <!-- --> + <static id="Advanced_Resource_Factory" params="-ORBReactorType select_st -ORBInputCDRAllocator null -ORBConnectionCacheLock null"/> + <static id="Server_Strategy_Factory" params="-ORBPOALock null -ORBAllowReactivationOfSystemids 0"/> + <static id="Client_Strategy_Factory" params="-ORBProfileLock null -ORBClientConnectionHandler ST"/> +</ACE_Svc_Conf> |