summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp348
1 files changed, 348 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
new file mode 100644
index 00000000000..5b176e2aba8
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
@@ -0,0 +1,348 @@
+// $Id$
+
+#include "pong.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+#include "tao/PortableServer/PortableServer.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/ORB.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
+
+ACE_RCSID (Latency,
+ ping,
+ "$Id$")
+
+const char *ior_output_file = "pong.ior";
+const char *protocol = "RTP/UDP";
+int milliseconds = 100;
+size_t message_size = 64;
+int respond = 1;
+AVStreams::protocolSpec pong_protocols;
+AVStreams::protocolSpec ping_protocols;
+
+ACE_hrtime_t recv_throughput_base = 0;
+ACE_Throughput_Stats recv_latency;
+
+ACE_hrtime_t send_throughput_base = 0;
+ACE_Throughput_Stats send_latency;
+
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:d");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+
+ case 'r':
+ {
+ CORBA::ULong l = ping_protocols.length ();
+ ping_protocols.length (l + 1);
+ ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 's':
+ {
+ CORBA::ULong l = pong_protocols.length ();
+ pong_protocols.length (l + 1);
+ pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ());
+ }
+ break;
+
+ case 't':
+ milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'b':
+ message_size = ACE_OS::atoi (get_opts.opt_arg ());
+ if (message_size < sizeof(ACE_hrtime_t))
+ {
+ ACE_DEBUG ((LM_DEBUG, "Invalid message size\n"));
+ message_size = 64;
+ }
+ break;
+
+ case 'x':
+ respond = 0;
+ break;
+ case 'd':
+ TAO_debug_level++;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile> "
+ "-r <protocol=addr> "
+ "-s <protocol=addr> "
+ "-t <milliseconds> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+
+
+ // If no protocols are specified use the default...
+ if (pong_protocols.length () == 0)
+ {
+ pong_protocols.length (1);
+ pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456");
+ }
+
+ if (ping_protocols.length () == 0)
+ {
+ ping_protocols.length (1);
+ ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345");
+ }
+
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ try
+ {
+
+
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+
+ parse_args (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = poa->the_POAManager ();
+
+ mgr->activate ();
+
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ poa.in ());
+
+ Reactive_Strategy *reactive_strategy;
+ ACE_NEW_RETURN (reactive_strategy,
+ Reactive_Strategy,
+ 1);
+ reactive_strategy->init (orb.in (), poa.in ());
+ TAO_MMDevice *mmdevice_impl;
+ ACE_NEW_RETURN (mmdevice_impl,
+ TAO_MMDevice (reactive_strategy),
+ 1);
+
+ AVStreams::MMDevice_var mmdevice =
+ mmdevice_impl->_this ();
+
+ CORBA::String_var ior =
+ orb->object_to_string (mmdevice.in ());
+
+ ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ()));
+
+ // If the ior_output_file exists, output the ior to it
+ if (ior_output_file != 0)
+ {
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+ }
+
+ Pong_Recv_FDev* pong_fdev_impl;
+ ACE_NEW_RETURN (pong_fdev_impl,
+ Pong_Recv_FDev ("Pong"),
+ 1);
+ Ping_Send_FDev* ping_fdev_impl;
+ ACE_NEW_RETURN (ping_fdev_impl,
+ Ping_Send_FDev ("Ping"),
+ 1);
+
+ AVStreams::FDev_var ping_fdev =
+ ping_fdev_impl->_this ();
+ AVStreams::FDev_var pong_fdev =
+ pong_fdev_impl->_this ();
+
+ mmdevice->add_fdev (ping_fdev.in ());
+ if (respond == 1)
+ {
+ mmdevice->add_fdev (pong_fdev.in ());
+ }
+
+ orb->run ( );
+
+ ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "done\n"));
+
+ recv_latency.dump_results ("Receive", gsf);
+
+ send_latency.dump_results ("Send", gsf);
+
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Caught exception:");
+ return 1;
+ }
+
+ return 0;
+}
+
+// ****************************************************************
+
+Pong_Recv::Pong_Recv (void)
+ : TAO_FlowConsumer ("Pong",
+ pong_protocols,
+ "UNS:pong")
+{
+}
+
+int
+Pong_Recv::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Pong_Recv_Callback::handle_stop (void)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+int
+Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
+
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ for (const ACE_Message_Block *i = frame;
+ i != 0;
+ i = frame->cont ())
+ {
+ ACE_hrtime_t buf[2];
+
+ if (frame->length () < sizeof(buf))
+ {
+ ACE_DEBUG ((LM_DEBUG, "Unexpected message size\n"));
+ return 0;
+ }
+
+ ACE_OS::memcpy (buf, i->rd_ptr (), sizeof(buf));
+
+ if (recv_throughput_base == 0)
+ {
+ recv_throughput_base = now;
+ }
+ recv_latency.sample (now - recv_throughput_base,
+ now - buf[0]);
+ }
+ return 0;
+}
+
+int
+Pong_Recv_Callback::handle_destroy (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::destroy\n"));
+ return 0;
+}
+
+// ****************************************************************
+
+Ping_Send::Ping_Send (void)
+ : TAO_FlowProducer ("Ping",
+ ping_protocols,
+ "UNS:ping")
+{
+}
+
+int
+Ping_Send::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
+ callback = &this->callback_;
+ return 0;
+}
+
+Ping_Send_Callback::Ping_Send_Callback (void)
+ :count_ (0)
+{
+ this->timeout_ = ACE_Time_Value (2);
+
+ this->frame_.size (message_size);
+ this->frame_.wr_ptr (message_size);
+}
+
+void
+Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv,
+ void *&)
+{
+ tv = &this->timeout_;
+}
+
+int
+Ping_Send_Callback::handle_timeout (void *)
+{
+
+ this->count_++;
+
+ ACE_DEBUG ((LM_DEBUG, "Ping timeout frame %d\n", this->count_));
+
+ if (this->count_ > 10)
+ {
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+ }
+
+ ACE_hrtime_t stamp = ACE_OS::gethrtime ();
+ ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp));
+
+ int result = this->protocol_object_->send_frame (&this->frame_);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Ping_Send_Callback::handle_timeout - send_frame - %p\n",
+ ""),
+ -1);
+
+ if (send_throughput_base == 0)
+ {
+ send_throughput_base = stamp;
+ }
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ send_latency.sample (now - send_throughput_base,
+ now - stamp);
+
+ return 0;
+}
+
+int
+Ping_Send_Callback::handle_end_stream (void)
+{
+ return 0;
+}