diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp new file mode 100644 index 00000000000..886394ae9f9 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp @@ -0,0 +1,347 @@ +// $Id$ + +#include "ping.h" +#include "orbsvcs/AV/Protocol_Factory.h" +#include "tao/ORB.h" +#include "tao/Strategies/advanced_resource.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" + +ACE_RCSID (Latency, + ping, + "$Id$") + +const char *ior_output_file = "ping.ior"; +const char *protocol = "RTP/UDP"; +int milliseconds = 100; +int respond = 1; +AVStreams::protocolSpec ping_protocols; +AVStreams::protocolSpec pong_protocols; + +Pong_Send_Callback pong_callback; + +ACE_hrtime_t recv_base = 0; +ACE_Throughput_Stats recv_latency; + + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case 'r': + { + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 's': + { + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.opt_arg ()); + } + break; + + case 't': + milliseconds = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'x': + respond = 0; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> " + "-r <protocol=addr> " + "-s <protocol=addr> " + "-t <milliseconds> " + "\n", + argv [0]), + -1); + } + + + // If no protocols are specified use the default... + if (ping_protocols.length () == 0) + { + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345"); + } + + if (pong_protocols.length () == 0) + { + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456"); + } + + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv); + + parse_args (argc, argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var poa + = PortableServer::POA::_narrow (obj.in ()); + + PortableServer::POAManager_var mgr + = poa->the_POAManager (); + + mgr->activate (); + + TAO_AV_CORE::instance ()->init (orb.in (), + poa.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Register the video mmdevice object with the ORB + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy, + 1); + reactive_strategy->init (orb.in (), poa.in ()); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + + AVStreams::MMDevice_var mmdevice = + mmdevice_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (mmdevice.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); + + // If the ior_output_file exists, output the ior to it + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + Ping_Recv_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Recv_FDev ("Ping"), + 1); + Pong_Send_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Send_FDev ("Pong"), + 1); + + AVStreams::FDev_var ping_fdev = + ping_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + AVStreams::FDev_var pong_fdev = + pong_fdev_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mmdevice->add_fdev (ping_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + orb->run (); + ACE_TRY_CHECK; + + + ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "done %d \n", gsf)); + + recv_latency.dump_results ("Receive", gsf); + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Caught exception:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +// **************************************************************** + +Ping_Recv::Ping_Recv (void) + : TAO_FlowConsumer ("Ping", + ping_protocols, + "UNS:ping") +{ +} + +int +Ping_Recv::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n")); + callback = &this->callback_; + return 0; +} + +Ping_Recv_Callback::Ping_Recv_Callback (void) + : count_ (0) +{ +} + +int +Ping_Recv_Callback::handle_stop (void) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::stop")); + TAO_AV_CORE::instance ()->orb ()->shutdown (); + + return 0; +} + +int +Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *, + const ACE_Addr &) +{ + this->count_++; + + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame %d\n", this->count_)); + + if (this->count_ < 10) + { + for (const ACE_Message_Block *i = frame; + i != 0; + i = i->cont ()) + { + ACE_hrtime_t stamp; + + if (i->length () < sizeof(stamp)) + return 0; + + ACE_OS::memcpy (&stamp, i->rd_ptr (), sizeof(stamp)); + + ACE_hrtime_t now = ACE_OS::gethrtime (); + if (recv_base == 0) + { + recv_base = now; + } + else + { + recv_latency.sample (now - recv_base, + now - stamp); + } + + if (respond == 1) + pong_callback.send_response (stamp); + } + } + else + TAO_AV_CORE::instance ()->orb ()->shutdown (); + return 0; +} + +int +Ping_Recv_Callback::handle_destroy (void) +{ + ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n")); + return 0; +} + +// **************************************************************** + +Pong_Send::Pong_Send (void) + : TAO_FlowProducer ("Pong", + pong_protocols, + "UNS:pong") +{ +} + +int +Pong_Send::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n")); + callback = &pong_callback; + return 0; +} + +void +Pong_Send_Callback::get_timeout (ACE_Time_Value *&tv, + void *&) +{ + // @@ ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000)); + ACE_DEBUG ((LM_DEBUG,"Pong_Send_Callback::get_timeout\n")); + tv = 0; +} + +int +Pong_Send_Callback::handle_timeout (void *) +{ + // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n")); + return 0; +} + +int +Pong_Send_Callback::handle_end_stream (void) +{ + return 0; +} + +int +Pong_Send_Callback::send_response (ACE_hrtime_t stamp) +{ + ACE_DEBUG ((LM_DEBUG, "pong send response)\n")); + + ACE_hrtime_t buf[2]; + + ACE_Message_Block mb (reinterpret_cast<char*> (buf), + sizeof(buf)); + + buf[0] = stamp; + buf[1] = ACE_OS::gethrtime (); + mb.wr_ptr (sizeof(buf)); + + int result = this->protocol_object_->send_frame (&mb); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Pong_Send_Callback::send - %p\n", + ""), + -1); + + return 0; +} |