diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp | 167 |
1 files changed, 147 insertions, 20 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp index bacf2533919..4cc70cf5f30 100644 --- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp +++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp @@ -26,6 +26,7 @@ Client_StreamEndPoint::handle_close (void) CORBA::Boolean Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) { + // return CORBA::B_TRUE; the_spec.length (0); ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_preconnect called\n")); return 0; @@ -37,6 +38,7 @@ Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) CORBA::Boolean Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& server_spec) { + // return CORBA::B_TRUE; ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_postconnect called \n")); return 0; } @@ -81,8 +83,9 @@ ttcp_Acceptor::make_svc_handler (ttcp_Client_StreamEndPoint *&sh) //------------------------------------------------------------ -ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (void) - :acceptor_ (this) +ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (Client *client) + :acceptor_ (this), + client_ (client) { } @@ -93,6 +96,7 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) ACE_INET_Addr tcp_addr; + // tcp_addr.set (TCP_PORT,"mambo-atm.cs.wustl.edu"); tcp_addr.set (TCP_PORT); if (this->acceptor_.open (tcp_addr, @@ -106,7 +110,8 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) char client_address_string [BUFSIZ]; ::sprintf (client_address_string, "%s:%d", - local_addr.get_host_name (), + // local_addr.get_host_name (), + "mambo-atm.cs.wustl.edu", local_addr.get_port_number ()); the_spec.length (1); the_spec [0] = CORBA::string_dup (client_address_string); @@ -118,26 +123,77 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) return CORBA::B_TRUE; } +CORBA::Boolean +ttcp_Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& server_spec) +{ + ACE_DEBUG ((LM_DEBUG,"ttcp_Client_StreamEndPoint::handle_postconnect \n")); + this->client_->set_stream (this->peer ()); + return CORBA::B_TRUE; +} + int ttcp_Client_StreamEndPoint::open (void *) { + ACE_DEBUG ((LM_DEBUG,"(%P|%t) ttcp_Client_StreamEndPoint::open () called\n")); return 0; } Client::Client (int argc, char **argv, ACE_Barrier *barrier) - : reactive_strategy_ (&orb_manager_), + : reactive_strategy_ (&orb_manager_), + // :reactive_strategy_ (&orb_manager_,this), client_mmdevice_ (&reactive_strategy_), argc_ (argc), argv_ (argv), + block_size_ (1), + number_ (10), barrier_ (barrier) { } +void +Client::set_stream (ACE_SOCK_Stream & control) +{ + this->stream_ = control; +} + +int +Client::parse_args (int argc, + char **argv) +{ + ACE_Get_Opt opts (argc,argv,"b:"); + + int c; + + while ((c = opts ()) != -1) + switch (c) + { + case 'b': + this->block_size_ = ACE_OS::atoi (opts.optarg); + break; +// case 'n': +// this->number_ = ACE_OS::atoi (opts.optarg); +// break; + case '?': + ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]", + argv[0])); + break; + } + return 0; +} + int Client::svc (void) { + // Now start pumping data. + ACE_High_Res_Timer timer; + ACE_Time_Value tv1,tv2; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Thread created\n")); + + if (this->parse_args (this->argc_, + this->argv_) == -1) + return -1; TAO_TRY { this->orb_manager_.init (this->argc_, @@ -165,15 +221,66 @@ Client::svc (void) AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec); // Bind the client and server mmdevices. - + + timer.start (); this->streamctrl_.bind_devs (this->client_mmdevice_._this (TAO_TRY_ENV), this->server_mmdevice_.in (), the_qos.inout (), the_flows.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; + + timer.stop (); + timer.elapsed_time (tv1); + long time_taken = tv1.sec () + tv1.usec () /1000000; + tv1.dump (); + //ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken is %ld \n", + // time_taken )); + + return 0; + + int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; + int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; + + if (this->stream_.set_option (SOL_SOCKET, + SO_SNDBUF, + (void *) &sndbufsize, + sizeof (sndbufsize)) == -1 + && errno != ENOTSUP) + return -1; + else if (this->stream_.set_option (SOL_SOCKET, + SO_RCVBUF, + (void *) &rcvbufsize, + sizeof (rcvbufsize)) == -1 + && errno != ENOTSUP) + return -1; + + int one = 1; + if (this->stream_.set_option (SOL_SOCKET, + TCP_NODELAY, + (char *)& one, + sizeof (one)) == -1 ) + return -1; + + char *buffer; + long buffer_siz = this->block_size_*1024; + + ACE_NEW_RETURN (buffer, + char [buffer_siz], + -1); + timer.start (); + long number = 64 *1024/this->block_size_; + for (int i=0;i<number;i++) + this->stream_.send_n (buffer,buffer_siz); + timer.stop (); + timer.elapsed_time (tv2); + double total_time = tv2.sec ()+tv2.usec ()/1000000.0; + double total_data = 64*1024*1024; + ACE_DEBUG ((LM_DEBUG,"Total data = %f , Total time = %f \n", + total_data,total_time)); + ACE_DEBUG ((LM_DEBUG,"(%P|%t) Thruput for block size is:%d\t%f Mb/s \n", + buffer_siz,total_data/(total_time*1024.0*1024.0))); } TAO_CATCHANY { @@ -266,21 +373,21 @@ int main (int argc, char **argv) { ACE_Get_Opt opts (argc, argv, "T:"); - int thread_count = 0; + int thread_count = 1; - int c; - while ((c = opts ()) != -1) - switch (c) - { - case 'T': - thread_count = (u_int) ACE_OS::atoi (opts.optarg); - continue; - default: -// ACE_DEBUG ((LM_DEBUG, -// "Usage: %s -t number_of_threads\n", -// argv [0])); - break; - } +int c; +while ((c = opts ()) != -1) + switch (c) + { + case 'T': + thread_count = (u_int) ACE_OS::atoi (opts.optarg); + continue; + default: +// ACE_DEBUG ((LM_DEBUG, +// "Usage: %s -t number_of_threads\n", +// argv [0])); + break; + } ACE_Barrier *barrier; ACE_NEW_RETURN (barrier, @@ -310,3 +417,23 @@ main (int argc, char **argv) ACE_Thread_Manager::instance ()->wait (); } + +// ----------------------------------------------------------- +// Video_Endpoint_Reactive_Strategy_A methods + +ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager, + Client *client) + : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager), + client_ (client) +{ +} + +int +ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint) +{ + ACE_NEW_RETURN (endpoint, + ttcp_Client_StreamEndPoint (this->client_), + -1); + + return 0; +} |