diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp | 248 |
1 files changed, 98 insertions, 150 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp index 2bfd8c9d7b8..08494218d07 100644 --- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp +++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp @@ -2,7 +2,6 @@ #include "client.h" - ACE_RCSID(benchmark, client, "$Id$") Client_StreamEndPoint::Client_StreamEndPoint (void) @@ -103,17 +102,17 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) if (this->acceptor_.open (tcp_addr, TAO_ORB_Core_instance ()->reactor ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),0); + ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),-1); ACE_INET_Addr local_addr; if (this->acceptor_.acceptor ().get_local_addr (local_addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),0); + ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),-1); char client_address_string [BUFSIZ]; ::sprintf (client_address_string, "%s:%d", - local_addr.get_host_name (), - // "mambo-atm.cs.wustl.edu", + // local_addr.get_host_name (), + "mambo-atm.cs.wustl.edu", local_addr.get_port_number ()); the_spec.length (1); the_spec [0] = CORBA::string_dup (client_address_string); @@ -140,14 +139,15 @@ ttcp_Client_StreamEndPoint::open (void *) return 0; } -Client::Client (int argc, char **argv,int task_id) - :ttcp_reactive_strategy_ (&orb_manager_,this), - reactive_strategy_ (&orb_manager_), - client_mmdevice_ (0), - argc_ (argc), - argv_ (argv), - task_id_ (task_id) - +Client::Client (int argc, char **argv, ACE_Barrier *barrier) + : reactive_strategy_ (&orb_manager_), + // :reactive_strategy_ (&orb_manager_,this), + client_mmdevice_ (&reactive_strategy_), + argc_ (argc), + argv_ (argv), + block_size_ (1), + number_ (10), + barrier_ (barrier) { } @@ -157,48 +157,25 @@ Client::set_stream (ACE_SOCK_Stream & control) this->stream_ = control; } -Globals::Globals (void) - :ready_cond_ (ready_mtx_), - hostname_ (""), - port_ (0) -{ -} - int -Globals::parse_args (int argc, - char **argv) +Client::parse_args (int argc, + char **argv) { - ACE_Get_Opt opts (argc,argv,"b:n:h:p:ts"); + ACE_Get_Opt opts (argc,argv,"b:"); int c; - this->strategy_ = TTCP_REACTIVE; - this->block_size_ = 1; - this->number_ = 10; + while ((c = opts ()) != -1) switch (c) { case 'b': this->block_size_ = ACE_OS::atoi (opts.optarg); break; - case 'n': - this->number_ = ACE_OS::atoi (opts.optarg); - break; - case 'h': - this->hostname_ = ACE_OS::strdup (opts.optarg); - break; - case 'p': - this->port_ = ACE_OS::atoi (opts.optarg); - break; - case 't': - this->thread_count_ = ACE_OS::atoi (opts.optarg); - break; - case 's': - // use ttcp strategy. - this->strategy_ = TTCP_REACTIVE; - break; +// case 'n': +// this->number_ = ACE_OS::atoi (opts.optarg); +// break; case '?': - ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]" - "[-h hostname] [-p port_number] [-t]", + ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]", argv[0])); break; } @@ -215,7 +192,8 @@ Client::svc (void) ACE_DEBUG ((LM_DEBUG, "(%P|%t) Thread created\n")); - if (GLOBALS::instance ()->parse_args (this->argc_,this->argv_) == -1) + if (this->parse_args (this->argc_, + this->argv_) == -1) return -1; TAO_TRY { @@ -223,42 +201,8 @@ Client::svc (void) this->argv_, TAO_TRY_ENV); TAO_CHECK_ENV; - - if (this->task_id_ == 0) - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1)); - if (GLOBALS::instance ()->parse_args (this->argc_, - this->argv_) == -1) - return -1; - - ACE_NEW_RETURN (GLOBALS::instance ()->barrier_, - ACE_Barrier (GLOBALS::instance ()->thread_count_), - -1); - GLOBALS::instance ()->ready_ = 1; - GLOBALS::instance ()->ready_cond_.broadcast (); - } - else - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1)); - GLOBALS::instance ()->ready_cond_.wait (); - } - - switch (GLOBALS::instance ()->strategy_) - { - case Globals::TTCP_REACTIVE: - ACE_NEW_RETURN (this->client_mmdevice_, - TAO_MMDevice (&ttcp_reactive_strategy_), - -1); - break; - case Globals::DGRAM_REACTIVE: - ACE_NEW_RETURN (this->client_mmdevice_, - TAO_MMDevice (&reactive_strategy_), - -1); - break; - } - // activate the client MMDevice with the ORB - this->orb_manager_.activate (this->client_mmdevice_, + this->orb_manager_.activate (&this->client_mmdevice_, TAO_TRY_ENV); TAO_CHECK_ENV; @@ -268,7 +212,7 @@ Client::svc (void) -1); // wait for the other clients to finish binding - GLOBALS::instance ()->barrier_->wait (); + this->barrier_->wait (); ACE_DEBUG ((LM_DEBUG, "(%P|%t) All threads finished, starting tests.\n")); @@ -281,7 +225,7 @@ Client::svc (void) timer.start (); this->streamctrl_.bind_devs - (this->client_mmdevice_->_this (TAO_TRY_ENV), + (this->client_mmdevice_._this (TAO_TRY_ENV), this->server_mmdevice_.in (), the_qos.inout (), the_flows.in (), @@ -292,53 +236,48 @@ Client::svc (void) timer.elapsed_time (tv1); long time_taken = tv1.sec () + tv1.usec () /1000000; tv1.dump (); - ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken for stream setup is %ld \n", - time_taken )); + //ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken is %ld \n", + // time_taken )); -#if !defined (ACE_LACKS_SOCKET_BUFSIZ) + return 0; + int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; - - int result; - result = this->stream_.set_option (SOL_SOCKET, - SO_SNDBUF, - (void *) &sndbufsize, - sizeof (sndbufsize)); - if ((result == -1) && (errno != ENOTSUP)) - ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for sndbufsize:%p\n",""),-1); - result = this->stream_.set_option (SOL_SOCKET, + + if (this->stream_.set_option (SOL_SOCKET, + SO_SNDBUF, + (void *) &sndbufsize, + sizeof (sndbufsize)) == -1 + && errno != ENOTSUP) + return -1; + else if (this->stream_.set_option (SOL_SOCKET, SO_RCVBUF, (void *) &rcvbufsize, - sizeof (rcvbufsize)); - if ((result == -1)&& (errno != ENOTSUP)) - ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for rcvbufsize:%p\n",""),-1); -#endif /* ACE_LACKS_SOCKET_BUFSIZ */ -#if defined (TCP_NODELAY) + sizeof (rcvbufsize)) == -1 + && errno != ENOTSUP) + return -1; + int one = 1; - result = this->stream_.set_option (SOL_SOCKET, - TCP_NODELAY, - (char *)& one, - sizeof (one)); - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR,"set_option failed TCP_NODELAY:%p\n",""),-1); -#endif + if (this->stream_.set_option (SOL_SOCKET, + TCP_NODELAY, + (char *)& one, + sizeof (one)) == -1 ) + return -1; char *buffer; - long buffer_siz = GLOBALS::instance ()->block_size_*1024; + long buffer_siz = this->block_size_*1024; ACE_NEW_RETURN (buffer, char [buffer_siz], -1); - long number = 64 *1024/(GLOBALS::instance ()->block_size_); timer.start (); + long number = 64 *1024/this->block_size_; for (int i=0;i<number;i++) - { - this->stream_.send_n (buffer,buffer_siz); - } + this->stream_.send_n (buffer,buffer_siz); timer.stop (); timer.elapsed_time (tv2); double total_time = tv2.sec ()+tv2.usec ()/1000000.0; - double total_data = 64*1024*1024*8.0; + double total_data = 64*1024*1024; ACE_DEBUG ((LM_DEBUG,"Total data = %f , Total time = %f \n", total_data,total_time)); ACE_DEBUG ((LM_DEBUG,"(%P|%t) Thruput for block size is:%d\t%f Mb/s \n", @@ -420,7 +359,7 @@ Client::establish_stream (void) TAO_TRY { this->streamctrl_.bind_devs - (this->client_mmdevice_->_this (TAO_TRY_ENV), + (this->client_mmdevice_._this (TAO_TRY_ENV), this->server_mmdevice_.in (), the_qos.inout (), the_flows.in (), @@ -438,51 +377,39 @@ Client::establish_stream (void) return 0; } - -// ----------------------------------------------------------- -// Video_Endpoint_Reactive_Strategy_A methods - -ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager, - Client *client) - : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager), - client_ (client) -{ -} - -int -ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint) -{ - ACE_NEW_RETURN (endpoint, - ttcp_Client_StreamEndPoint (this->client_), - -1); - - return 0; -} - // ---------------------------------------------------------------------- int main (int argc, char **argv) -{ - - GLOBALS::instance ()->thread_count_ = 1; - // Preliminary argument processing. - int i; - for (i=0;i< argc;i++) +{ + ACE_Get_Opt opts (argc, argv, "T:"); + int thread_count = 1; + +int c; +while ((c = opts ()) != -1) + switch (c) { - if (ACE_OS::strcmp (argv[i],"-t") == 0 - && (i - 1 < argc)) - GLOBALS::instance ()->thread_count_ = - ACE_OS::atoi (argv[i+1]); + case 'T': + thread_count = (u_int) ACE_OS::atoi (opts.optarg); + continue; + default: +// ACE_DEBUG ((LM_DEBUG, +// "Usage: %s -t number_of_threads\n", +// argv [0])); + break; } - for (i = 0; i < GLOBALS::instance ()->thread_count_; i++) + ACE_Barrier *barrier; + ACE_NEW_RETURN (barrier, + ACE_Barrier (thread_count + 1), + -1); + for (int i = 0; i < thread_count; i++) { Client *client; ACE_NEW_RETURN (client, Client (argc, argv, - i), + barrier), -1); if (client->activate (THR_BOUND) == -1) @@ -492,7 +419,32 @@ main (int argc, char **argv) -1); } + // wait for all the threads to finish starting up + barrier->wait (); + ACE_DEBUG ((LM_DEBUG, + "(%t) All threads started, waiting for test completion\n")); + ACE_Thread_Manager::instance ()->wait (); + +} + +// ----------------------------------------------------------- +// Video_Endpoint_Reactive_Strategy_A methods + +ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager, + Client *client) + : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager), + client_ (client) +{ +} + +int +ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint) +{ + ACE_NEW_RETURN (endpoint, + ttcp_Client_StreamEndPoint (this->client_), + -1); + return 0; } @@ -504,8 +456,6 @@ template class TAO_AV_Endpoint_Reactive_Strategy_A<Client_StreamEndPoint,TAO_VDe template class ACE_Acceptor <ttcp_Client_StreamEndPoint,ACE_SOCK_ACCEPTOR>; template class ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_NULL_SYNCH>; template class ACE_Task<ACE_SYNCH>; -template class ACE_Condition<ACE_SYNCH_MUTEX> ; -template class ACE_Singleton<Globals,ACE_SYNCH_MUTEX>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> #pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> @@ -514,6 +464,4 @@ template class ACE_Singleton<Globals,ACE_SYNCH_MUTEX>; #pragma instantiate ACE_Acceptor <ttcp_Client_StreamEndPoint,ACE_SOCK_ACCEPTOR> #pragma instantiate ACE_Svc_Handler <ACE_SOCK_STREAM, ACE_NULL_SYNCH> #pragma instantiate ACE_Task<ACE_SYNCH> -#pragma instantiate ACE_Condition<ACE_SYNCH_MUTEX> -#pragma instantiate ACE_Singleton <Globals,ACE_SYNCH_MUTEX> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |