summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp167
1 files changed, 147 insertions, 20 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
index bacf2533919..4cc70cf5f30 100644
--- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
@@ -26,6 +26,7 @@ Client_StreamEndPoint::handle_close (void)
CORBA::Boolean
Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
{
+ // return CORBA::B_TRUE;
the_spec.length (0);
ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_preconnect called\n"));
return 0;
@@ -37,6 +38,7 @@ Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
CORBA::Boolean
Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& server_spec)
{
+ // return CORBA::B_TRUE;
ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_postconnect called \n"));
return 0;
}
@@ -81,8 +83,9 @@ ttcp_Acceptor::make_svc_handler (ttcp_Client_StreamEndPoint *&sh)
//------------------------------------------------------------
-ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (void)
- :acceptor_ (this)
+ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (Client *client)
+ :acceptor_ (this),
+ client_ (client)
{
}
@@ -93,6 +96,7 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
ACE_INET_Addr tcp_addr;
+ // tcp_addr.set (TCP_PORT,"mambo-atm.cs.wustl.edu");
tcp_addr.set (TCP_PORT);
if (this->acceptor_.open (tcp_addr,
@@ -106,7 +110,8 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
char client_address_string [BUFSIZ];
::sprintf (client_address_string,
"%s:%d",
- local_addr.get_host_name (),
+ // local_addr.get_host_name (),
+ "mambo-atm.cs.wustl.edu",
local_addr.get_port_number ());
the_spec.length (1);
the_spec [0] = CORBA::string_dup (client_address_string);
@@ -118,26 +123,77 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
return CORBA::B_TRUE;
}
+CORBA::Boolean
+ttcp_Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& server_spec)
+{
+ ACE_DEBUG ((LM_DEBUG,"ttcp_Client_StreamEndPoint::handle_postconnect \n"));
+ this->client_->set_stream (this->peer ());
+ return CORBA::B_TRUE;
+}
+
int
ttcp_Client_StreamEndPoint::open (void *)
{
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t) ttcp_Client_StreamEndPoint::open () called\n"));
return 0;
}
Client::Client (int argc, char **argv, ACE_Barrier *barrier)
- : reactive_strategy_ (&orb_manager_),
+ : reactive_strategy_ (&orb_manager_),
+ // :reactive_strategy_ (&orb_manager_,this),
client_mmdevice_ (&reactive_strategy_),
argc_ (argc),
argv_ (argv),
+ block_size_ (1),
+ number_ (10),
barrier_ (barrier)
{
}
+void
+Client::set_stream (ACE_SOCK_Stream & control)
+{
+ this->stream_ = control;
+}
+
+int
+Client::parse_args (int argc,
+ char **argv)
+{
+ ACE_Get_Opt opts (argc,argv,"b:");
+
+ int c;
+
+ while ((c = opts ()) != -1)
+ switch (c)
+ {
+ case 'b':
+ this->block_size_ = ACE_OS::atoi (opts.optarg);
+ break;
+// case 'n':
+// this->number_ = ACE_OS::atoi (opts.optarg);
+// break;
+ case '?':
+ ACE_DEBUG ((LM_DEBUG,"Usage %s [-b block_size] [-n number_of times]",
+ argv[0]));
+ break;
+ }
+ return 0;
+}
+
int
Client::svc (void)
{
+ // Now start pumping data.
+ ACE_High_Res_Timer timer;
+ ACE_Time_Value tv1,tv2;
+
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Thread created\n"));
+
+ if (this->parse_args (this->argc_,
+ this->argv_) == -1)
+ return -1;
TAO_TRY
{
this->orb_manager_.init (this->argc_,
@@ -165,15 +221,66 @@ Client::svc (void)
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec);
// Bind the client and server mmdevices.
-
+
+ timer.start ();
this->streamctrl_.bind_devs
(this->client_mmdevice_._this (TAO_TRY_ENV),
this->server_mmdevice_.in (),
the_qos.inout (),
the_flows.in (),
TAO_TRY_ENV);
-
TAO_CHECK_ENV;
+
+ timer.stop ();
+ timer.elapsed_time (tv1);
+ long time_taken = tv1.sec () + tv1.usec () /1000000;
+ tv1.dump ();
+ //ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken is %ld \n",
+ // time_taken ));
+
+ return 0;
+
+ int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
+ int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
+
+ if (this->stream_.set_option (SOL_SOCKET,
+ SO_SNDBUF,
+ (void *) &sndbufsize,
+ sizeof (sndbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+ else if (this->stream_.set_option (SOL_SOCKET,
+ SO_RCVBUF,
+ (void *) &rcvbufsize,
+ sizeof (rcvbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+
+ int one = 1;
+ if (this->stream_.set_option (SOL_SOCKET,
+ TCP_NODELAY,
+ (char *)& one,
+ sizeof (one)) == -1 )
+ return -1;
+
+ char *buffer;
+ long buffer_siz = this->block_size_*1024;
+
+ ACE_NEW_RETURN (buffer,
+ char [buffer_siz],
+ -1);
+ timer.start ();
+ long number = 64 *1024/this->block_size_;
+ for (int i=0;i<number;i++)
+ this->stream_.send_n (buffer,buffer_siz);
+ timer.stop ();
+ timer.elapsed_time (tv2);
+ double total_time = tv2.sec ()+tv2.usec ()/1000000.0;
+ double total_data = 64*1024*1024;
+ ACE_DEBUG ((LM_DEBUG,"Total data = %f , Total time = %f \n",
+ total_data,total_time));
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t) Thruput for block size is:%d\t%f Mb/s \n",
+ buffer_siz,total_data/(total_time*1024.0*1024.0)));
}
TAO_CATCHANY
{
@@ -266,21 +373,21 @@ int
main (int argc, char **argv)
{
ACE_Get_Opt opts (argc, argv, "T:");
- int thread_count = 0;
+ int thread_count = 1;
- int c;
- while ((c = opts ()) != -1)
- switch (c)
- {
- case 'T':
- thread_count = (u_int) ACE_OS::atoi (opts.optarg);
- continue;
- default:
-// ACE_DEBUG ((LM_DEBUG,
-// "Usage: %s -t number_of_threads\n",
-// argv [0]));
- break;
- }
+int c;
+while ((c = opts ()) != -1)
+ switch (c)
+ {
+ case 'T':
+ thread_count = (u_int) ACE_OS::atoi (opts.optarg);
+ continue;
+ default:
+// ACE_DEBUG ((LM_DEBUG,
+// "Usage: %s -t number_of_threads\n",
+// argv [0]));
+ break;
+ }
ACE_Barrier *barrier;
ACE_NEW_RETURN (barrier,
@@ -310,3 +417,23 @@ main (int argc, char **argv)
ACE_Thread_Manager::instance ()->wait ();
}
+
+// -----------------------------------------------------------
+// Video_Endpoint_Reactive_Strategy_A methods
+
+ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager,
+ Client *client)
+ : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager),
+ client_ (client)
+{
+}
+
+int
+ttcp_Endpoint_Reactive_Strategy_A::make_stream_endpoint (ttcp_Client_StreamEndPoint *&endpoint)
+{
+ ACE_NEW_RETURN (endpoint,
+ ttcp_Client_StreamEndPoint (this->client_),
+ -1);
+
+ return 0;
+}