summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-13 22:40:08 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-13 22:40:08 +0000
commit1061619f23b837720d6a0ba5d594805db8e88cd2 (patch)
tree88d15db277649f78e2897de99a21927c68b567fc
parentf0eceadff68bb4ceaf895a1a6dddb64fbf1808f1 (diff)
downloadATCD-pluggable_av.tar.gz
Implemented throughput and one way measurements.pluggable_av
Allowed the user to set the protocol from the command line of the control program.
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/Makefile12
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/control.cpp33
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp128
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp153
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Latency/pong.h9
5 files changed, 236 insertions, 99 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile
index 0f6c09c15c3..1056ec71782 100644
--- a/TAO/orbsvcs/tests/AVStreams/Latency/Makefile
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/Makefile
@@ -279,6 +279,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/Object_KeyC.h \
$(TAO_ROOT)/tao/Object_KeyC.i \
$(TAO_ROOT)/tao/GIOP.h \
+ $(TAO_ROOT)/tao/IOPC.h \
+ $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/GIOP.i \
$(TAO_ROOT)/tao/Server_Request.i \
$(TAO_ROOT)/tao/Marshal.h \
@@ -332,8 +334,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/TimeBaseS_T.cpp \
$(TAO_ROOT)/tao/TimeBaseS.i \
$(TAO_ROOT)/tao/MessagingC.h \
- $(TAO_ROOT)/tao/IOPC.h \
- $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/PollableC.h \
$(TAO_ROOT)/tao/MessagingC.i \
$(TAO_ROOT)/tao/MessagingS.i \
@@ -637,6 +637,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/Object_KeyC.h \
$(TAO_ROOT)/tao/Object_KeyC.i \
$(TAO_ROOT)/tao/GIOP.h \
+ $(TAO_ROOT)/tao/IOPC.h \
+ $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/GIOP.i \
$(TAO_ROOT)/tao/Server_Request.i \
$(TAO_ROOT)/tao/Marshal.h \
@@ -690,8 +692,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/TimeBaseS_T.cpp \
$(TAO_ROOT)/tao/TimeBaseS.i \
$(TAO_ROOT)/tao/MessagingC.h \
- $(TAO_ROOT)/tao/IOPC.h \
- $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/PollableC.h \
$(TAO_ROOT)/tao/MessagingC.i \
$(TAO_ROOT)/tao/MessagingS.i \
@@ -999,6 +999,8 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/Object_KeyC.h \
$(TAO_ROOT)/tao/Object_KeyC.i \
$(TAO_ROOT)/tao/GIOP.h \
+ $(TAO_ROOT)/tao/IOPC.h \
+ $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/GIOP.i \
$(TAO_ROOT)/tao/Server_Request.i \
$(TAO_ROOT)/tao/Marshal.h \
@@ -1052,8 +1054,6 @@ CPPFLAGS += -I$(TAO_ROOT)/orbsvcs
$(TAO_ROOT)/tao/TimeBaseS_T.cpp \
$(TAO_ROOT)/tao/TimeBaseS.i \
$(TAO_ROOT)/tao/MessagingC.h \
- $(TAO_ROOT)/tao/IOPC.h \
- $(TAO_ROOT)/tao/IOPC.i \
$(TAO_ROOT)/tao/PollableC.h \
$(TAO_ROOT)/tao/MessagingC.i \
$(TAO_ROOT)/tao/MessagingS.i \
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp
index f55cc39902d..0cec49350d2 100644
--- a/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/control.cpp
@@ -12,11 +12,14 @@ const char *ping_ior = "file://ping.ior";
const char *pong_ior = "file://pong.ior";
const char *ping_address = "224.9.9.2:12345";
const char *pong_address = "224.9.9.2:23456";
+const char *protocol = "UDP";
+
+int milliseconds = 30000;
int
parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:");
+ ACE_Get_Opt get_opts (argc, argv, "f:g:s:r:t:p:");
int c;
while ((c = get_opts ()) != -1)
@@ -30,14 +33,22 @@ parse_args (int argc, char *argv[])
pong_ior = get_opts.optarg;
break;
- case 's':
+ case 'r':
ping_address = get_opts.optarg;
break;
- case 'r':
+ case 's':
pong_address = get_opts.optarg;
break;
+ case 't':
+ milliseconds = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'p':
+ protocol = get_opts.optarg;
+ break;
+
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -46,6 +57,8 @@ parse_args (int argc, char *argv[])
"-g <ping_ior> "
"-s <ping_address> "
"-r <pong_address> "
+ "-t <milliseconds> "
+ "-p protocols "
"\n",
argv [0]),
-1);
@@ -64,6 +77,8 @@ int main (int argc, char *argv[])
av_core->init (argc, argv, ACE_TRY_ENV);
ACE_TRY_CHECK;
+ parse_args (argc, argv);
+
TAO_ORB_Manager* orb_manager =
av_core->orb_manager ();
@@ -94,7 +109,7 @@ int main (int argc, char *argv[])
"IN",
"UNS:ping",
"",
- "UDP",
+ protocol,
&ping_addr);
flow_spec[0] = CORBA::string_dup (ping.entry_to_string ());
@@ -104,12 +119,12 @@ int main (int argc, char *argv[])
"OUT",
"UNS:pong",
"",
- "UDP",
+ protocol,
&pong_addr);
flow_spec[1] = CORBA::string_dup (pong.entry_to_string ());
TAO_StreamCtrl stream_control_impl;
-
+
AVStreams::StreamCtrl_var stream_control =
stream_control_impl._this (ACE_TRY_ENV);
ACE_TRY_CHECK;
@@ -142,7 +157,7 @@ int main (int argc, char *argv[])
stream_control->start (flow_spec, ACE_TRY_ENV);
ACE_TRY_CHECK;
- ACE_Time_Value tv (10, 0);
+ ACE_Time_Value tv (0, milliseconds * 1000);
if (orb->run (tv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
@@ -151,8 +166,8 @@ int main (int argc, char *argv[])
stream_control->stop (flow_spec, ACE_TRY_ENV);
ACE_TRY_CHECK;
- root_poa->destroy (1, 1, ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ // root_poa->destroy (1, 1, ACE_TRY_ENV);
+ // ACE_TRY_CHECK;
}
ACE_CATCHANY
{
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp
index 20f27590825..80799fd09d2 100644
--- a/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/ping.cpp
@@ -5,21 +5,29 @@
#include "tao/corba.h"
#include "tao/TAO.h"
#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
ACE_RCSID(Latency, ping, "$Id$")
const char *ior_output_file = "ping.ior";
const char *protocol = "RTP/UDP";
int milliseconds = 100;
-AVStreams::protocolSpec recv_protocols;
-AVStreams::protocolSpec send_protocols;
+int respond = 1;
+AVStreams::protocolSpec ping_protocols;
+AVStreams::protocolSpec pong_protocols;
Pong_Send_Callback pong_callback;
+ACE_hrtime_t recv_base = 0;
+ACE_Throughput_Stats recv_latency;
+
+CORBA::ORB_ptr the_orb = 0;
+
int
parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "o:s:r:t:");
+ ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:");
int c;
while ((c = get_opts ()) != -1)
@@ -31,17 +39,17 @@ parse_args (int argc, char *argv[])
case 'r':
{
- CORBA::ULong l = recv_protocols.length ();
- recv_protocols.length (l + 1);
- recv_protocols[l] = CORBA::string_dup (get_opts.optarg);
+ CORBA::ULong l = ping_protocols.length ();
+ ping_protocols.length (l + 1);
+ ping_protocols[l] = CORBA::string_dup (get_opts.optarg);
}
break;
case 's':
{
- CORBA::ULong l = send_protocols.length ();
- send_protocols.length (l + 1);
- send_protocols[l] = CORBA::string_dup (get_opts.optarg);
+ CORBA::ULong l = pong_protocols.length ();
+ pong_protocols.length (l + 1);
+ pong_protocols[l] = CORBA::string_dup (get_opts.optarg);
}
break;
@@ -49,6 +57,10 @@ parse_args (int argc, char *argv[])
milliseconds = ACE_OS::atoi (get_opts.optarg);
break;
+ case 'x':
+ respond = 0;
+ break;
+
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -64,16 +76,16 @@ parse_args (int argc, char *argv[])
// If no protocols are specified use the default...
- if (recv_protocols.length () == 0)
+ if (ping_protocols.length () == 0)
{
- recv_protocols.length (1);
- recv_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345");
+ ping_protocols.length (1);
+ ping_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345");
}
- if (send_protocols.length () == 0)
+ if (pong_protocols.length () == 0)
{
- send_protocols.length (1);
- send_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456");
+ pong_protocols.length (1);
+ pong_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456");
}
// Indicates sucessful parsing of the command line
@@ -94,6 +106,9 @@ int main (int argc, char *argv[])
av_core->orb_manager ();
CORBA::ORB_var orb = orb_manager->orb ();
+ the_orb = orb.in ();
+ // No copying, because the global variable is not used after the
+ // event loop finishes...
CORBA::Object_var poa_object =
orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
@@ -112,10 +127,17 @@ int main (int argc, char *argv[])
// Register the video mmdevice object with the ORB
- Reactive_Strategy reactive_strategy (orb_manager);
- TAO_MMDevice mmdevice_impl (&reactive_strategy);
+ Reactive_Strategy *reactive_strategy;
+ ACE_NEW_RETURN (reactive_strategy,
+ Reactive_Strategy (orb_manager),
+ 1);
+ TAO_MMDevice *mmdevice_impl;
+ ACE_NEW_RETURN (mmdevice_impl,
+ TAO_MMDevice (reactive_strategy),
+ 1);
+
AVStreams::MMDevice_var mmdevice =
- mmdevice_impl._this (ACE_TRY_ENV);
+ mmdevice_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
CORBA::String_var ior =
@@ -137,27 +159,44 @@ int main (int argc, char *argv[])
ACE_OS::fclose (output_file);
}
- Ping_Recv_FDev ping_fdev_impl ("Ping");
- Pong_Send_FDev pong_fdev_impl ("Pong");
+ Ping_Recv_FDev* ping_fdev_impl;
+ ACE_NEW_RETURN (ping_fdev_impl,
+ Ping_Recv_FDev ("Ping"),
+ 1);
+ Pong_Send_FDev* pong_fdev_impl;
+ ACE_NEW_RETURN (pong_fdev_impl,
+ Pong_Send_FDev ("Pong"),
+ 1);
AVStreams::FDev_var ping_fdev =
- ping_fdev_impl._this (ACE_TRY_ENV);
+ ping_fdev_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
AVStreams::FDev_var pong_fdev =
- pong_fdev_impl._this (ACE_TRY_ENV);
+ pong_fdev_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
mmdevice->add_fdev (ping_fdev.in (), ACE_TRY_ENV);
ACE_TRY_CHECK;
- mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV);
- ACE_TRY_CHECK;
- if (orb->run () == -1)
+ if (respond == 1)
+ {
+ mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ ACE_Time_Value tv (120, 0);
+ if (orb->run (tv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
- root_poa->destroy (1, 1, ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "done\n"));
+
+ recv_latency.dump_results ("Receive", gsf);
+
+ // root_poa->destroy (1, 1, ACE_TRY_ENV);
+ // ACE_TRY_CHECK;
}
ACE_CATCHANY
{
@@ -174,7 +213,7 @@ int main (int argc, char *argv[])
Ping_Recv::Ping_Recv (void)
: TAO_FlowConsumer ("Ping",
- recv_protocols,
+ ping_protocols,
"UNS:ping")
{
}
@@ -183,7 +222,7 @@ int
Ping_Recv::get_callback (const char *,
TAO_AV_Callback *&callback)
{
- ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Recv::get_callback\n"));
callback = &this->callback_;
return 0;
}
@@ -192,6 +231,8 @@ int
Ping_Recv_Callback::handle_stop (void)
{
ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::stop"));
+ the_orb->shutdown ();
+
return 0;
}
@@ -200,7 +241,7 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame,
TAO_AV_frame_info *,
const ACE_Addr &)
{
- ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::receive_frame\n"));
for (const ACE_Message_Block *i = frame;
frame != 0;
@@ -213,7 +254,19 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame,
ACE_OS::memcpy (&stamp, frame->rd_ptr (), sizeof(stamp));
- pong_callback.send_response (stamp);
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ if (recv_base == 0)
+ {
+ recv_base = now;
+ }
+ else
+ {
+ recv_latency.sample (now - recv_base,
+ now - stamp);
+ }
+
+ if (respond == 1)
+ pong_callback.send_response (stamp);
}
return 0;
}
@@ -221,7 +274,7 @@ Ping_Recv_Callback::receive_frame (ACE_Message_Block *frame,
int
Ping_Recv_Callback::handle_destroy (void)
{
- ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Recv_Callback::destroy\n"));
return 0;
}
@@ -229,7 +282,7 @@ Ping_Recv_Callback::handle_destroy (void)
Pong_Send::Pong_Send (void)
: TAO_FlowProducer ("Pong",
- send_protocols,
+ pong_protocols,
"UNS:pong")
{
}
@@ -238,7 +291,7 @@ int
Pong_Send::get_callback (const char *,
TAO_AV_Callback *&callback)
{
- ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Send::get_callback\n"));
callback = &pong_callback;
return 0;
}
@@ -254,7 +307,7 @@ Pong_Send_Callback::get_timeout (ACE_Time_Value *&tv,
int
Pong_Send_Callback::handle_timeout (void *arg)
{
- ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n"));
+ // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n"));
return 0;
}
@@ -267,15 +320,16 @@ Pong_Send_Callback::handle_end_stream (void)
int
Pong_Send_Callback::send_response (ACE_hrtime_t stamp)
{
- ACE_DEBUG ((LM_DEBUG, "pong send response)\n"));
+ // ACE_DEBUG ((LM_DEBUG, "pong send response)\n"));
ACE_hrtime_t buf[2];
- buf[0] = stamp;
ACE_Message_Block mb (ACE_reinterpret_cast (char*,buf),
sizeof(buf));
+ buf[0] = stamp;
buf[1] = ACE_OS::gethrtime ();
+ mb.wr_ptr (sizeof(buf));
int result = this->protocol_object_->send_frame (&mb);
if (result < 0)
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
index d7ffbb78bda..975963a96b7 100644
--- a/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.cpp
@@ -6,21 +6,30 @@
#include "tao/TAO.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
+#include "ace/Stats.h"
ACE_RCSID(Latency, ping, "$Id$")
const char *ior_output_file = "pong.ior";
const char *protocol = "RTP/UDP";
int milliseconds = 100;
-AVStreams::protocolSpec recv_protocols;
-AVStreams::protocolSpec send_protocols;
+int message_size = 64;
+int respond = 1;
+AVStreams::protocolSpec pong_protocols;
+AVStreams::protocolSpec ping_protocols;
-ACE_Throughput_Stats latency;
+ACE_hrtime_t recv_throughput_base = 0;
+ACE_Throughput_Stats recv_latency;
+
+ACE_hrtime_t send_throughput_base = 0;
+ACE_Throughput_Stats send_latency;
+
+CORBA::ORB_ptr the_orb = 0;
int
parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "o:s:r:t:");
+ ACE_Get_Opt get_opts (argc, argv, "xo:s:r:t:b:");
int c;
while ((c = get_opts ()) != -1)
@@ -32,17 +41,17 @@ parse_args (int argc, char *argv[])
case 'r':
{
- CORBA::ULong l = recv_protocols.length ();
- recv_protocols.length (l + 1);
- recv_protocols[l] = CORBA::string_dup (get_opts.optarg);
+ CORBA::ULong l = ping_protocols.length ();
+ ping_protocols.length (l + 1);
+ ping_protocols[l] = CORBA::string_dup (get_opts.optarg);
}
break;
case 's':
{
- CORBA::ULong l = send_protocols.length ();
- send_protocols.length (l + 1);
- send_protocols[l] = CORBA::string_dup (get_opts.optarg);
+ CORBA::ULong l = pong_protocols.length ();
+ pong_protocols.length (l + 1);
+ pong_protocols[l] = CORBA::string_dup (get_opts.optarg);
}
break;
@@ -50,6 +59,19 @@ parse_args (int argc, char *argv[])
milliseconds = ACE_OS::atoi (get_opts.optarg);
break;
+ case 'b':
+ message_size = ACE_OS::atoi (get_opts.optarg);
+ if (message_size < sizeof(ACE_hrtime_t))
+ {
+ ACE_DEBUG ((LM_DEBUG, "Invalid message size\n"));
+ message_size = 64;
+ }
+ break;
+
+ case 'x':
+ respond = 0;
+ break;
+
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -65,16 +87,16 @@ parse_args (int argc, char *argv[])
// If no protocols are specified use the default...
- if (recv_protocols.length () == 0)
+ if (pong_protocols.length () == 0)
{
- recv_protocols.length (1);
- recv_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456");
+ pong_protocols.length (1);
+ pong_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:23456");
}
- if (send_protocols.length () == 0)
+ if (ping_protocols.length () == 0)
{
- send_protocols.length (1);
- send_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345");
+ ping_protocols.length (1);
+ ping_protocols[0] = CORBA::string_dup ("UDP=224.9.9.2:12345");
}
// Indicates sucessful parsing of the command line
@@ -95,6 +117,9 @@ int main (int argc, char *argv[])
av_core->orb_manager ();
CORBA::ORB_var orb = orb_manager->orb ();
+ the_orb = orb.in ();
+ // No copying, because the global variable is not used after the
+ // event loop finishes...
CORBA::Object_var poa_object =
orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
@@ -113,10 +138,17 @@ int main (int argc, char *argv[])
// Register the video mmdevice object with the ORB
- Reactive_Strategy reactive_strategy (orb_manager);
- TAO_MMDevice mmdevice_impl (&reactive_strategy);
+ Reactive_Strategy *reactive_strategy;
+ ACE_NEW_RETURN (reactive_strategy,
+ Reactive_Strategy (orb_manager),
+ 1);
+ TAO_MMDevice *mmdevice_impl;
+ ACE_NEW_RETURN (mmdevice_impl,
+ TAO_MMDevice (reactive_strategy),
+ 1);
+
AVStreams::MMDevice_var mmdevice =
- mmdevice_impl._this (ACE_TRY_ENV);
+ mmdevice_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
CORBA::String_var ior =
@@ -138,22 +170,32 @@ int main (int argc, char *argv[])
ACE_OS::fclose (output_file);
}
- Pong_Recv_FDev pong_fdev_impl ("Pong");
- Ping_Send_FDev ping_fdev_impl ("Ping");
+ Pong_Recv_FDev* pong_fdev_impl;
+ ACE_NEW_RETURN (pong_fdev_impl,
+ Pong_Recv_FDev ("Pong"),
+ 1);
+ Ping_Send_FDev* ping_fdev_impl;
+ ACE_NEW_RETURN (ping_fdev_impl,
+ Ping_Send_FDev ("Ping"),
+ 1);
AVStreams::FDev_var ping_fdev =
- ping_fdev_impl._this (ACE_TRY_ENV);
+ ping_fdev_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
AVStreams::FDev_var pong_fdev =
- pong_fdev_impl._this (ACE_TRY_ENV);
+ pong_fdev_impl->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
mmdevice->add_fdev (ping_fdev.in (), ACE_TRY_ENV);
ACE_TRY_CHECK;
- mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ if (respond == 1)
+ {
+ mmdevice->add_fdev (pong_fdev.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
- if (orb->run () == -1)
+ ACE_Time_Value tv (120, 0);
+ if (orb->run (tv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
@@ -161,10 +203,12 @@ int main (int argc, char *argv[])
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
ACE_DEBUG ((LM_DEBUG, "done\n"));
- latency.dump_results ("Ping-Pong", gsf);
+ recv_latency.dump_results ("Receive", gsf);
- root_poa->destroy (1, 1, ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ send_latency.dump_results ("Send", gsf);
+
+ // root_poa->destroy (1, 1, ACE_TRY_ENV);
+ // ACE_TRY_CHECK;
}
ACE_CATCHANY
{
@@ -181,7 +225,7 @@ int main (int argc, char *argv[])
Pong_Recv::Pong_Recv (void)
: TAO_FlowConsumer ("Pong",
- recv_protocols,
+ pong_protocols,
"UNS:pong")
{
}
@@ -190,7 +234,7 @@ int
Pong_Recv::get_callback (const char *,
TAO_AV_Callback *&callback)
{
- ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
callback = &this->callback_;
return 0;
}
@@ -198,7 +242,8 @@ Pong_Recv::get_callback (const char *,
int
Pong_Recv_Callback::handle_stop (void)
{
- ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
+ the_orb->shutdown ();
return 0;
}
@@ -207,7 +252,7 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
TAO_AV_frame_info *,
const ACE_Addr &)
{
- ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
ACE_hrtime_t now = ACE_OS::gethrtime ();
for (const ACE_Message_Block *i = frame;
@@ -224,8 +269,12 @@ Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
ACE_OS::memcpy (buf, frame->rd_ptr (), sizeof(buf));
- latency.sample (now,
- now - buf[0]);
+ if (recv_throughput_base == 0)
+ {
+ recv_throughput_base = now;
+ }
+ recv_latency.sample (now - recv_throughput_base,
+ now - buf[0]);
}
return 0;
}
@@ -241,7 +290,7 @@ Pong_Recv_Callback::handle_destroy (void)
Ping_Send::Ping_Send (void)
: TAO_FlowProducer ("Ping",
- send_protocols,
+ ping_protocols,
"UNS:ping")
{
}
@@ -250,37 +299,49 @@ int
Ping_Send::get_callback (const char *,
TAO_AV_Callback *&callback)
{
- ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
callback = &this->callback_;
return 0;
}
+Ping_Send_Callback::Ping_Send_Callback (void)
+{
+ this->timeout_ = ACE_Time_Value (0, milliseconds * 1000);
+
+ this->frame_.size (message_size);
+ this->frame_.wr_ptr (message_size);
+}
+
void
Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv,
void *&)
{
- // @@ Naga: why the memory allocation!
- ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000));
+ tv = &this->timeout_;
}
int
Ping_Send_Callback::handle_timeout (void *arg)
{
- ACE_DEBUG ((LM_DEBUG, "ping timeout\n"));
+ // ACE_DEBUG ((LM_DEBUG, "ping timeout\n"));
- ACE_hrtime_t buf[1];
+ ACE_hrtime_t stamp = ACE_OS::gethrtime ();
+ ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp));
- buf[0] = ACE_OS::gethrtime ();
- ACE_Message_Block mb (ACE_reinterpret_cast (char*,buf),
- sizeof(buf));
-
- int result = this->protocol_object_->send_frame (&mb);
+ int result = this->protocol_object_->send_frame (&this->frame_);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
"FTP_Client_Flow_Handler::send - %p\n",
""),
-1);
+ if (send_throughput_base == 0)
+ {
+ send_throughput_base = stamp;
+ }
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+ send_latency.sample (now - send_throughput_base,
+ now - stamp);
+
return 0;
}
diff --git a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h
index 5390bec5391..af52b9a70f7 100644
--- a/TAO/orbsvcs/tests/AVStreams/Latency/pong.h
+++ b/TAO/orbsvcs/tests/AVStreams/Latency/pong.h
@@ -19,7 +19,6 @@
#include "orbsvcs/AV/AVStreams_i.h"
#include "orbsvcs/AV/Policy.h"
#include "orbsvcs/AV/Flows_T.h"
-#include "ace/Stats.h"
class Pong_Recv_Callback : public TAO_AV_Callback
{
@@ -47,11 +46,19 @@ private:
class Ping_Send_Callback : public TAO_AV_Callback
{
public:
+ Ping_Send_Callback (void);
virtual int handle_timeout (void *arg);
virtual int handle_end_stream (void);
virtual void get_timeout (ACE_Time_Value *&tv,
void *&arg);
+
+private:
+ ACE_Time_Value timeout_;
+ // the timeout value
+
+ ACE_Message_Block frame_;
+ // Pre-allocate the message block to send...
};
class Ping_Send : public TAO_FlowProducer