diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-03-05 10:41:13 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-03-05 10:41:13 +0000 |
commit | b7402ad5e87d288c50ea3fbb9f7bb1540536a30e (patch) | |
tree | 2c5784e48bdde74f0a37b01d10b37de635f3204a | |
parent | e411309e1f55052168f7cef4280e58cf170ab759 (diff) | |
download | ATCD-b7402ad5e87d288c50ea3fbb9f7bb1540536a30e.tar.gz |
*** empty log message ***
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Pluggable/README | 39 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp | 546 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.h | 148 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Pluggable/server.cpp | 275 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Pluggable/server.h | 93 |
5 files changed, 645 insertions, 456 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/README b/TAO/orbsvcs/tests/AVStreams/Pluggable/README index 8c24da61dd7..f5d2725204d 100644 --- a/TAO/orbsvcs/tests/AVStreams/Pluggable/README +++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/README @@ -1,22 +1,45 @@ // $Id$ -This directory contains a simple file transfer test program for the -Pluggable Data Protocols in the TAO's Audio/Video Streaming -Service. This program can be run using UDP,TCP and SFP over UDP. +Description +----------- -server: -------- +This directory contains a comprehensive test in the form of a ftp client and server. +The test has the following features. --f filename -> The name of the file under which the received stream - data has to be stored. +1. It tests the AVStreams Pluggable Protocol Framework +2. Shows a mechanism to pace data. +3. Performs benchmarking. + +Running the test +---------------- + +server +------ + +server -f <output_filename> + +-f <output_filename> -> The name of the file under which the received stream + data has to be stored. ftp: ---- +ftp [-f <filename>] [-a <address>] [-p <protocol>] [-s] [-r <frame rate>] [-d] + + -f filename --> The file to be streamed to the server. It is currently streamed at the rate of 0.5kbytes/s. --p protocol --> The protocol string could be UDP or TCP. +-a address --> Multicast address or the host on which the client is running + Default is multicast address + +-p protocol --> The protocol string could be UDP or TCP. But with the + multicast address it should be UDP. -s --> flag to use SFP. This option cannot be used with -p TCP since SFP currently runs only over UDP. + +-r framerate--> The rate at which tha data frames need to be sent. + +-d --> Increament the TAO_debug_level for debug messages. + diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp index 0aad385ae0a..52d0abe97a3 100644 --- a/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp @@ -3,97 +3,17 @@ #include "ftp.h" #include "ace/Get_Opt.h" #include "ace/High_Res_Timer.h" -#include "ace/Stats.h" +ACE_High_Res_Timer last_frame_sent_time; +// The time taken for sending a frmae and preparing for the next frame -ACE_hrtime_t recv_throughput_base = 0; -ACE_Throughput_Stats recv_latency; - -ACE_hrtime_t send_throughput_base = 0; -ACE_Throughput_Stats send_latency; - -int count =0; -int message_size = 64; +ACE_Time_Value inter_frame_time; +// The time that should lapse between two consecutive frames sent. FTP_Client_Callback::FTP_Client_Callback (void) - :count_ (0) { } -int -FTP_Client_Callback::handle_destroy (void) -{ - TAO_AV_CORE::instance ()->stop_run (); - return 0; -} - -void -FTP_Client_Callback::get_timeout (ACE_Time_Value *&tv, - void *&) -{ - ACE_Time_Value *timeout; - ACE_NEW (timeout, - ACE_Time_Value(1)); - tv = timeout; -} - -//@@coryan: Interpretation for the return value like ACE_Event_Handler's handle_timeout method. -int -FTP_Client_Callback::handle_timeout (void *) -{ - - ACE_TRY_NEW_ENV - { - ACE_Message_Block mb (BUFSIZ); - char *buf = mb.rd_ptr (); - - int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ()); - - if (n < 0) - { - ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1); - } - if (n == 0) - { - if (feof (CLIENT::instance ()->file ())) - { - // wait for sometime for the data to be flushed to the other side. - this->count_++; - if (this->count_ == 2) - { - //@@coryan: Remove these code from this method. - //Should be called when the user wants to stop the stream. - ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n")); - AVStreams::flowSpec stop_spec (1); - ACE_DECLARE_NEW_CORBA_ENV; - CLIENT::instance ()->streamctrl ()->stop (stop_spec,ACE_TRY_ENV); - ACE_TRY_CHECK; - TAO_AV_CORE::instance ()->orb ()->shutdown (0); - ACE_TRY_CHECK; - return 0; - } - else - return 0; - } - else - return 0; - } - - mb.wr_ptr (n); - int result = this->protocol_object_->send_frame (&mb); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send \n"),-1); - ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent succesfully\n")); - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"FTP_Client_Callback::handle_timeout Failed"); - return -1; - } - ACE_ENDTRY; - return 0; -} - FTP_Client_StreamEndPoint::FTP_Client_StreamEndPoint (void) { } @@ -102,10 +22,9 @@ int FTP_Client_StreamEndPoint::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_NEW_RETURN (this->callback_, - FTP_Client_Callback, - -1); - callback = this->callback_; + // Create and return the clienmt application callback and return to the AVStreams + // for further upcalls. + callback = &this->callback_; return 0; } @@ -113,33 +32,38 @@ int FTP_Client_StreamEndPoint::set_protocol_object (const char *, TAO_AV_Protocol_Object *object) { - this->callback_->set_protocol_object (object); + // Set the client protocol object corresponding to the transport protocol selected. + CLIENT::instance ()->set_protocol_object (object); return 0; } -Endpoint_Reactive_Strategy::Endpoint_Reactive_Strategy (Client *client) - : client_ (client) +Client::Client (void) + :client_mmdevice_ (&endpoint_strategy_), + count_ (0), + address_ (ACE_OS::strdup ("224.9.9.2:12345")), + fp_ (0), + protocol_ (ACE_OS::strdup ("UDP")), + frame_rate_ (30) { - this->init(TAO_AV_CORE::instance ()->orb (), TAO_AV_CORE::instance ()->poa ()); + this->mb.size (BUFSIZ); } -int -Endpoint_Reactive_Strategy::make_stream_endpoint (FTP_Client_StreamEndPoint *&endpoint) +void +Client::set_protocol_object (TAO_AV_Protocol_Object *object) { - ACE_DEBUG ((LM_DEBUG,"Endpoint_Reactive_Strategy::make_stream_endpoint")); - ACE_NEW_RETURN (endpoint, - FTP_Client_StreamEndPoint, - -1); - return 0; + // Set the client protocol object corresponding to the transport protocol selected. + this->protocol_object_ = object; } int Client::parse_args (int argc, char **argv) { - ACE_Get_Opt opts (argc,argv,"f:a:p:sdt"); + // Parse command line arguments + ACE_Get_Opt opts (argc,argv,"f:a:p:r:sd"); this->use_sfp_ = 0; + int c; while ((c= opts ()) != -1) { @@ -154,17 +78,17 @@ Client::parse_args (int argc, case 'p': this->protocol_ = ACE_OS::strdup (opts.optarg); break; + case 'r': + this->frame_rate_ = ACE_OS::atoi (opts.optarg); + break; case 's': this->use_sfp_ = 1; break; case 'd': TAO_debug_level++; break; - case 't': - this->test_ = 1; - break; default: - ACE_DEBUG ((LM_DEBUG,"Unknown option\n")); + ACE_DEBUG ((LM_DEBUG,"Unknown Option\n")); return -1; } } @@ -189,166 +113,262 @@ Client::streamctrl (void) return &this->streamctrl_; } -Client::Client (void) - : endpoint_strategy_ (this), - client_mmdevice_ (&endpoint_strategy_), - address_ (ACE_OS::strdup ("224.9.9.2:12345")), - fp_ (0), - protocol_ (ACE_OS::strdup ("UDP")) + +int +Client::frame_rate (void) { + return this->frame_rate_; } +// Method to get the object reference of the server int -Client::bind_to_server (void) +Client::bind_to_server (CORBA::Environment &ACE_TRY_ENV) { - ACE_DECLARE_NEW_CORBA_ENV; - - ACE_TRY - { - // Initialize the naming services - if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to initialize " - "the TAO_Naming_Client. \n"), - -1); - - CosNaming::Name server_mmdevice_name (1); - server_mmdevice_name.length (1); - server_mmdevice_name [0].id = CORBA::string_dup ("Server_MMDevice"); - CORBA::Object_var server_mmdevice_obj = - my_naming_client_->resolve (server_mmdevice_name, - ACE_TRY_ENV); - ACE_TRY_CHECK; - - this->server_mmdevice_ = - AVStreams::MMDevice::_narrow (server_mmdevice_obj.in (), - ACE_TRY_ENV); - ACE_TRY_CHECK; - - if (CORBA::is_nil (this->server_mmdevice_.in ())) - ACE_ERROR_RETURN ((LM_ERROR, - " could not resolve Server_Mmdevice in Naming service <%s>\n"), - -1); - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Command_Handler::resolve_reference"); - return -1; - } - ACE_ENDTRY; + // Initialize the naming services + if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize " + "the TAO_Naming_Client. \n"), + -1); + + CosNaming::Name server_mmdevice_name (1); + server_mmdevice_name.length (1); + server_mmdevice_name [0].id = CORBA::string_dup ("Server_MMDevice"); + + // Resolve the server object reference from the Naming Service + CORBA::Object_var server_mmdevice_obj = + my_naming_client_->resolve (server_mmdevice_name, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + this->server_mmdevice_ = + AVStreams::MMDevice::_narrow (server_mmdevice_obj.in (), + ACE_TRY_ENV); ACE_CHECK_RETURN (-1); + + if (CORBA::is_nil (this->server_mmdevice_.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + "Could not resolve Server_MMdevice in Naming service <%s>\n"), + -1); + return 0; } int -Client::init (int argc,char **argv) +Client::init (int argc, + char **argv, + CORBA::Environment& ACE_TRY_ENV) { this->argc_ = argc; this->argv_ = argv; - - // Increase the debug_level so that we can see the output - // TAO_debug_level++; + CORBA::String_var ior; - PortableServer::POAManager_var mgr - = TAO_AV_CORE::instance ()->poa ()->the_POAManager (); - - mgr->activate (); - - this->parse_args (argc, argv); - - if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to initialize " - "the TAO_Naming_Client. \n"), - -1); - - this->fp_ = ACE_OS::fopen (this->filename_,"r"); - if (this->fp_ != 0) - { - ACE_DEBUG ((LM_DEBUG,"file opened successfully\n")); - } + + // Initialize the endpoint strategy with the orb and poa. + this->endpoint_strategy_.init(TAO_AV_CORE::instance ()->orb (), + TAO_AV_CORE::instance ()->poa ()); + // Parse the command line arguments + this->parse_args (argc, + argv); - if (this->bind_to_server () == -1) + + // Open file to read. + this->fp_ = ACE_OS::fopen (this->filename_, + "r"); + if (this->fp_ == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + "Cannot open output file %s\n", + this->filename_), + -1); + + // Resolve the object reference of the server from the Naming Service. + if (this->bind_to_server (ACE_TRY_ENV) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Error binding to the naming service\n"), -1); - + + // Create the Flow protocol name + char flow_protocol_str [BUFSIZ]; + if (this->use_sfp_) + ACE_OS::strcpy (flow_protocol_str, + "sfp:1.0"); + else + ACE_OS::strcpy (flow_protocol_str, + ""); + + // Initialize the QoS + AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); + + // Set the address of the ftp client. + ACE_INET_Addr addr (this->address_); + + // Initialize the flowname + ACE_NEW_RETURN (this->flowname_, + char [BUFSIZ], + 0); + + ACE_OS::sprintf (this->flowname_, + "Data_%s", + this->protocol_); + + // Create the forward flow specification to describe the flow. + TAO_Forward_FlowSpec_Entry entry (this->flowname_, + "IN", + "USER_DEFINED", + flow_protocol_str, + this->protocol_, + &addr); + + AVStreams::flowSpec flow_spec (1); + flow_spec [0] = CORBA::string_dup (entry.entry_to_string ()); + flow_spec.length (1); + + // Bind/Connect the client and server MMDevices. + CORBA::Boolean result = + this->streamctrl_.bind_devs (this->client_mmdevice_._this (ACE_TRY_ENV), + this->server_mmdevice_.in (), + the_qos.inout (), + flow_spec, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (result == 0) + ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1); + return 0; } +// Method to send data at the specified rate int -Client::run (void) +Client::pace_data (CORBA::Environment& ACE_TRY_ENV) { - ACE_DECLARE_NEW_CORBA_ENV; + + // Rate at which frames of data need to be sent. + this->frame_rate_ = CLIENT::instance ()->frame_rate (); + + // Time within which a frame should be sent. + double frame_time = 1/ (double) this->frame_rate_; + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Frame Time ONE = %f\n Frame Rate = %d\n", + frame_time, + this->frame_rate_)); + + // The time between two consecutive frames. + inter_frame_time.set (frame_time); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Inter Frame Time = %d\n", + inter_frame_time.msec ())); + ACE_TRY { - char flow_protocol_str [BUFSIZ]; - if (this->use_sfp_) - ACE_OS::strcpy (flow_protocol_str,"sfp:1.0"); - else - ACE_OS::strcpy (flow_protocol_str,""); - AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); - - ACE_TRY_CHECK; - ACE_INET_Addr addr (this->address_); - ACE_NEW_RETURN (this->flowname_, - char [BUFSIZ], - 0); - ACE_OS::sprintf (this->flowname_, - "Data_%s", - this->protocol_); - TAO_Forward_FlowSpec_Entry entry (this->flowname_, - "IN", - "USER_DEFINED", - flow_protocol_str, - this->protocol_, - &addr); - - AVStreams::flowSpec flow_spec (1); - flow_spec [0] = CORBA::string_dup (entry.entry_to_string ()); - flow_spec.length (1); - //ACE_High_Res_Timer timer; - //ACE_Time_Value elapsed; - // timer.start (); - CORBA::Boolean result = - this->streamctrl_.bind_devs (this->client_mmdevice_._this (ACE_TRY_ENV), - this->server_mmdevice_.in (), - the_qos.inout (), - flow_spec, - ACE_TRY_ENV); - //timer.stop (); - // timer.elapsed_time (elapsed); - //elapsed.dump (); - - // If we're supposed to do only bind_devs time calculation return. - if (this->test_) - { - AVStreams::flowSpec flow_spec; - this->streamctrl_.destroy (flow_spec,ACE_TRY_ENV); - ACE_TRY_CHECK; - return 0; - } + + // Continue to send data till the file is read to the end. + while (1) + { + // Count the frames sent. + count_++; + + // Read from the file into a message block. + int n = ACE_OS::fread(this->mb.rd_ptr (), + 1, + this->mb.size (), + CLIENT::instance ()->file ()); + + if (n < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "FTP_Client_Flow_Handler::fread end of file\n"), + -1); + + if (n == 0) + { + if (feof (CLIENT::instance ()->file ())) + { + // At end of file break the loop and end the client. + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n")); + break; + } + + } + + this->mb.wr_ptr (n); + + if ( this->count_ > 1) + { + // Greater than the first frame. + + // Stop the timer that was started just before the previous frame was sent. + last_frame_sent_time.stop (); + + // Get the time elapsed after sending the previous frame. + ACE_Time_Value tv; + last_frame_sent_time.elapsed_time (tv); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Elapsed Time = %d\n", + tv.msec ())); + + // Check to see if the inter frame time has elapsed. + if (tv < inter_frame_time) + { + // Inter frame time has not elapsed. + + // Claculate the time to wait before the next frame needs to be sent. + ACE_Time_Value wait_time (inter_frame_time - tv); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Wait Time = %d\n", + wait_time.msec ())); + + // run the orb for the wait time so the client can continue other orb requests. + TAO_AV_CORE::instance ()->orb ()->run (wait_time, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + + // Start timer before sending the frame. + last_frame_sent_time.start (); + + // Send frame. + int result = this->protocol_object_->send_frame (&this->mb); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "send failed:%p","FTP_Client_Flow_Handler::send\n"), + -1); + ACE_DEBUG ((LM_DEBUG,"Client::pace_data buffer sent succesfully\n")); + + // Reset the mb. + this->mb.reset (); + + } // end while + + // Since the file is read stop the stream. + AVStreams::flowSpec stop_spec (1); + CLIENT::instance ()->streamctrl ()->destroy (stop_spec, + ACE_TRY_ENV); ACE_TRY_CHECK; - if (result == 0) - ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1); - - AVStreams::flowSpec start_spec (1); - //start_spec.length (1); - //start_spec [0] = CORBA::string_dup (this->flowname_); - this->streamctrl_.start (start_spec,ACE_TRY_ENV); + + // Shut the orb down. + TAO_AV_CORE::instance ()->orb ()->shutdown (0); ACE_TRY_CHECK; - - TAO_AV_CORE::instance ()->orb ()->run (); + } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Client::run\n"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Client::pace_data Failed"); return -1; } ACE_ENDTRY; - ACE_CHECK_RETURN (-1); return 0; } @@ -360,40 +380,49 @@ main (int argc, ACE_TRY { CORBA::ORB_var orb = CORBA::ORB_init (argc, - argv); + argv, + 0, + ACE_TRY_ENV); CORBA::Object_var obj - = orb->resolve_initial_references ("RootPOA"); + = orb->resolve_initial_references ("RootPOA", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + + //Get the POA_var object from Object_var + PortableServer::POA_var root_poa + = PortableServer::POA::_narrow (obj.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr + = root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; - PortableServer::POA_var poa - = PortableServer::POA::_narrow (obj.in ()); + mgr->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + // Initialize the AV STream components. TAO_AV_CORE::instance ()->init (orb.in (), - poa.in (), + root_poa.in (), ACE_TRY_ENV); ACE_TRY_CHECK; + + // INitialize the Client. int result = 0; - result = CLIENT::instance ()->init (argc,argv); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"client::init failed\n"),1); - result = CLIENT::instance ()->run (); + result = CLIENT::instance ()->init (argc, + argv, + ACE_TRY_ENV); if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1); - ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); - ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); - ACE_DEBUG ((LM_DEBUG, "done\n")); - - recv_latency.dump_results ("Receive", gsf); - - send_latency.dump_results ("Send", gsf); - poa->destroy (1, 1, ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); + ACE_ERROR_RETURN ((LM_ERROR, + "client::init failed\n"),1); + + // Start sending data. + result = CLIENT::instance ()->pace_data (ACE_TRY_ENV); + ACE_TRY_CHECK; orb->destroy (ACE_TRY_ENV); ACE_CHECK_RETURN (-1); - - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1); - } ACE_CATCHANY @@ -408,10 +437,15 @@ main (int argc, #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Singleton <Client,ACE_Null_Mutex>; -template class TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; -template class TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class +TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class +TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate +TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate +TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.h b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.h index f945b3515ef..24f43d7664c 100644 --- a/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.h +++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.h @@ -1,6 +1,23 @@ /* -*- C++ -*- */ // $Id$ +// ============================================================================ +// +// = LIBRARY +// TAO/orbsvcs/tests/AVStreams/Pluggable +// +// = FILENAME +// ftp.h +// +// = DESCRIPTION +// Ftp client to send data +// +// = AUTHOR +// Yamuna Krishnamurthy <yamuna@cs.wustl.edu> +// +// ============================================================================ + + #ifndef TAO_AV_FTP_H #define TAO_AV_FTP_H @@ -8,87 +25,148 @@ #include "orbsvcs/Naming/Naming_Utils.h" #include "orbsvcs/AV/AVStreams_i.h" #include "orbsvcs/AV/Endpoint_Strategy.h" -#include "orbsvcs/AV/FlowSpec_Entry.h" -#include "orbsvcs/AV/sfp.h" -#include "orbsvcs/AV/MCast.h" -#include "ace/High_Res_Timer.h" +#include "orbsvcs/AV/Policy.h" +#include "orbsvcs/AV/Protocol_Factory.h" -class FTP_Client_Callback : public TAO_AV_Callback + +class FTP_Client_Callback : public TAO_AV_Callback { + // = TITLE + // Defines the client applcation callback. + // + // = DESCRIPTION + // This class can override the methods of + // the TAO_AV_Callback to do application + // specific processing. public: FTP_Client_Callback (void); - virtual int handle_timeout (void *arg); - virtual int handle_destroy (void); - virtual void get_timeout (ACE_Time_Value *&tv, - void *&arg); - void set_protocol_object (TAO_AV_Protocol_Object *protocol_object) {this->protocol_object_ = protocol_object;} -protected: - int count_; - TAO_AV_Protocol_Object *protocol_object_; + //Constructor }; + class FTP_Client_StreamEndPoint : public TAO_Client_StreamEndPoint { + // = TITLE + // Defines the client stream endpoint. + // + // = DESCRIPTION + // This class overrides the methods of TAO_ClientStreamendpoint + // so the application can perform its processing during post and pre + // connection set up. + public: FTP_Client_StreamEndPoint (void); + //Contructor virtual int get_callback (const char *flowname, TAO_AV_Callback *&callback); + // Create the application client callback and return its handle to the + // AVSTreams for further application callbacks virtual int set_protocol_object (const char *flowname, TAO_AV_Protocol_Object *object); + // Set protocol object corresponding to the transport protocol chosen. + protected: - FTP_Client_Callback *callback_; + FTP_Client_Callback callback_; + // reference to the cllient application callback. }; -typedef TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> ENDPOINT_STRATEGY; - -class Client; -class Endpoint_Reactive_Strategy : public ENDPOINT_STRATEGY -{ -public: - Endpoint_Reactive_Strategy (Client *client_ptr); - // constructor . The orb manager is needed for the TAO_AV_Endpoint_Reactive_Strategy_A. - - virtual int make_stream_endpoint (FTP_Client_StreamEndPoint *& endpoint); - // hook to make our streamendpoint taking a Client pointer -private: - Client *client_; - // pointer to command handler object -}; +typedef TAO_AV_Endpoint_Reactive_Strategy_A <FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> ENDPOINT_STRATEGY; class Client { + // = TITLE + // Defines the Client Application + // + // = DESCRIPTION + // The actual client program that acts as the ftp client that streams data + // to the ftp servers that are waiting for data. public: Client (void); - int init (int argc, char **argv); - int run (void); + // Constructor + + int init (int argc, + char **argv, + CORBA::Environment&); + // Method to initialize the various data components. + + void set_protocol_object (TAO_AV_Protocol_Object *protocol_object); + // Set the protocol object corresponding to the transport protocol chosen. + + int pace_data (CORBA::Environment&); + // Method to pace and send data from a file. + FILE *file (void); - char *flowname (void); + // File handle from which data is read to be sent. + TAO_StreamCtrl* streamctrl (void); + // The stream control interface that manages the stream set up + + char *flowname (void); + // name of the flow set up. + + int frame_rate (void); + // The requested frame rate for sending each frame of data read from the file. + private: int parse_args (int argc, char **argv); - int bind_to_server (void); - Endpoint_Reactive_Strategy endpoint_strategy_; + // Method to parse the command line arguments. + + int bind_to_server (CORBA::Environment& ACE_TRY_ENV); + // Method that binds the ftp client to the server + + ENDPOINT_STRATEGY endpoint_strategy_; + // The reacfive strategy of the client. + AVStreams::MMDevice_var server_mmdevice_; + // The server MMDevice that the ftpo client connects to + TAO_MMDevice client_mmdevice_; + // The ftp client MMDevice. + TAO_StreamCtrl streamctrl_; // Video stream controller + int count_; + // Number of frames sent. + int argc_; char **argv_; + const char *filename_; + // File from which data is read. + const char *address_; + // Address of the ftp client host machine or a multicast address - Default is + // UDP multicast addess TAO_Naming_Client my_naming_client_; + // The Naming Service client. + FILE *fp_; + // File handle of the file read from. + char *protocol_; + // Selected protocol - default is UDP + char *flowname_; + int use_sfp_; - int test_; + // If set to 1 then use sfp as the flow carrier protocol. + + int frame_rate_; + + ACE_Message_Block mb; + // Message block into which data is read from a file and then sent. + + TAO_AV_Protocol_Object* protocol_object_; + // Protocol object corresponding to the transport protocol selected. + }; typedef ACE_Singleton<Client,ACE_Null_Mutex> CLIENT; #endif /* TAO_AV_FTP_H */ + diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/server.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable/server.cpp index 52721abe859..5d58b1db246 100644 --- a/TAO/orbsvcs/tests/AVStreams/Pluggable/server.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/server.cpp @@ -1,36 +1,22 @@ + + // $Id$ #include "server.h" #include "ace/Get_Opt.h" -#include "ace/High_Res_Timer.h" -#include "ace/Stats.h" - -ACE_hrtime_t recv_base = 0; -ACE_Throughput_Stats recv_latency; +static FILE *output_file = 0; +// File into which the received data is written. -FTP_Server_StreamEndPoint::FTP_Server_StreamEndPoint (void) -{ - ACE_DEBUG ((LM_DEBUG,"FTP_Server_StreamEndPoint::FTP_Server_StreamEndPoint")); -} +static const char *output_file_name = "output"; +// File handle of the file into which data is written. int FTP_Server_StreamEndPoint::get_callback (const char *, TAO_AV_Callback *&callback) { - ACE_DEBUG ((LM_DEBUG,"FTP_Server_StreamEndPoint::get_sfp_callback\n")); - ACE_NEW_RETURN (callback, - FTP_Server_Callback, - -1); - return 0; -} - -int -FTP_Server_Callback::handle_stop (void) -{ - ACE_DEBUG ((LM_DEBUG,"FTP_Server_Callback::stop")); - ACE_OS::fclose (FTP_SERVER::instance ()->file ()); - TAO_AV_CORE::instance ()->orb ()->shutdown (); + // Return the server application callback to the AVStreams for further upcalls, + callback = &this->callback_; return 0; } @@ -39,16 +25,25 @@ FTP_Server_Callback::receive_frame (ACE_Message_Block *frame, TAO_AV_frame_info *, const ACE_Addr &) { - ACE_DEBUG ((LM_DEBUG,"FTP_Server_Callback::receive_frame\n")); + // Upcall from the AVStreams when there is data to be received from the + // ftp client. + + ACE_DEBUG ((LM_DEBUG, + "FTP_Server_Callback::receive_frame\n")); + while (frame != 0) { - int result = ACE_OS::fwrite (frame->rd_ptr (), - frame->length (), - 1, - FTP_SERVER::instance ()->file ()); + // Write the received data to the file. + unsigned int result = ACE_OS::fwrite (frame->rd_ptr (), + frame->length (), + 1, + output_file); + + if (result == frame->length ()) + ACE_ERROR_RETURN ((LM_ERROR, + "FTP_Server_Flow_Handler::fwrite failed\n"), + -1); - if (result == 0) - ACE_ERROR_RETURN ((LM_ERROR,"FTP_Server_Flow_Handler::fwrite failed\n"),-1); frame = frame->cont (); } return 0; @@ -57,88 +52,71 @@ FTP_Server_Callback::receive_frame (ACE_Message_Block *frame, int FTP_Server_Callback::handle_destroy (void) { - ACE_DEBUG ((LM_DEBUG,"FTP_SFP_Callback::end_stream\n")); - TAO_AV_CORE::instance ()->orb ()->shutdown (0); + // Called when the ftp client requests the stream to be shutdown. + ACE_DEBUG ((LM_DEBUG, + "FTP_Server_Callback::end_stream\n")); + TAO_AV_CORE::instance ()->orb ()->shutdown (); return 0; } Server::Server (void) + : mmdevice_ (0) { - reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (), TAO_AV_CORE::instance ()->poa ()); } -int -Server::init (int argc, - char **argv) +Server::~Server (void) { - ACE_DECLARE_NEW_CORBA_ENV; - ACE_TRY - { - PortableServer::POAManager_var mgr - = TAO_AV_CORE::instance ()->poa ()->the_POAManager (); + delete this->mmdevice_; +} - mgr->activate (); +int +Server::init (int, + char **, + CORBA::Environment &ACE_TRY_ENV) +{ + int result = + this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (), + TAO_AV_CORE::instance ()->poa ()); + if (result != 0) + return result; + + // Register the server mmdevice object with the ORB + ACE_NEW_RETURN (this->mmdevice_, + TAO_MMDevice (&this->reactive_strategy_), + -1); - int result = this->parse_args (argc,argv); - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR,"parse args failed\n"),-1); - // Initialize the naming services + // Register the mmdevice with the naming service. + CosNaming::Name server_mmdevice_name (1); + server_mmdevice_name.length (1); + server_mmdevice_name [0].id = CORBA::string_dup ("Server_MMDevice"); - if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to initialize " - "the TAO_Naming_Client. \n"), - -1); + CORBA::Object_var mmdevice = + this->mmdevice_->_this (ACE_TRY_ENV); + ACE_CHECK_RETURN(-1); - // Register the video mmdevice object with the ORB - ACE_NEW_RETURN (this->mmdevice_, - TAO_MMDevice (&this->reactive_strategy_), + // Initialize the naming services + if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to initialize " + "the TAO_Naming_Client\n"), -1); - // Register the mmdevice with the naming service. - CosNaming::Name server_mmdevice_name (1); - server_mmdevice_name.length (1); - server_mmdevice_name [0].id = CORBA::string_dup ("Server_MMDevice"); - - // Register the video control object with the naming server. - this->my_naming_client_->rebind (server_mmdevice_name, - this->mmdevice_->_this (ACE_TRY_ENV), - ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"server::init"); - return -1; - } - ACE_ENDTRY; + // Register the server object with the naming server. + this->my_naming_client_->rebind (server_mmdevice_name, + mmdevice.in (), + ACE_TRY_ENV); ACE_CHECK_RETURN (-1); - return 0; -} -int -Server::run (void) -{ - ACE_DECLARE_NEW_CORBA_ENV; - ACE_TRY - { - TAO_AV_CORE::instance ()->orb ()->run (); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"server::init"); - return -1; - } - ACE_ENDTRY; - ACE_CHECK_RETURN (-1); return 0; } int -Server::parse_args (int argc,char **argv) +parse_args (int argc, + char **argv) { - ACE_Get_Opt opts (argc,argv,"f:p:"); + ACE_Get_Opt opts (argc, + argv, + "f:"); int c; while ((c = opts ()) != -1) @@ -146,50 +124,87 @@ Server::parse_args (int argc,char **argv) switch (c) { case 'f': - this->fp_ = ACE_OS::fopen (opts.optarg,"w"); - if (this->fp_ != 0) - { - ACE_DEBUG ((LM_DEBUG,"file opened successfully\n")); - } - break; - case 'p': - this->protocol_ = ACE_OS::strdup (opts.optarg); + output_file_name = opts.optarg; break; default: - ACE_ERROR_RETURN ((LM_ERROR,"Usage: server -f filename"),-1); + ACE_ERROR_RETURN ((LM_ERROR, + "Usage: server -f filename"), + -1); } } - return 0; -} -FILE* -Server::file (void) -{ - return this->fp_; + return 0; } int main (int argc, char **argv) { - int result = 0; - CORBA::ORB_var orb = CORBA::ORB_init (argc, - argv); + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // Initialize the ORB first. + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv, + 0, + ACE_TRY_ENV); + ACE_TRY_CHECK; - CORBA::Object_var obj - = orb->resolve_initial_references ("RootPOA"); + int result = + parse_args (argc, + argv); - PortableServer::POA_var poa - = PortableServer::POA::_narrow (obj.in ()); + if (result == -1) + return -1; + + // Make sure we have a valid <output_file> + output_file = ACE_OS::fopen (output_file_name, + "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + "Cannot open output file %s\n", + output_file_name), + -1); - ACE_DECLARE_NEW_CORBA_ENV; + else ACE_DEBUG ((LM_DEBUG, + "File Opened Successfull\n")); - ACE_TRY - { + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Get the POA_var object from Object_var. + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (obj.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr + = root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + + mgr->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Initialize the AVStreams components. TAO_AV_CORE::instance ()->init (orb.in (), - poa.in (), + root_poa.in (), ACE_TRY_ENV); ACE_TRY_CHECK; + + Server server; + result = + server.init (argc, + argv, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (result != 0) + return result; + + orb->run (ACE_TRY_ENV); + ACE_TRY_CHECK; } ACE_CATCHANY { @@ -198,26 +213,20 @@ main (int argc, } ACE_ENDTRY; ACE_CHECK_RETURN (-1); - result = FTP_SERVER::instance ()->init (argc,argv); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"FTP_SERVER::init failed\n"),1); - result = FTP_SERVER::instance ()->run (); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"FTP_SERVER::run failed\n"),1); - ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . ")); - ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); - ACE_DEBUG ((LM_DEBUG, "done\n")); - - recv_latency.dump_results ("Receive", gsf); + + ACE_OS::fclose (output_file); + return 0; } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Singleton <Server,ACE_Null_Mutex>; -template class TAO_AV_Endpoint_Reactive_Strategy_B <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; -template class TAO_AV_Endpoint_Reactive_Strategy <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class TAO_AV_Endpoint_Reactive_Strategy_B +<FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class TAO_AV_Endpoint_Reactive_Strategy +<FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Singleton <Server,ACE_Null_Mutex> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_B <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_B +<FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy +<FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/server.h b/TAO/orbsvcs/tests/AVStreams/Pluggable/server.h index ddb0b123e5d..4f442c7af4e 100644 --- a/TAO/orbsvcs/tests/AVStreams/Pluggable/server.h +++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/server.h @@ -1,49 +1,94 @@ /* -*- C++ -*- */ // $Id$ +// ============================================================================ +// +// = LIBRARY +// TAO/orbsvcs/tests/AVStreams/Pluggable +// +// = FILENAME +// ftp.h +// +// = DESCRIPTION +// Ftp server to receive data +// +// = AUTHOR +// Yamuna Krishnamurthy <yamuna@cs.wustl.edu> +// +// ============================================================================ -#include "ace/Get_Opt.h" -#include "tao/PortableServer/ORB_Manager.h" #include "orbsvcs/Naming/Naming_Utils.h" #include "orbsvcs/AV/AVStreams_i.h" #include "orbsvcs/AV/Endpoint_Strategy.h" -#include "orbsvcs/AV/FlowSpec_Entry.h" -#include "orbsvcs/AV/sfp.h" -#include "orbsvcs/AV/MCast.h" #include "orbsvcs/AV/Policy.h" -class FTP_Server_StreamEndPoint : public TAO_Server_StreamEndPoint + +class FTP_Server_Callback : public TAO_AV_Callback { + // = TITLE + // Defines a class for the server application callback. + // + // = DESCRIPTION + // This class overides the methods of the TAO_AV_Callback so the + // AVStreams can make upcalls to the application. + public: - FTP_Server_StreamEndPoint (void); - virtual int get_callback (const char *flowname, - TAO_AV_Callback *&callback); + + // Method that is called when there is data to be received from the ftp client. + int receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *frame_info, + const ACE_Addr &peer_address); + + // Called when the ftp client has finished reading the file and wants + // to close4 down the connection. + int handle_destroy (void); }; -class FTP_Server_Callback : public TAO_AV_Callback +class FTP_Server_StreamEndPoint : public TAO_Server_StreamEndPoint { + // = TITLE + // Defines the aplication stream endpoint + // + // = DESCRIPTION + // This is the class that overrides the tao_server_enpodint to handle + // pre and post connect processing. public: - virtual int handle_stop (void); - virtual int receive_frame (ACE_Message_Block *frame, - TAO_AV_frame_info *frame_info = 0, - const ACE_Addr &peer_address = ACE_Addr::sap_any); - virtual int handle_destroy (void); + // Create the server application callback. + int get_callback (const char *flowname, + TAO_AV_Callback *&callback); + +private: + FTP_Server_Callback callback_; + // reference to the server application callback. }; class Server { + // = TITLE + // Defines the server application class. + // + // = DESCRIPOTION + // The actual server progarm that acts as the ftp server that receives data + // sent by the ftp client. public: Server (void); + // Constructor + + ~Server (void); + // Deestructor. + int init (int argc, - char **argv); - int run (void); - FILE *file (void); + char **argv, + CORBA::Environment &); + // Initialize data components. + protected: - int parse_args (int argc,char **argv); TAO_Naming_Client my_naming_client_; - TAO_AV_Endpoint_Reactive_Strategy_B <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> reactive_strategy_; + // The Naming Service Client. + + TAO_AV_Endpoint_Reactive_Strategy_B + <FTP_Server_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> reactive_strategy_; + // The endpoint reacxtive strategy. + TAO_MMDevice *mmdevice_; - FILE *fp_; - char *protocol_; + // The server MMDevice. }; - -typedef ACE_Singleton<Server,ACE_Null_Mutex> FTP_SERVER; |