summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp248
1 files changed, 98 insertions, 150 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
index 2bfd8c9d7b8..08494218d07 100644
--- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
@@ -2,7 +2,6 @@
#include "client.h"
-
ACE_RCSID(benchmark, client, "$Id$")
Client_StreamEndPoint::Client_StreamEndPoint (void)
@@ -103,17 +102,17 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
if (this->acceptor_.open (tcp_addr,
TAO_ORB_Core_instance ()->reactor ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),0);
+ ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),-1);
ACE_INET_Addr local_addr;
if (this->acceptor_.acceptor ().get_local_addr (local_addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),0);
+ ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),-1);
char client_address_string [BUFSIZ];
::sprintf (client_address_string,
"%s:%d",
- local_addr.get_host_name (),
- // "mambo-atm.cs.wustl.edu",
+ // local_addr.get_host_name (),
+ "mambo-atm.cs.wustl.edu",
local_addr.get_port_number ());
the_spec.length (1);
the_spec [0] = CORBA::string_dup (client_address_string);
@@ -140,14 +139,15 @@ ttcp_Client_StreamEndPoint::open (void *)
return 0;
}
-Client::Client (int argc, char **argv,int task_id)
- :ttcp_reactive_strategy_ (&orb_manager_,this),
- reactive_strategy_ (&orb_manager_),
- client_mmdevice_ (0),
- argc_ (argc),
- argv_ (argv),
- task_id_ (task_id)
-
+Client::Client (int argc, char **argv, ACE_Barrier *barrier)
+ : reactive_strategy_ (&orb_manager_),
+ // :reactive_strategy_ (&orb_manager_,this),
+ client_mmdevice_ (&reactive_strategy_),
+ argc_ (argc),
+ argv_ (argv),
+ block_size_ (1),
+ number_ (10),
+ barrier_ (barrier)
{
}
@@ -157,48 +157,25 @@ Client::set_stream (ACE_SOCK_Stream & control)
this->stream_ = control;
}
-Globals::Globals (void)
- :ready_cond_ (ready_mtx_),
- hostname_ (""),
- port_ (0)
-{
-}
-
int
-Globals::parse_args (int argc,
- char **argv)
+Client::parse_args (int argc,
+ char **argv)
{
- ACE_Get_Opt opts (argc,argv,"b:n:h:p:ts");
+ ACE_Get_Opt opts (argc,argv,"b:");
int c;
- this->strategy_ = TTCP_REACTIVE;
- this->block_size_ = 1;
- this->number_ = 10;
+
while ((c = opts ()) != -1)
switch (c)
{
case 'b':
this->block_size_ = ACE_OS::atoi (opts.optarg);
break;
- case 'n':
- this->number_ = ACE_OS::atoi (opts.optarg);
- break;
- case 'h':
- this->hostname_ = ACE_OS::strdup (opts.optarg);
- break;
- case 'p':
- this->port_ = ACE_OS::atoi (opts.optarg);
- break;
- case 't':
- this->thread_count_ = ACE_OS::atoi (opts.optarg);
- break;
- case 's':
- // use ttcp strategy.
- this->strategy_ = TTCP_REACTIVE;
- break;
+// case 'n':
+// this->number_ = ACE_OS::atoi (opts.optarg);
+// break;
case '?':
- ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]"
- "[-h hostname] [-p port_number] [-t]",
+ ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]",
argv[0]));
break;
}
@@ -215,7 +192,8 @@ Client::svc (void)
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Thread created\n"));
- if (GLOBALS::instance ()->parse_args (this->argc_,this->argv_) == -1)
+ if (this->parse_args (this->argc_,
+ this->argv_) == -1)
return -1;
TAO_TRY
{
@@ -223,42 +201,8 @@ Client::svc (void)
this->argv_,
TAO_TRY_ENV);
TAO_CHECK_ENV;
-
- if (this->task_id_ == 0)
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1));
- if (GLOBALS::instance ()->parse_args (this->argc_,
- this->argv_) == -1)
- return -1;
-
- ACE_NEW_RETURN (GLOBALS::instance ()->barrier_,
- ACE_Barrier (GLOBALS::instance ()->thread_count_),
- -1);
- GLOBALS::instance ()->ready_ = 1;
- GLOBALS::instance ()->ready_cond_.broadcast ();
- }
- else
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1));
- GLOBALS::instance ()->ready_cond_.wait ();
- }
-
- switch (GLOBALS::instance ()->strategy_)
- {
- case Globals::TTCP_REACTIVE:
- ACE_NEW_RETURN (this->client_mmdevice_,
- TAO_MMDevice (&ttcp_reactive_strategy_),
- -1);
- break;
- case Globals::DGRAM_REACTIVE:
- ACE_NEW_RETURN (this->client_mmdevice_,
- TAO_MMDevice (&reactive_strategy_),
- -1);
- break;
- }
-
// activate the client MMDevice with the ORB
- this->orb_manager_.activate (this->client_mmdevice_,
+ this->orb_manager_.activate (&this->client_mmdevice_,
TAO_TRY_ENV);
TAO_CHECK_ENV;
@@ -268,7 +212,7 @@ Client::svc (void)
-1);
// wait for the other clients to finish binding
- GLOBALS::instance ()->barrier_->wait ();
+ this->barrier_->wait ();
ACE_DEBUG ((LM_DEBUG, "(%P|%t) All threads finished, starting tests.\n"));
@@ -281,7 +225,7 @@ Client::svc (void)
timer.start ();
this->streamctrl_.bind_devs
- (this->client_mmdevice_->_this (TAO_TRY_ENV),
+ (this->client_mmdevice_._this (TAO_TRY_ENV),
this->server_mmdevice_.in (),
the_qos.inout (),
the_flows.in (),
@@ -292,53 +236,48 @@ Client::svc (void)
timer.elapsed_time (tv1);
long time_taken = tv1.sec () + tv1.usec () /1000000;
tv1.dump ();
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken for stream setup is %ld \n",
- time_taken ));
+ //ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken is %ld \n",
+ // time_taken ));
-#if !defined (ACE_LACKS_SOCKET_BUFSIZ)
+ return 0;
+
int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
-
- int result;
- result = this->stream_.set_option (SOL_SOCKET,
- SO_SNDBUF,
- (void *) &sndbufsize,
- sizeof (sndbufsize));
- if ((result == -1) && (errno != ENOTSUP))
- ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for sndbufsize:%p\n",""),-1);
- result = this->stream_.set_option (SOL_SOCKET,
+
+ if (this->stream_.set_option (SOL_SOCKET,
+ SO_SNDBUF,
+ (void *) &sndbufsize,
+ sizeof (sndbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+ else if (this->stream_.set_option (SOL_SOCKET,
SO_RCVBUF,
(void *) &rcvbufsize,
- sizeof (rcvbufsize));
- if ((result == -1)&& (errno != ENOTSUP))
- ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for rcvbufsize:%p\n",""),-1);
-#endif /* ACE_LACKS_SOCKET_BUFSIZ */
-#if defined (TCP_NODELAY)
+ sizeof (rcvbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+
int one = 1;
- result = this->stream_.set_option (SOL_SOCKET,
- TCP_NODELAY,
- (char *)& one,
- sizeof (one));
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"set_option failed TCP_NODELAY:%p\n",""),-1);
-#endif
+ if (this->stream_.set_option (SOL_SOCKET,
+ TCP_NODELAY,
+ (char *)& one,
+ sizeof (one)) == -1 )
+ return -1;
char *buffer;
- long buffer_siz = GLOBALS::instance ()->block_size_*1024;
+ long buffer_siz = this->block_size_*1024;
ACE_NEW_RETURN (buffer,
char [buffer_siz],
-1);
- long number = 64 *1024/(GLOBALS::instance ()->block_size_);
timer.start ();
+ long number = 64 *1024/this->block_size_;
for (int i=0;i<number;i++)
- {
- this->stream_.send_n (buffer,buffer_siz);
- }
+ this->stream_.send_n (buffer,buffer_siz);
timer.stop ();
timer.elapsed_time (tv2);
double total_time = tv2.sec ()+tv2.usec ()/1000000.0;
- double total_data = 64*1024*1024*8.0;
+ double total_data = 64*1024*1024;
ACE_DEBUG ((LM_DEBUG,"Total data = %f , Total time = %f \n",
total_data,total_time));
ACE_DEBUG ((LM_DEBUG,"(%P|%t) Thruput for block size is:%d\t%f Mb/s \n",
@@ -420,7 +359,7 @@ Client::establish_stream (void)
TAO_TRY
{
this->streamctrl_.bind_devs
- (this->client_mmdevice_->_this (TAO_TRY_ENV),
+ (this->client_mmdevice_._this (TAO_TRY_ENV),
this->server_mmdevice_.in (),
the_qos.inout (),
the_flows.in (),
@@ -438,51 +377,39 @@ Client::establish_stream (void)
return 0;
}
-
-// -----------------------------------------------------------
-// Video_Endpoint_Reactive_Strategy_A methods
-
-ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager,
- Client *client)
- : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager),
- client_ (client)
-{
-}
-
-int
-ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint)
-{
- ACE_NEW_RETURN (endpoint,
- ttcp_Client_StreamEndPoint (this->client_),
- -1);
-
- return 0;
-}
-
// ----------------------------------------------------------------------
int
main (int argc, char **argv)
-{
-
- GLOBALS::instance ()->thread_count_ = 1;
- // Preliminary argument processing.
- int i;
- for (i=0;i< argc;i++)
+{
+ ACE_Get_Opt opts (argc, argv, "T:");
+ int thread_count = 1;
+
+int c;
+while ((c = opts ()) != -1)
+ switch (c)
{
- if (ACE_OS::strcmp (argv[i],"-t") == 0
- && (i - 1 < argc))
- GLOBALS::instance ()->thread_count_ =
- ACE_OS::atoi (argv[i+1]);
+ case 'T':
+ thread_count = (u_int) ACE_OS::atoi (opts.optarg);
+ continue;
+ default:
+// ACE_DEBUG ((LM_DEBUG,
+// "Usage: %s -t number_of_threads\n",
+// argv [0]));
+ break;
}
- for (i = 0; i < GLOBALS::instance ()->thread_count_; i++)
+ ACE_Barrier *barrier;
+ ACE_NEW_RETURN (barrier,
+ ACE_Barrier (thread_count + 1),
+ -1);
+ for (int i = 0; i < thread_count; i++)
{
Client *client;
ACE_NEW_RETURN (client,
Client (argc,
argv,
- i),
+ barrier),
-1);
if (client->activate (THR_BOUND) == -1)
@@ -492,7 +419,32 @@ main (int argc, char **argv)
-1);
}
+ // wait for all the threads to finish starting up
+ barrier->wait ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) All threads started, waiting for test completion\n"));
+
ACE_Thread_Manager::instance ()->wait ();
+
+}
+
+// -----------------------------------------------------------
+// Video_Endpoint_Reactive_Strategy_A methods
+
+ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager,
+ Client *client)
+ : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager),
+ client_ (client)
+{
+}
+
+int
+ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint)
+{
+ ACE_NEW_RETURN (endpoint,
+ ttcp_Client_StreamEndPoint (this->client_),
+ -1);
+
return 0;
}
@@ -504,8 +456,6 @@ template class TAO_AV_Endpoint_Reactive_Strategy_A<Client_StreamEndPoint,TAO_VDe
template class ACE_Acceptor <ttcp_Client_StreamEndPoint,ACE_SOCK_ACCEPTOR>;
template class ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
template class ACE_Task<ACE_SYNCH>;
-template class ACE_Condition<ACE_SYNCH_MUTEX> ;
-template class ACE_Singleton<Globals,ACE_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
@@ -514,6 +464,4 @@ template class ACE_Singleton<Globals,ACE_SYNCH_MUTEX>;
#pragma instantiate ACE_Acceptor <ttcp_Client_StreamEndPoint,ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_NULL_SYNCH>
#pragma instantiate ACE_Task<ACE_SYNCH>
-#pragma instantiate ACE_Condition<ACE_SYNCH_MUTEX>
-#pragma instantiate ACE_Singleton <Globals,ACE_SYNCH_MUTEX>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */