summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp234
1 files changed, 93 insertions, 141 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
index 7962c44421e..d3481b9596a 100644
--- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp
@@ -29,20 +29,15 @@ Client_StreamEndPoint::handle_close (void)
CORBA::Boolean
Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
{
- // return 1;
- the_spec.length (0);
- ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_preconnect called\n"));
return 0;
}
// called by the A/V framework after calling connect. Passes the
-// server streamendpoints' flowspec which we use to connect our
+// serer streamendpoints' flowspec which we use to connect our
// datagram socket.
CORBA::Boolean
Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& /* server_spec */)
{
- // return 1;
- ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_postconnect called \n"));
return 0;
}
@@ -95,59 +90,19 @@ ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (Client *client)
CORBA::Boolean
ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
{
- // listen for the tcp socket.
-
- ACE_INET_Addr tcp_addr;
-
- // tcp_addr.set (TCP_PORT,"mambo-atm.cs.wustl.edu");
- tcp_addr.set (TCP_PORT);
-
- if (this->acceptor_.open (tcp_addr,
+ ACE_INET_Addr addr (GLOBALS::instance ()->port_, GLOBALS::instance ()->hostname_);
+
+ if (this->acceptor_.open (addr,
TAO_ORB_Core_instance ()->reactor ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),0);
- ACE_INET_Addr local_addr;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n","open")
+ ,0);
+ ACE_INET_Addr local_addr;
if (this->acceptor_.acceptor ().get_local_addr (local_addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),0);
-
-
- char client_address_string [BUFSIZ];
-
- ::sprintf (client_address_string,
- "%s:%d",
- local_addr.get_host_name (),
- local_addr.get_port_number ());
-
- ACE_INET_Addr addr (client_address_string);
-
-
- char *flowname_;
- ACE_NEW_RETURN (flowname_,
- char [BUFSIZ],
- 0);
- ACE_OS::sprintf (flowname_,
- "Data_%s",
- "TCP");
-
- char flow_protocol_str [BUFSIZ];
- ACE_OS::strcpy (flow_protocol_str,"");
-
- TAO_Forward_FlowSpec_Entry entry (flowname_,
- "IN",
- "USER_DEFINED",
- flow_protocol_str,
- "TCP",
- &addr);
-
- entry.set_local_addr (&addr);
-
- the_spec [0] = CORBA::string_dup (entry.entry_to_string ());
- the_spec.length (1);
-
-
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) client flow spec is %s\n",
- client_address_string));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t)acceptor get local addr failed %p"),
+ 0);
return 1;
}
@@ -155,7 +110,8 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec)
CORBA::Boolean
ttcp_Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& /* server_spec */)
{
- ACE_DEBUG ((LM_DEBUG,"ttcp_Client_StreamEndPoint::handle_postconnect \n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "ttcp_Client_StreamEndPoint::handle_postconnect \n"));
this->client_->set_stream (this->peer ());
return 1;
}
@@ -167,13 +123,13 @@ ttcp_Client_StreamEndPoint::open (void *)
return 0;
}
-Client::Client (int argc, char **argv,int task_id)
- :ttcp_reactive_strategy_ (&orb_manager_,this),
- reactive_strategy_ (&orb_manager_),
- client_mmdevice_ (0),
- argc_ (argc),
- argv_ (argv),
- task_id_ (task_id)
+Client::Client (CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, int task_id)
+ : orb_ (orb),
+ poa_ (poa),
+ ttcp_reactive_strategy_ (orb, poa,this),
+ reactive_strategy_ (orb, poa),
+ client_mmdevice_ (0),
+ task_id_ (task_id)
{
}
@@ -221,7 +177,6 @@ Globals::parse_args (int argc,
this->thread_count_ = ACE_OS::atoi (opts.optarg);
break;
case 's':
- // use ttcp strategy.
this->strategy_ = TTCP_REACTIVE;
break;
case '?':
@@ -237,7 +192,6 @@ int
Client::svc (void)
{
- printf ("Within the SVC method\n");
// Now start pumping data.
ACE_High_Res_Timer timer;
ACE_Time_Value tv1,tv2;
@@ -245,23 +199,12 @@ Client::svc (void)
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Thread created\n"));
- if (GLOBALS::instance ()->parse_args (this->argc_,this->argv_) == -1)
- return -1;
ACE_DECLARE_NEW_CORBA_ENV;
-
ACE_TRY
{
- this->orb_manager_.init (this->argc_,
- this->argv_,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
-
if (this->task_id_ == 0)
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1));
- if (GLOBALS::instance ()->parse_args (this->argc_,
- this->argv_) == -1)
- return -1;
ACE_NEW_RETURN (GLOBALS::instance ()->barrier_,
ACE_Barrier (GLOBALS::instance ()->thread_count_),
@@ -288,14 +231,18 @@ Client::svc (void)
-1);
break;
}
-
+
// activate the client MMDevice with the ORB
- this->orb_manager_.activate (this->client_mmdevice_,
- ACE_TRY_ENV);
+ this->poa_->activate_object (this->client_mmdevice_,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
-
- this->orb_manager_.activate_poa_manager (ACE_TRY_ENV);
-
+
+ //Activate POA Manager
+ PortableServer::POAManager_var mgr
+ = this->poa_->the_POAManager ();
+
+ mgr->activate ();
+
if (this->bind_to_server () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%P|%t) Error binding to the naming service\n"),
@@ -307,48 +254,43 @@ Client::svc (void)
ACE_DEBUG ((LM_DEBUG, "(%P|%t) All threads finished, starting tests.\n"));
ACE_Time_Value tv (0);
- this->orb_manager_.run (ACE_TRY_ENV,&tv);
+ this->orb_->run (&tv, ACE_TRY_ENV);
ACE_TRY_CHECK;
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec);
- // Bind the client and server mmdevices.
-
- ACE_INET_Addr addr ("danzon:5000");
-
- char* flowname_;
- ACE_NEW_RETURN (flowname_,
- char [BUFSIZ],
- 0);
- ACE_OS::sprintf (flowname_,
- "Data_%s",
- "TCP");
- TAO_Forward_FlowSpec_Entry entry (flowname_,
- "IN",
- "USER_DEFINED",
- "",
- "TCP",
- &addr);
-
- AVStreams::flowSpec flow_spec (1);
- flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
- flow_spec.length (1);
-
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Host %d ,Port %s\n",
+ GLOBALS::instance ()->port_, GLOBALS::instance ()->hostname_));
+
+ ACE_INET_Addr addr (GLOBALS::instance ()->port_,
+ GLOBALS::instance ()->hostname_);
+
+ char buf [BUFSIZ];
+ addr.addr_to_string (buf, BUFSIZ);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Client Svc %s\n",
+ buf));
+
+ AVStreams::flowSpec flow_spec (0);
+
timer.start ();
- this->streamctrl_.bind_devs
- (this->client_mmdevice_->_this (),
- this->server_mmdevice_.in (),
- the_qos.inout (),
- flow_spec,
- ACE_TRY_ENV);
+ this->streamctrl_.bind_devs (this->client_mmdevice_->_this (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
-
timer.stop ();
timer.elapsed_time (tv1);
+
long time_taken = tv1.sec () + tv1.usec () /1000000;
tv1.dump ();
+
ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken for stream setup is %ld \n",
time_taken ));
-
+
#if !defined (ACE_LACKS_SOCKET_BUFSIZ)
int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
@@ -367,6 +309,7 @@ Client::svc (void)
if ((result == -1)&& (errno != ENOTSUP))
ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for rcvbufsize:%p\n",""),-1);
#endif /* ACE_LACKS_SOCKET_BUFSIZ */
+
#if defined (TCP_NODELAY)
int one = 1;
result = this->stream_.set_option (SOL_SOCKET,
@@ -415,21 +358,8 @@ Client::bind_to_server (void)
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
- /*
- CORBA::Object_var naming_obj =
- this->orb_manager_.orb ()->resolve_initial_references ("NameService");
- if (CORBA::is_nil (naming_obj.in ()))
- ACE_ERROR_RETURN ((LM_ERROR,
- " (%P|%t) Unable to resolve the Name Service.\n"),
- -1);
- CosNaming::NamingContext_var naming_context =
- CosNaming::NamingContext::_narrow (naming_obj.in (),
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- */
-
// Initialize the naming services
- if (my_name_client_.init (this->orb_manager_.orb ()) != 0)
+ if (my_name_client_.init (this->orb_.in()) != 0)
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Unable to initialize "
"the TAO_Naming_Client. \n"),
@@ -439,11 +369,11 @@ Client::bind_to_server (void)
server_mmdevice_name.length (1);
server_mmdevice_name [0].id = CORBA::string_dup ("Bench_Server_MMDevice");
- CORBA::Object_var server_mmdevice_obj =
- my_name_client_->resolve (server_mmdevice_name,
- ACE_TRY_ENV);
+ CORBA::Object_var server_mmdevice_obj
+ = my_name_client_->resolve (server_mmdevice_name,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
-
+
this->server_mmdevice_ =
AVStreams::MMDevice::_narrow (server_mmdevice_obj.in (),
ACE_TRY_ENV);
@@ -472,8 +402,8 @@ Client::establish_stream (void)
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec);
- // Bind the client and server mmdevices.
+ // Bind the client and server mmdevices.
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
@@ -501,9 +431,10 @@ Client::establish_stream (void)
// -----------------------------------------------------------
// Video_Endpoint_Reactive_Strategy_A methods
-ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager,
- Client *client)
- : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager),
+ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa,
+ Client *client)
+ : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb, poa),
client_ (client)
{
}
@@ -524,8 +455,14 @@ int
main (int argc, char **argv)
{
+ //TAO_debug_level++;
+
+ if (GLOBALS::instance ()->parse_args (argc, argv) == -1)
+ return -1;
+
GLOBALS::instance ()->thread_count_ = 1;
- // Preliminary argument processing.
+
+ // Preliminary argument processing.
int i;
for (i=0;i< argc;i++)
{
@@ -535,15 +472,30 @@ main (int argc, char **argv)
ACE_OS::atoi (argv[i+1]);
}
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj);
+
+ //Activate POA Manager
+ PortableServer::POAManager_var mgr
+ = poa->the_POAManager ();
+
+ mgr->activate ();
+
for (i = 0; i < GLOBALS::instance ()->thread_count_; i++)
{
- Client *client;
+ Client* client;
ACE_NEW_RETURN (client,
- Client (argc,
- argv,
+ Client (orb.in (),
+ poa.in (),
i),
-1);
-
+
if (client->activate (THR_BOUND) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%P|%t) Error in activate: %p",