diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp | 234 |
1 files changed, 93 insertions, 141 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp index 7962c44421e..d3481b9596a 100644 --- a/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp +++ b/TAO/orbsvcs/tests/AVStreams/benchmark/client.cpp @@ -29,20 +29,15 @@ Client_StreamEndPoint::handle_close (void) CORBA::Boolean Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) { - // return 1; - the_spec.length (0); - ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_preconnect called\n")); return 0; } // called by the A/V framework after calling connect. Passes the -// server streamendpoints' flowspec which we use to connect our +// serer streamendpoints' flowspec which we use to connect our // datagram socket. CORBA::Boolean Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& /* server_spec */) { - // return 1; - ACE_DEBUG ((LM_DEBUG,"(%P|%t) handle_postconnect called \n")); return 0; } @@ -95,59 +90,19 @@ ttcp_Client_StreamEndPoint::ttcp_Client_StreamEndPoint (Client *client) CORBA::Boolean ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) { - // listen for the tcp socket. - - ACE_INET_Addr tcp_addr; - - // tcp_addr.set (TCP_PORT,"mambo-atm.cs.wustl.edu"); - tcp_addr.set (TCP_PORT); - - if (this->acceptor_.open (tcp_addr, + ACE_INET_Addr addr (GLOBALS::instance ()->port_, GLOBALS::instance ()->hostname_); + + if (this->acceptor_.open (addr, TAO_ORB_Core_instance ()->reactor ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"%p\n","open"),0); - ACE_INET_Addr local_addr; + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n","open") + ,0); + ACE_INET_Addr local_addr; if (this->acceptor_.acceptor ().get_local_addr (local_addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t)acceptor get local addr failed %p"),0); - - - char client_address_string [BUFSIZ]; - - ::sprintf (client_address_string, - "%s:%d", - local_addr.get_host_name (), - local_addr.get_port_number ()); - - ACE_INET_Addr addr (client_address_string); - - - char *flowname_; - ACE_NEW_RETURN (flowname_, - char [BUFSIZ], - 0); - ACE_OS::sprintf (flowname_, - "Data_%s", - "TCP"); - - char flow_protocol_str [BUFSIZ]; - ACE_OS::strcpy (flow_protocol_str,""); - - TAO_Forward_FlowSpec_Entry entry (flowname_, - "IN", - "USER_DEFINED", - flow_protocol_str, - "TCP", - &addr); - - entry.set_local_addr (&addr); - - the_spec [0] = CORBA::string_dup (entry.entry_to_string ()); - the_spec.length (1); - - - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) client flow spec is %s\n", - client_address_string)); + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t)acceptor get local addr failed %p"), + 0); return 1; } @@ -155,7 +110,8 @@ ttcp_Client_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &the_spec) CORBA::Boolean ttcp_Client_StreamEndPoint::handle_postconnect (AVStreams::flowSpec& /* server_spec */) { - ACE_DEBUG ((LM_DEBUG,"ttcp_Client_StreamEndPoint::handle_postconnect \n")); + ACE_DEBUG ((LM_DEBUG, + "ttcp_Client_StreamEndPoint::handle_postconnect \n")); this->client_->set_stream (this->peer ()); return 1; } @@ -167,13 +123,13 @@ ttcp_Client_StreamEndPoint::open (void *) return 0; } -Client::Client (int argc, char **argv,int task_id) - :ttcp_reactive_strategy_ (&orb_manager_,this), - reactive_strategy_ (&orb_manager_), - client_mmdevice_ (0), - argc_ (argc), - argv_ (argv), - task_id_ (task_id) +Client::Client (CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, int task_id) + : orb_ (orb), + poa_ (poa), + ttcp_reactive_strategy_ (orb, poa,this), + reactive_strategy_ (orb, poa), + client_mmdevice_ (0), + task_id_ (task_id) { } @@ -221,7 +177,6 @@ Globals::parse_args (int argc, this->thread_count_ = ACE_OS::atoi (opts.optarg); break; case 's': - // use ttcp strategy. this->strategy_ = TTCP_REACTIVE; break; case '?': @@ -237,7 +192,6 @@ int Client::svc (void) { - printf ("Within the SVC method\n"); // Now start pumping data. ACE_High_Res_Timer timer; ACE_Time_Value tv1,tv2; @@ -245,23 +199,12 @@ Client::svc (void) ACE_DEBUG ((LM_DEBUG, "(%P|%t) Thread created\n")); - if (GLOBALS::instance ()->parse_args (this->argc_,this->argv_) == -1) - return -1; ACE_DECLARE_NEW_CORBA_ENV; - ACE_TRY { - this->orb_manager_.init (this->argc_, - this->argv_, - ACE_TRY_ENV); - ACE_TRY_CHECK; - if (this->task_id_ == 0) { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, GLOBALS::instance ()->ready_mtx_, 1)); - if (GLOBALS::instance ()->parse_args (this->argc_, - this->argv_) == -1) - return -1; ACE_NEW_RETURN (GLOBALS::instance ()->barrier_, ACE_Barrier (GLOBALS::instance ()->thread_count_), @@ -288,14 +231,18 @@ Client::svc (void) -1); break; } - + // activate the client MMDevice with the ORB - this->orb_manager_.activate (this->client_mmdevice_, - ACE_TRY_ENV); + this->poa_->activate_object (this->client_mmdevice_, + ACE_TRY_ENV); ACE_TRY_CHECK; - - this->orb_manager_.activate_poa_manager (ACE_TRY_ENV); - + + //Activate POA Manager + PortableServer::POAManager_var mgr + = this->poa_->the_POAManager (); + + mgr->activate (); + if (this->bind_to_server () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Error binding to the naming service\n"), @@ -307,48 +254,43 @@ Client::svc (void) ACE_DEBUG ((LM_DEBUG, "(%P|%t) All threads finished, starting tests.\n")); ACE_Time_Value tv (0); - this->orb_manager_.run (ACE_TRY_ENV,&tv); + this->orb_->run (&tv, ACE_TRY_ENV); ACE_TRY_CHECK; AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec); - // Bind the client and server mmdevices. - - ACE_INET_Addr addr ("danzon:5000"); - - char* flowname_; - ACE_NEW_RETURN (flowname_, - char [BUFSIZ], - 0); - ACE_OS::sprintf (flowname_, - "Data_%s", - "TCP"); - TAO_Forward_FlowSpec_Entry entry (flowname_, - "IN", - "USER_DEFINED", - "", - "TCP", - &addr); - - AVStreams::flowSpec flow_spec (1); - flow_spec [0] = CORBA::string_dup (entry.entry_to_string ()); - flow_spec.length (1); - + + ACE_DEBUG ((LM_DEBUG, + "Host %d ,Port %s\n", + GLOBALS::instance ()->port_, GLOBALS::instance ()->hostname_)); + + ACE_INET_Addr addr (GLOBALS::instance ()->port_, + GLOBALS::instance ()->hostname_); + + char buf [BUFSIZ]; + addr.addr_to_string (buf, BUFSIZ); + + ACE_DEBUG ((LM_DEBUG, + "Client Svc %s\n", + buf)); + + AVStreams::flowSpec flow_spec (0); + timer.start (); - this->streamctrl_.bind_devs - (this->client_mmdevice_->_this (), - this->server_mmdevice_.in (), - the_qos.inout (), - flow_spec, - ACE_TRY_ENV); + this->streamctrl_.bind_devs (this->client_mmdevice_->_this (), + this->server_mmdevice_.in (), + the_qos.inout (), + flow_spec, + ACE_TRY_ENV); ACE_TRY_CHECK; - timer.stop (); timer.elapsed_time (tv1); + long time_taken = tv1.sec () + tv1.usec () /1000000; tv1.dump (); + ACE_DEBUG ((LM_DEBUG,"(%P|%t)time taken for stream setup is %ld \n", time_taken )); - + #if !defined (ACE_LACKS_SOCKET_BUFSIZ) int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; @@ -367,6 +309,7 @@ Client::svc (void) if ((result == -1)&& (errno != ENOTSUP)) ACE_ERROR_RETURN ((LM_ERROR,"set_option failed for rcvbufsize:%p\n",""),-1); #endif /* ACE_LACKS_SOCKET_BUFSIZ */ + #if defined (TCP_NODELAY) int one = 1; result = this->stream_.set_option (SOL_SOCKET, @@ -415,21 +358,8 @@ Client::bind_to_server (void) ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { - /* - CORBA::Object_var naming_obj = - this->orb_manager_.orb ()->resolve_initial_references ("NameService"); - if (CORBA::is_nil (naming_obj.in ())) - ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to resolve the Name Service.\n"), - -1); - CosNaming::NamingContext_var naming_context = - CosNaming::NamingContext::_narrow (naming_obj.in (), - ACE_TRY_ENV); - ACE_TRY_CHECK; - */ - // Initialize the naming services - if (my_name_client_.init (this->orb_manager_.orb ()) != 0) + if (my_name_client_.init (this->orb_.in()) != 0) ACE_ERROR_RETURN ((LM_ERROR, " (%P|%t) Unable to initialize " "the TAO_Naming_Client. \n"), @@ -439,11 +369,11 @@ Client::bind_to_server (void) server_mmdevice_name.length (1); server_mmdevice_name [0].id = CORBA::string_dup ("Bench_Server_MMDevice"); - CORBA::Object_var server_mmdevice_obj = - my_name_client_->resolve (server_mmdevice_name, - ACE_TRY_ENV); + CORBA::Object_var server_mmdevice_obj + = my_name_client_->resolve (server_mmdevice_name, + ACE_TRY_ENV); ACE_TRY_CHECK; - + this->server_mmdevice_ = AVStreams::MMDevice::_narrow (server_mmdevice_obj.in (), ACE_TRY_ENV); @@ -472,8 +402,8 @@ Client::establish_stream (void) AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); AVStreams::flowSpec_var the_flows (new AVStreams::flowSpec); - // Bind the client and server mmdevices. + // Bind the client and server mmdevices. ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { @@ -501,9 +431,10 @@ Client::establish_stream (void) // ----------------------------------------------------------- // Video_Endpoint_Reactive_Strategy_A methods -ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (TAO_ORB_Manager *orb_manager, - Client *client) - : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb_manager), +ttcp_Endpoint_Reactive_Strategy_A::ttcp_Endpoint_Reactive_Strategy_A (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + Client *client) + : TAO_AV_Endpoint_Reactive_Strategy_A<ttcp_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> (orb, poa), client_ (client) { } @@ -524,8 +455,14 @@ int main (int argc, char **argv) { + //TAO_debug_level++; + + if (GLOBALS::instance ()->parse_args (argc, argv) == -1) + return -1; + GLOBALS::instance ()->thread_count_ = 1; - // Preliminary argument processing. + + // Preliminary argument processing. int i; for (i=0;i< argc;i++) { @@ -535,15 +472,30 @@ main (int argc, char **argv) ACE_OS::atoi (argv[i+1]); } + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA"); + + PortableServer::POA_var poa + = PortableServer::POA::_narrow (obj); + + //Activate POA Manager + PortableServer::POAManager_var mgr + = poa->the_POAManager (); + + mgr->activate (); + for (i = 0; i < GLOBALS::instance ()->thread_count_; i++) { - Client *client; + Client* client; ACE_NEW_RETURN (client, - Client (argc, - argv, + Client (orb.in (), + poa.in (), i), -1); - + if (client->activate (THR_BOUND) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Error in activate: %p", |