From 1061619f23b837720d6a0ba5d594805db8e88cd2 Mon Sep 17 00:00:00 2001 From: coryan Date: Fri, 13 Aug 1999 22:40:08 +0000 Subject: Implemented throughput and one way measurements. Allowed the user to set the protocol from the command line of the control program. --- TAO/orbsvcs/tests/AVStreams/Latency/Makefile | 12 +- TAO/orbsvcs/tests/AVStreams/Latency/control.cpp | 33 +++-- TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp | 128 ++++++++++++++------ TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp | 153 +++++++++++++++++------- TAO/orbsvcs/tests/AVStreams/Latency/pong.h | 9 +- 5 files changed, 236 insertions(+), 99 deletions(-) diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile index 0f6c09c15c3..1056ec71782 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile +++ b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile @@ -279,6 +279,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/Object_KeyC.h \ $(TAO_ROOT)/tao/Object_KeyC.i \ $(TAO_ROOT)/tao/GIOP.h \ + $(TAO_ROOT)/tao/IOPC.h \ + $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/GIOP.i \ $(TAO_ROOT)/tao/Server_Request.i \ $(TAO_ROOT)/tao/Marshal.h \ @@ -332,8 +334,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/TimeBaseS_T.cpp \ $(TAO_ROOT)/tao/TimeBaseS.i \ $(TAO_ROOT)/tao/MessagingC.h \ - $(TAO_ROOT)/tao/IOPC.h \ - $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/PollableC.h \ $(TAO_ROOT)/tao/MessagingC.i \ $(TAO_ROOT)/tao/MessagingS.i \ @@ -637,6 +637,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/Object_KeyC.h \ $(TAO_ROOT)/tao/Object_KeyC.i \ $(TAO_ROOT)/tao/GIOP.h \ + $(TAO_ROOT)/tao/IOPC.h \ + $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/GIOP.i \ $(TAO_ROOT)/tao/Server_Request.i \ $(TAO_ROOT)/tao/Marshal.h \ @@ -690,8 +692,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/TimeBaseS_T.cpp \ $(TAO_ROOT)/tao/TimeBaseS.i \ $(TAO_ROOT)/tao/MessagingC.h \ - $(TAO_ROOT)/tao/IOPC.h \ - $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/PollableC.h \ $(TAO_ROOT)/tao/MessagingC.i \ $(TAO_ROOT)/tao/MessagingS.i \ @@ -999,6 +999,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/Object_KeyC.h \ $(TAO_ROOT)/tao/Object_KeyC.i \ $(TAO_ROOT)/tao/GIOP.h \ + $(TAO_ROOT)/tao/IOPC.h \ + $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/GIOP.i \ $(TAO_ROOT)/tao/Server_Request.i \ $(TAO_ROOT)/tao/Marshal.h \ @@ -1052,8 +1054,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs $(TAO_ROOT)/tao/TimeBaseS_T.cpp \ $(TAO_ROOT)/tao/TimeBaseS.i \ $(TAO_ROOT)/tao/MessagingC.h \ - $(TAO_ROOT)/tao/IOPC.h \ - $(TAO_ROOT)/tao/IOPC.i \ $(TAO_ROOT)/tao/PollableC.h \ $(TAO_ROOT)/tao/MessagingC.i \ $(TAO_ROOT)/tao/MessagingS.i \ diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp index f55cc39902d..0cec49350d2 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp @@ -12,11 +12,14 @@ const char *ping_ior = "file://ping.ior"; const char *pong_ior = "file://pong.ior"; const char *ping_address = "224.9.9.2:12345"; const char *pong_address = "224.9.9.2:23456"; +const char *protocol = "UDP"; + +int milliseconds = 30000; int parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:"); + ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:t:p:"); int c; while ((c = get_opts ()) != -1) @@ -30,14 +33,22 @@ parse_args (int argc, char *argv[]) pong_ior = get_opts.optarg; break; - case 's': + case 'r': ping_address = get_opts.optarg; break; - case 'r': + case 's': pong_address = get_opts.optarg; break; + case 't': + milliseconds = ACE_OS::atoi (get_opts.optarg); + break; + + case 'p': + protocol = get_opts.optarg; + break; + case '?': default: ACE_ERROR_RETURN ((LM_ERROR, @@ -46,6 +57,8 @@ parse_args (int argc, char *argv[]) "-g " "-s " "-r " + "-t " + "-p protocols " "\n", argv [0]), -1); @@ -64,6 +77,8 @@ int main (int argc, char *argv[]) av_core->init (argc, argv, ACE_TRY_ENV); ACE_TRY_CHECK; + parse_args (argc, argv); + TAO_ORB_Manager* orb_manager = av_core->orb_manager (); @@ -94,7 +109,7 @@ int main (int argc, char *argv[]) "IN", "UNS:ping", "", - "UDP", + protocol, &ping_addr); flow_spec[0] = CORBA::string_dup (ping.entry_to_string ()); @@ -104,12 +119,12 @@ int main (int argc, char *argv[]) "OUT", "UNS:pong", "", - "UDP", + protocol, &pong_addr); flow_spec[1] = CORBA::string_dup (pong.entry_to_string ()); TAO_StreamCtrl stream_control_impl; - + AVStreams::StreamCtrl_var stream_control = stream_control_impl._this (ACE_TRY_ENV); ACE_TRY_CHECK; @@ -142,7 +157,7 @@ int main (int argc, char *argv[]) stream_control->start (flow_spec, ACE_TRY_ENV); ACE_TRY_CHECK; - ACE_Time_Value tv (10, 0); + ACE_Time_Value tv (0, milliseconds * 1000); if (orb->run (tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); @@ -151,8 +166,8 @@ int main (int argc, char *argv[]) stream_control->stop (flow_spec, ACE_TRY_ENV); ACE_TRY_CHECK; - root_poa->destroy (1, 1, ACE_TRY_ENV); - ACE_TRY_CHECK; + // root_poa->destroy (1, 1, ACE_TRY_ENV); + // ACE_TRY_CHECK; } ACE_CATCHANY { diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp index 20f27590825..80799fd09d2 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp @@ -5,21 +5,29 @@ #include "tao/corba.h" #include "tao/TAO.h" #include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Stats.h" ACE_RCSID(Latency, ping, "$Id$") const char *ior_output_file = "ping.ior"; const char *protocol = "RTP/UDP"; int milliseconds = 100; -AVStreams::protocolSpec recv_protocols; -AVStreams::protocolSpec send_protocols; +int respond = 1; +AVStreams::protocolSpec ping_protocols; +AVStreams::protocolSpec pong_protocols; Pong_Send_Callback pong_callback; +ACE_hrtime_t recv_base = 0; +ACE_Throughput_Stats recv_latency; + +CORBA::ORB_ptr the_orb = 0; + int parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "o:s:r:t:"); + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:"); int c; while ((c = get_opts ()) != -1) @@ -31,17 +39,17 @@ parse_args (int argc, char *argv[]) case 'r': { - CORBA::ULong l = recv_protocols.length (); - recv_protocols.length (l + 1); - recv_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; case 's': { - CORBA::ULong l = send_protocols.length (); - send_protocols.length (l + 1); - send_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; @@ -49,6 +57,10 @@ parse_args (int argc, char *argv[]) milliseconds = ACE_OS::atoi (get_opts.optarg); break; + case 'x': + respond = 0; + break; + case '?': default: ACE_ERROR_RETURN ((LM_ERROR, @@ -64,16 +76,16 @@ parse_args (int argc, char *argv[]) // If no protocols are specified use the default... - if (recv_protocols.length () == 0) + if (ping_protocols.length () == 0) { - recv_protocols.length (1); - recv_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); } - if (send_protocols.length () == 0) + if (pong_protocols.length () == 0) { - send_protocols.length (1); - send_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); } // Indicates sucessful parsing of the command line @@ -94,6 +106,9 @@ int main (int argc, char *argv[]) av_core->orb_manager (); CORBA::ORB_var orb = orb_manager->orb (); + the_orb = orb.in (); + // No copying, because the global variable is not used after the + // event loop finishes... CORBA::Object_var poa_object = orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); @@ -112,10 +127,17 @@ int main (int argc, char *argv[]) // Register the video mmdevice object with the ORB - Reactive_Strategy reactive_strategy (orb_manager); - TAO_MMDevice mmdevice_impl (&reactive_strategy); + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy (orb_manager), + 1); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + AVStreams::MMDevice_var mmdevice = - mmdevice_impl._this (ACE_TRY_ENV); + mmdevice_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; CORBA::String_var ior = @@ -137,27 +159,44 @@ int main (int argc, char *argv[]) ACE_OS::fclose (output_file); } - Ping_Recv_FDev ping_fdev_impl ("Ping"); - Pong_Send_FDev pong_fdev_impl ("Pong"); + Ping_Recv_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Recv_FDev ("Ping"), + 1); + Pong_Send_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Send_FDev ("Pong"), + 1); AVStreams::FDev_var ping_fdev = - ping_fdev_impl._this (ACE_TRY_ENV); + ping_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; AVStreams::FDev_var pong_fdev = - pong_fdev_impl._this (ACE_TRY_ENV); + pong_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; mmdevice->add_fdev (ping_fdev.in (), ACE_TRY_ENV); ACE_TRY_CHECK; - mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); - ACE_TRY_CHECK; - if (orb->run () == -1) + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + ACE_Time_Value tv (120, 0); + if (orb->run (tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); - root_poa->destroy (1, 1, ACE_TRY_ENV); - ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "done\n")); + + recv_latency.dump_results ("Receive", gsf); + + // root_poa->destroy (1, 1, ACE_TRY_ENV); + // ACE_TRY_CHECK; } ACE_CATCHANY { @@ -174,7 +213,7 @@ int main (int argc, char *argv[]) Ping_Recv::Ping_Recv (void) : TAO_FlowConsumer ("Ping", - recv_protocols, + ping_protocols, "UNS:ping") { } @@ -183,7 +222,7 @@ int Ping_Recv::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n")); callback = &this->callback_; return 0; } @@ -192,6 +231,8 @@ int Ping_Recv_Callback::handle_stop (void) { ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::stop")); + the_orb->shutdown (); + return 0; } @@ -200,7 +241,7 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame, TAO_AV_frame_info *, const ACE_Addr &) { - ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame\n")); + // ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame\n")); for (const ACE_Message_Block *i = frame; frame != 0; @@ -213,7 +254,19 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame, ACE_OS::memcpy (&stamp, frame->rd_ptr (), sizeof(stamp)); - pong_callback.send_response (stamp); + ACE_hrtime_t now = ACE_OS::gethrtime (); + if (recv_base == 0) + { + recv_base = now; + } + else + { + recv_latency.sample (now - recv_base, + now - stamp); + } + + if (respond == 1) + pong_callback.send_response (stamp); } return 0; } @@ -221,7 +274,7 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame, int Ping_Recv_Callback::handle_destroy (void) { - ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n")); + // ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n")); return 0; } @@ -229,7 +282,7 @@ Ping_Recv_Callback::handle_destroy (void) Pong_Send::Pong_Send (void) : TAO_FlowProducer ("Pong", - send_protocols, + pong_protocols, "UNS:pong") { } @@ -238,7 +291,7 @@ int Pong_Send::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n")); callback = &pong_callback; return 0; } @@ -254,7 +307,7 @@ Pong_Send_Callback::get_timeout (ACE_Time_Value *&tv, int Pong_Send_Callback::handle_timeout (void *arg) { - ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n")); + // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n")); return 0; } @@ -267,15 +320,16 @@ Pong_Send_Callback::handle_end_stream (void) int Pong_Send_Callback::send_response (ACE_hrtime_t stamp) { - ACE_DEBUG ((LM_DEBUG, "pong send response)\n")); + // ACE_DEBUG ((LM_DEBUG, "pong send response)\n")); ACE_hrtime_t buf[2]; - buf[0] = stamp; ACE_Message_Block mb (ACE_reinterpret_cast (char*,buf), sizeof(buf)); + buf[0] = stamp; buf[1] = ACE_OS::gethrtime (); + mb.wr_ptr (sizeof(buf)); int result = this->protocol_object_->send_frame (&mb); if (result < 0) diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp index d7ffbb78bda..975963a96b7 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp @@ -6,21 +6,30 @@ #include "tao/TAO.h" #include "ace/Get_Opt.h" #include "ace/High_Res_Timer.h" +#include "ace/Stats.h" ACE_RCSID(Latency, ping, "$Id$") const char *ior_output_file = "pong.ior"; const char *protocol = "RTP/UDP"; int milliseconds = 100; -AVStreams::protocolSpec recv_protocols; -AVStreams::protocolSpec send_protocols; +int message_size = 64; +int respond = 1; +AVStreams::protocolSpec pong_protocols; +AVStreams::protocolSpec ping_protocols; -ACE_Throughput_Stats latency; +ACE_hrtime_t recv_throughput_base = 0; +ACE_Throughput_Stats recv_latency; + +ACE_hrtime_t send_throughput_base = 0; +ACE_Throughput_Stats send_latency; + +CORBA::ORB_ptr the_orb = 0; int parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "o:s:r:t:"); + ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:"); int c; while ((c = get_opts ()) != -1) @@ -32,17 +41,17 @@ parse_args (int argc, char *argv[]) case 'r': { - CORBA::ULong l = recv_protocols.length (); - recv_protocols.length (l + 1); - recv_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = ping_protocols.length (); + ping_protocols.length (l + 1); + ping_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; case 's': { - CORBA::ULong l = send_protocols.length (); - send_protocols.length (l + 1); - send_protocols[l] = CORBA::string_dup (get_opts.optarg); + CORBA::ULong l = pong_protocols.length (); + pong_protocols.length (l + 1); + pong_protocols[l] = CORBA::string_dup (get_opts.optarg); } break; @@ -50,6 +59,19 @@ parse_args (int argc, char *argv[]) milliseconds = ACE_OS::atoi (get_opts.optarg); break; + case 'b': + message_size = ACE_OS::atoi (get_opts.optarg); + if (message_size < sizeof(ACE_hrtime_t)) + { + ACE_DEBUG ((LM_DEBUG, "Invalid message size\n")); + message_size = 64; + } + break; + + case 'x': + respond = 0; + break; + case '?': default: ACE_ERROR_RETURN ((LM_ERROR, @@ -65,16 +87,16 @@ parse_args (int argc, char *argv[]) // If no protocols are specified use the default... - if (recv_protocols.length () == 0) + if (pong_protocols.length () == 0) { - recv_protocols.length (1); - recv_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); + pong_protocols.length (1); + pong_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456"); } - if (send_protocols.length () == 0) + if (ping_protocols.length () == 0) { - send_protocols.length (1); - send_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); + ping_protocols.length (1); + ping_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345"); } // Indicates sucessful parsing of the command line @@ -95,6 +117,9 @@ int main (int argc, char *argv[]) av_core->orb_manager (); CORBA::ORB_var orb = orb_manager->orb (); + the_orb = orb.in (); + // No copying, because the global variable is not used after the + // event loop finishes... CORBA::Object_var poa_object = orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); @@ -113,10 +138,17 @@ int main (int argc, char *argv[]) // Register the video mmdevice object with the ORB - Reactive_Strategy reactive_strategy (orb_manager); - TAO_MMDevice mmdevice_impl (&reactive_strategy); + Reactive_Strategy *reactive_strategy; + ACE_NEW_RETURN (reactive_strategy, + Reactive_Strategy (orb_manager), + 1); + TAO_MMDevice *mmdevice_impl; + ACE_NEW_RETURN (mmdevice_impl, + TAO_MMDevice (reactive_strategy), + 1); + AVStreams::MMDevice_var mmdevice = - mmdevice_impl._this (ACE_TRY_ENV); + mmdevice_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; CORBA::String_var ior = @@ -138,22 +170,32 @@ int main (int argc, char *argv[]) ACE_OS::fclose (output_file); } - Pong_Recv_FDev pong_fdev_impl ("Pong"); - Ping_Send_FDev ping_fdev_impl ("Ping"); + Pong_Recv_FDev* pong_fdev_impl; + ACE_NEW_RETURN (pong_fdev_impl, + Pong_Recv_FDev ("Pong"), + 1); + Ping_Send_FDev* ping_fdev_impl; + ACE_NEW_RETURN (ping_fdev_impl, + Ping_Send_FDev ("Ping"), + 1); AVStreams::FDev_var ping_fdev = - ping_fdev_impl._this (ACE_TRY_ENV); + ping_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; AVStreams::FDev_var pong_fdev = - pong_fdev_impl._this (ACE_TRY_ENV); + pong_fdev_impl->_this (ACE_TRY_ENV); ACE_TRY_CHECK; mmdevice->add_fdev (ping_fdev.in (), ACE_TRY_ENV); ACE_TRY_CHECK; - mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); - ACE_TRY_CHECK; + if (respond == 1) + { + mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + } - if (orb->run () == -1) + ACE_Time_Value tv (120, 0); + if (orb->run (tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); @@ -161,10 +203,12 @@ int main (int argc, char *argv[]) ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); ACE_DEBUG ((LM_DEBUG, "done\n")); - latency.dump_results ("Ping-Pong", gsf); + recv_latency.dump_results ("Receive", gsf); - root_poa->destroy (1, 1, ACE_TRY_ENV); - ACE_TRY_CHECK; + send_latency.dump_results ("Send", gsf); + + // root_poa->destroy (1, 1, ACE_TRY_ENV); + // ACE_TRY_CHECK; } ACE_CATCHANY { @@ -181,7 +225,7 @@ int main (int argc, char *argv[]) Pong_Recv::Pong_Recv (void) : TAO_FlowConsumer ("Pong", - recv_protocols, + pong_protocols, "UNS:pong") { } @@ -190,7 +234,7 @@ int Pong_Recv::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n")); callback = &this->callback_; return 0; } @@ -198,7 +242,8 @@ Pong_Recv::get_callback (const char *, int Pong_Recv_Callback::handle_stop (void) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop")); + the_orb->shutdown (); return 0; } @@ -207,7 +252,7 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame, TAO_AV_frame_info *, const ACE_Addr &) { - ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n")); + // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n")); ACE_hrtime_t now = ACE_OS::gethrtime (); for (const ACE_Message_Block *i = frame; @@ -224,8 +269,12 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame, ACE_OS::memcpy (buf, frame->rd_ptr (), sizeof(buf)); - latency.sample (now, - now - buf[0]); + if (recv_throughput_base == 0) + { + recv_throughput_base = now; + } + recv_latency.sample (now - recv_throughput_base, + now - buf[0]); } return 0; } @@ -241,7 +290,7 @@ Pong_Recv_Callback::handle_destroy (void) Ping_Send::Ping_Send (void) : TAO_FlowProducer ("Ping", - send_protocols, + ping_protocols, "UNS:ping") { } @@ -250,37 +299,49 @@ int Ping_Send::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n")); + // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n")); callback = &this->callback_; return 0; } +Ping_Send_Callback::Ping_Send_Callback (void) +{ + this->timeout_ = ACE_Time_Value (0, milliseconds * 1000); + + this->frame_.size (message_size); + this->frame_.wr_ptr (message_size); +} + void Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv, void *&) { - // @@ Naga: why the memory allocation! - ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000)); + tv = &this->timeout_; } int Ping_Send_Callback::handle_timeout (void *arg) { - ACE_DEBUG ((LM_DEBUG, "ping timeout\n")); + // ACE_DEBUG ((LM_DEBUG, "ping timeout\n")); - ACE_hrtime_t buf[1]; + ACE_hrtime_t stamp = ACE_OS::gethrtime (); + ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp)); - buf[0] = ACE_OS::gethrtime (); - ACE_Message_Block mb (ACE_reinterpret_cast (char*,buf), - sizeof(buf)); - - int result = this->protocol_object_->send_frame (&mb); + int result = this->protocol_object_->send_frame (&this->frame_); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, "FTP_Client_Flow_Handler::send - %p\n", ""), -1); + if (send_throughput_base == 0) + { + send_throughput_base = stamp; + } + ACE_hrtime_t now = ACE_OS::gethrtime (); + send_latency.sample (now - send_throughput_base, + now - stamp); + return 0; } diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h index 5390bec5391..af52b9a70f7 100644 --- a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h +++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h @@ -19,7 +19,6 @@ #include "orbsvcs/AV/AVStreams_i.h" #include "orbsvcs/AV/Policy.h" #include "orbsvcs/AV/Flows_T.h" -#include "ace/Stats.h" class Pong_Recv_Callback : public TAO_AV_Callback { @@ -47,11 +46,19 @@ private: class Ping_Send_Callback : public TAO_AV_Callback { public: + Ping_Send_Callback (void); virtual int handle_timeout (void *arg); virtual int handle_end_stream (void); virtual void get_timeout (ACE_Time_Value *&tv, void *&arg); + +private: + ACE_Time_Value timeout_; + // the timeout value + + ACE_Message_Block frame_; + // Pre-allocate the message block to send... }; class Ping_Send : public TAO_FlowProducer -- cgit v1.2.1