summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp456
1 files changed, 456 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp
new file mode 100644
index 00000000000..7cca1701be9
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable/ftp.cpp
@@ -0,0 +1,456 @@
+// $Id$
+
+#include "ftp.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+
+ACE_High_Res_Timer last_frame_sent_time;
+// The time taken for sending a frmae and preparing for the next frame
+
+ACE_Time_Value inter_frame_time;
+// The time that should lapse between two consecutive frames sent.
+
+FTP_Client_Callback::FTP_Client_Callback (void)
+{
+}
+
+FTP_Client_StreamEndPoint::FTP_Client_StreamEndPoint (void)
+{
+}
+
+int
+FTP_Client_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the clienmt application callback and return to the AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+FTP_Client_StreamEndPoint::set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the client protocol object corresponding to the transport protocol selected.
+ CLIENT::instance ()->set_protocol_object (object);
+ return 0;
+}
+
+Client::Client (void)
+ :client_mmdevice_ (&endpoint_strategy_),
+ count_ (0),
+ address_ (0),
+ peer_addr_str_ (0),
+ fp_ (0),
+ protocol_ (ACE_OS::strdup ("UDP")),
+ frame_rate_ (30)
+{
+ this->mb.size (BUFSIZ);
+}
+
+void
+Client::set_protocol_object (TAO_AV_Protocol_Object *object)
+{
+ // Set the client protocol object corresponding to the transport protocol selected.
+ this->protocol_object_ = object;
+}
+
+int
+Client::parse_args (int argc,
+ char **argv)
+{
+ // Parse command line arguments
+ ACE_Get_Opt opts (argc,argv,"f:l:a:p:r:sd");
+
+ this->use_sfp_ = 0;
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ this->filename_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'l':
+ this->address_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'a':
+ this->peer_addr_str_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'p':
+ this->protocol_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'r':
+ this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
+ break;
+ case 's':
+ this->use_sfp_ = 1;
+ break;
+ case 'd':
+ TAO_debug_level++;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+FILE *
+Client::file (void)
+{
+ return this->fp_;
+}
+
+char*
+Client::flowname (void)
+{
+ return this->flowname_;
+}
+
+TAO_StreamCtrl*
+Client::streamctrl (void)
+{
+ return &this->streamctrl_;
+}
+
+
+int
+Client::frame_rate (void)
+{
+ return this->frame_rate_;
+}
+
+
+// Method to get the object reference of the server
+int
+Client::bind_to_server (void)
+{
+ // Initialize the naming services
+ if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ CosNaming::Name server_mmdevice_name (1);
+ server_mmdevice_name.length (1);
+ server_mmdevice_name [0].id = CORBA::string_dup ("Server_MMDevice");
+
+ // Resolve the server object reference from the Naming Service
+ CORBA::Object_var server_mmdevice_obj =
+ my_naming_client_->resolve (server_mmdevice_name);
+
+ this->server_mmdevice_ =
+ AVStreams::MMDevice::_narrow (server_mmdevice_obj.in ());
+
+ if (CORBA::is_nil (this->server_mmdevice_.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Could not resolve Server_MMdevice in Naming service <%s>\n"),
+ -1);
+
+ return 0;
+}
+
+int
+Client::init (int argc,
+ char **argv)
+{
+ this->argc_ = argc;
+ this->argv_ = argv;
+
+ CORBA::String_var ior;
+
+ // Initialize the endpoint strategy with the orb and poa.
+ this->endpoint_strategy_.init(TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+
+ // Parse the command line arguments
+ int result = this->parse_args (argc,
+ argv);
+
+ if (result != 0)
+ return result;
+
+ // Open file to read.
+ this->fp_ = ACE_OS::fopen (this->filename_,
+ "r");
+ if (this->fp_ == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open input file %s\n",
+ this->filename_),
+ -1);
+
+ result
+ = this->bind_to_server ();
+
+ // Resolve the object reference of the server from the Naming Service.
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+
+ // Create the Flow protocol name
+ char flow_protocol_str [BUFSIZ];
+ if (this->use_sfp_)
+ ACE_OS::strcpy (flow_protocol_str,
+ "sfp:1.0");
+ else
+ ACE_OS::strcpy (flow_protocol_str,
+ "");
+
+ // Initialize the QoS
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+
+ // Set the address of the ftp client.
+ ACE_INET_Addr* addr;
+ if (this->address_ != 0)
+ ACE_NEW_RETURN (addr,
+ ACE_INET_Addr (this->address_),
+ -1);
+ else
+ {
+ char buf [BUFSIZ];
+ ACE_OS::hostname (buf,
+ BUFSIZ);
+ ACE_NEW_RETURN (addr,
+ ACE_INET_Addr ("5000",
+ buf),
+ -1);
+ }
+
+ // Initialize the flowname
+ ACE_NEW_RETURN (this->flowname_,
+ char [BUFSIZ],
+ 0);
+
+ ACE_OS::sprintf (this->flowname_,
+ "Data_%s",
+ this->protocol_);
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry entry (this->flowname_,
+ "IN",
+ "USER_DEFINED",
+ flow_protocol_str,
+ this->protocol_,
+ addr);
+
+ ACE_INET_Addr* peer_addr;
+ if (this->peer_addr_str_ != 0)
+ ACE_NEW_RETURN (peer_addr,
+ ACE_INET_Addr (this->peer_addr_str_),
+ -1);
+ else
+ {
+ char buf [BUFSIZ];
+ ACE_OS::hostname (buf,
+ BUFSIZ);
+ ACE_NEW_RETURN (peer_addr,
+ ACE_INET_Addr ("5050",
+ buf),
+ -1);
+ }
+
+ entry.set_peer_addr (peer_addr);
+
+ AVStreams::flowSpec flow_spec (1);
+ flow_spec.length (1);
+ flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
+
+ AVStreams::MMDevice_var client_mmdevice =
+ this->client_mmdevice_._this ();
+
+ // Bind/Connect the client and server MMDevices.
+ CORBA::Boolean bind_result =
+ this->streamctrl_.bind_devs (client_mmdevice.in (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec);
+
+ if (bind_result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1);
+
+ return 0;
+}
+
+// Method to send data at the specified rate
+int
+Client::pace_data (void)
+{
+
+ // Rate at which frames of data need to be sent.
+ this->frame_rate_ = CLIENT::instance ()->frame_rate ();
+
+ // Time within which a frame should be sent.
+ double frame_time = 1/ (double) this->frame_rate_;
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Frame Time ONE = %f\n Frame Rate = %d\n",
+ frame_time,
+ this->frame_rate_));
+
+ // The time between two consecutive frames.
+ inter_frame_time.set (frame_time);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Inter Frame Time = %d\n",
+ inter_frame_time.msec ()));
+
+ try
+ {
+
+ // Continue to send data till the file is read to the end.
+ while (1)
+ {
+ // Count the frames sent.
+ count_++;
+
+ // Read from the file into a message block.
+ size_t n = ACE_OS::fread(this->mb.rd_ptr (),
+ 1,
+ this->mb.size (),
+ CLIENT::instance ()->file ());
+
+ if (n == 0)
+ {
+ if (feof (CLIENT::instance ()->file ()))
+ {
+ // At end of file break the loop and end the client.
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
+ break;
+ }
+
+ }
+
+ this->mb.wr_ptr (n);
+
+ if ( this->count_ > 1)
+ {
+ // Greater than the first frame.
+
+ // Stop the timer that was started just before the previous frame was sent.
+ last_frame_sent_time.stop ();
+
+ // Get the time elapsed after sending the previous frame.
+ ACE_Time_Value tv;
+ last_frame_sent_time.elapsed_time (tv);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Elapsed Time = %d\n",
+ tv.msec ()));
+
+ // Check to see if the inter frame time has elapsed.
+ if (tv < inter_frame_time)
+ {
+ // Inter frame time has not elapsed.
+
+ // Claculate the time to wait before the next frame needs to be sent.
+ ACE_Time_Value wait_time (inter_frame_time - tv);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Wait Time = %d\n",
+ wait_time.msec ()));
+
+ // run the orb for the wait time so the client can continue other orb requests.
+ TAO_AV_CORE::instance ()->orb ()->run (wait_time);
+ }
+ }
+
+ // Start timer before sending the frame.
+ last_frame_sent_time.start ();
+
+ // Send frame.
+ int result = this->protocol_object_->send_frame (&this->mb);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send failed:%p","FTP_Client_Flow_Handler::send\n"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,"Client::pace_data buffer sent succesfully\n"));
+
+ // Reset the mb.
+ this->mb.reset ();
+
+ } // end while
+
+ // Since the file is read stop the stream.
+ AVStreams::flowSpec stop_spec (1);
+ CLIENT::instance ()->streamctrl ()->destroy (stop_spec);
+
+ // Shut the orb down.
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0);
+
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Client::pace_data Failed");
+ return -1;
+ }
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ try
+ {
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv,
+ 0);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+
+ //Get the POA_var object from Object_var
+ PortableServer::POA_var root_poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager ();
+
+ mgr->activate ();
+
+ // Initialize the AV STream components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ());
+
+ // INitialize the Client.
+ int result = 0;
+ result = CLIENT::instance ()->init (argc,
+ argv);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "client::init failed\n"),1);
+
+ // Start sending data.
+ result = CLIENT::instance ()->pace_data ();
+
+ orb->destroy ();
+ }
+ catch (const CORBA::Exception& ex)
+
+ {
+ ex._tao_print_exception ("Client Failed\n");
+ return -1;
+ }
+
+ CLIENT::close ();
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */