diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp | 153 |
1 files changed, 107 insertions, 46 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp index d7ffbb78bda..975963a96b7 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp @@ -6,21 +6,30 @@ #include "tao/TAO.h" #include "ace/Get_Opt.h" #include "ace/High_Res_Timer.h" +#include "ace/Stats.h" ACE_RCSID(Latency, ping, "$Id$") const char *ior_output_file = "pong.ior"; const char *protocol = "RTP/UDP"; int milliseconds = 100; -AVStreams::protocolSpec recv_protocols; -AVStreams::protocolSpec send_protocols; +int message_size = 64; +int respond = 1; +AVStreams::protocolSpec pong_protocols; +AVStreams::protocolSpec ping_protocols; -ACE_Throughput_Stats latency; +ACE_hrtime_t recv_throughput_base = 0; +ACE_Throughput_Stats recv_latency; + +ACE_hrtime_t send_throughput_base = 0; +ACE_Throughput_Stats send_latency; + +CORBA::ORB_ptr the_orb = 0; int parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "o:s:r:t:"); + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:"); int c; while ((c = get_opts ()) != -1) @@ -32,17 +41,17 @@ parse_args (int argc, char *argv[]) case 'r': { - CORBA::ULong l = recv_protocols.length (); - recv_protocols.length (l + 1); - recv_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; case 's': { - CORBA::ULong l = send_protocols.length (); - send_protocols.length (l + 1); - send_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; @@ -50,6 +59,19 @@ parse_args (int argc, char *argv[]) milliseconds = ACE_OS::atoi (get_opts.optarg); break; + case 'b': + message_size = ACE_OS::atoi (get_opts.optarg); + if (message_size < sizeof(ACE_hrtime_t)) + { + ACE_DEBUG ((LM_DEBUG, "Invalid message size\n")); + message_size = 64; + } + break; + + case 'x': + respond = 0; + break; + case '?': default: ACE_ERROR_RETURN ((LM_ERROR, @@ -65,16 +87,16 @@ parse_args (int argc, char *argv[]) // If no protocols are specified use the default... - if (recv_protocols.length () == 0) + if (pong_protocols.length () == 0) { - recv_protocols.length (1); - recv_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); } - if (send_protocols.length () == 0) + if (ping_protocols.length () == 0) { - send_protocols.length (1); - send_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); } // Indicates sucessful parsing of the command line @@ -95,6 +117,9 @@ int main (int argc, char *argv[]) av_core->orb_manager (); CORBA::ORB_var orb = orb_manager->orb (); + the_orb = orb.in (); + // No copying, because the global variable is not used after the + // event loop finishes... CORBA::Object_var poa_object = orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); @@ -113,10 +138,17 @@ int main (int argc, char *argv[]) // Register the video mmdevice object with the ORB - Reactive_Strategy reactive_strategy (orb_manager); - TAO_MMDevice mmdevice_impl (&reactive_strategy); + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy (orb_manager), + 1); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + AVStreams::MMDevice_var mmdevice = - mmdevice_impl._this (ACE_TRY_ENV); + mmdevice_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; CORBA::String_var ior = @@ -138,22 +170,32 @@ int main (int argc, char *argv[]) ACE_OS::fclose (output_file); } - Pong_Recv_FDev pong_fdev_impl ("Pong"); - Ping_Send_FDev ping_fdev_impl ("Ping"); + Pong_Recv_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Recv_FDev ("Pong"), + 1); + Ping_Send_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Send_FDev ("Ping"), + 1); AVStreams::FDev_var ping_fdev = - ping_fdev_impl._this (ACE_TRY_ENV); + ping_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; AVStreams::FDev_var pong_fdev = - pong_fdev_impl._this (ACE_TRY_ENV); + pong_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; mmdevice->add_fdev (ping_fdev.in (), ACE_TRY_ENV); ACE_TRY_CHECK; - mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); - ACE_TRY_CHECK; + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + } - if (orb->run () == -1) + ACE_Time_Value tv (120, 0); + if (orb->run (tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); @@ -161,10 +203,12 @@ int main (int argc, char *argv[]) ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); ACE_DEBUG ((LM_DEBUG, "done\n")); - latency.dump_results ("Ping-Pong", gsf); + recv_latency.dump_results ("Receive", gsf); - root_poa->destroy (1, 1, ACE_TRY_ENV); - ACE_TRY_CHECK; + send_latency.dump_results ("Send", gsf); + + // root_poa->destroy (1, 1, ACE_TRY_ENV); + // ACE_TRY_CHECK; } ACE_CATCHANY { @@ -181,7 +225,7 @@ int main (int argc, char *argv[]) Pong_Recv::Pong_Recv (void) : TAO_FlowConsumer ("Pong", - recv_protocols, + pong_protocols, "UNS:pong") { } @@ -190,7 +234,7 @@ int Pong_Recv::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n")); callback = &this->callback_; return 0; } @@ -198,7 +242,8 @@ Pong_Recv::get_callback (const char *, int Pong_Recv_Callback::handle_stop (void) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop")); + the_orb->shutdown (); return 0; } @@ -207,7 +252,7 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame, TAO_AV_frame_info *, const ACE_Addr &) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n")); ACE_hrtime_t now = ACE_OS::gethrtime (); for (const ACE_Message_Block *i = frame; @@ -224,8 +269,12 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame, ACE_OS::memcpy (buf, frame->rd_ptr (), sizeof(buf)); - latency.sample (now, - now - buf[0]); + if (recv_throughput_base == 0) + { + recv_throughput_base = now; + } + recv_latency.sample (now - recv_throughput_base, + now - buf[0]); } return 0; } @@ -241,7 +290,7 @@ Pong_Recv_Callback::handle_destroy (void) Ping_Send::Ping_Send (void) : TAO_FlowProducer ("Ping", - send_protocols, + ping_protocols, "UNS:ping") { } @@ -250,37 +299,49 @@ int Ping_Send::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n")); callback = &this->callback_; return 0; } +Ping_Send_Callback::Ping_Send_Callback (void) +{ + this->timeout_ = ACE_Time_Value (0, milliseconds * 1000); + + this->frame_.size (message_size); + this->frame_.wr_ptr (message_size); +} + void Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv, void *&) { - // @@ Naga: why the memory allocation! - ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000)); + tv = &this->timeout_; } int Ping_Send_Callback::handle_timeout (void *arg) { - ACE_DEBUG ((LM_DEBUG, "ping timeout\n")); + // ACE_DEBUG ((LM_DEBUG, "ping timeout\n")); - ACE_hrtime_t buf[1]; + ACE_hrtime_t stamp = ACE_OS::gethrtime (); + ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp)); - buf[0] = ACE_OS::gethrtime (); - ACE_Message_Block mb (ACE_reinterpret_cast (char*,buf), - sizeof(buf)); - - int result = this->protocol_object_->send_frame (&mb); + int result = this->protocol_object_->send_frame (&this->frame_); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, "FTP_Client_Flow_Handler::send - %p\n", ""), -1); + if (send_throughput_base == 0) + { + send_throughput_base = stamp; + } + ACE_hrtime_t now = ACE_OS::gethrtime (); + send_latency.sample (now - send_throughput_base, + now - stamp); + return 0; } |