summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp365
1 files changed, 365 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp b/TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp
new file mode 100644
index 00000000000..c827562b411
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Multicast/ftp.cpp
@@ -0,0 +1,365 @@
+// $Id$
+
+#include "ftp.h"
+
+FTP_Client_Callback::FTP_Client_Callback (void)
+ :count_ (0)
+{
+}
+
+int
+FTP_Client_Callback::handle_end_stream (void)
+{
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+FTP_Client_StreamEndPoint::FTP_Client_StreamEndPoint (void)
+{
+
+}
+
+void
+FTP_Client_Callback::get_timeout (ACE_Time_Value *&tv,
+ void *&)
+{
+ ACE_Time_Value *timeout;
+ ACE_NEW (timeout,
+ ACE_Time_Value(2));
+ tv = timeout;
+}
+
+int
+FTP_Client_Callback::handle_timeout (void *)
+{
+ ACE_Message_Block mb (BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::get_frame\n"));
+ char *buf = mb.rd_ptr ();
+ //cerr << "message block size" << mb.size () << endl;
+ int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ());
+ if (n < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
+ }
+ if (n == 0)
+ {
+ if (feof (CLIENT::instance ()->file ()))
+ {
+ // wait for sometime for the data to be flushed to the other side.
+ this->count_++;
+ if (this->count_ == 2)
+ {
+ try
+ {
+ ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n"));
+ AVStreams::flowSpec stop_spec (1);
+ //ACE_DECLARE_NEW_CORBA_ENV;
+ CLIENT::instance ()->streamctrl ()->stop (stop_spec);
+// CLIENT::instance ()->streamctrl ()->destroy (stop_spec);
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0);
+ return 0;
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception (
+ "FTP_Client_Callback::handle_timeout\n");
+ return -1;
+ }
+ }
+ else
+ return 0;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread error\n"),-1);
+ }
+ //cerr << "read bytes = " << n << endl;
+ mb.wr_ptr (n);
+ int result = this->protocol_object_->send_frame (&mb);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send \n"),-1);
+ ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent succesfully\n"));
+ return 0;
+}
+
+int
+FTP_Client_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ ACE_NEW_RETURN (this->callback_,
+ FTP_Client_Callback,
+ -1);
+ callback = this->callback_;
+ return 0;
+}
+
+int
+FTP_Client_StreamEndPoint::set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object)
+{
+ this->callback_->set_protocol_object (object);
+ ACE_CString flow_string (flowname);
+ return 0;
+}
+
+
+Endpoint_Reactive_Strategy::Endpoint_Reactive_Strategy (CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr poa,
+ Client *client)
+ : client_ (client)
+{
+ this->init (orb, poa);
+}
+
+int
+Endpoint_Reactive_Strategy::make_stream_endpoint (FTP_Client_StreamEndPoint *&endpoint)
+{
+ ACE_DEBUG ((LM_DEBUG,"Endpoint_Reactive_Strategy::make_stream_endpoint\n"));
+ ACE_NEW_RETURN (endpoint,
+ FTP_Client_StreamEndPoint,
+ -1);
+ return 0;
+}
+
+int
+Client::parse_args (int argc,
+ char **argv)
+{
+ ACE_Get_Opt opts (argc,argv,"f:a:p:s");
+
+ this->use_sfp_ = 0;
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ this->filename_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'a':
+ this->address_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 'p':
+ this->protocol_ = ACE_OS::strdup (opts.opt_arg ());
+ break;
+ case 's':
+ this->use_sfp_ = 1;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+FILE *
+Client::file (void)
+{
+ return this->fp_;
+}
+
+char*
+Client::flowname (void)
+{
+ return this->flowname_;
+}
+
+TAO_StreamCtrl*
+Client::streamctrl (void)
+{
+ return &this->streamctrl_;
+}
+
+Client::Client (void)
+ :endpoint_strategy_ (TAO_AV_CORE::instance ()->orb (), TAO_AV_CORE::instance ()->poa (),this),
+ client_mmdevice_ (&endpoint_strategy_),
+ address_ (ACE_OS::strdup ("224.9.9.2:12345")),
+ fp_ (0),
+ protocol_ (ACE_OS::strdup ("UDP"))
+{
+}
+
+
+int
+Client::bind_to_server (const char *name)
+{
+
+ try
+ {
+ // Initialize the naming services
+ CosNaming::Name server_mmdevice_name (1);
+ server_mmdevice_name.length (1);
+ server_mmdevice_name [0].id = name;
+ CORBA::Object_var server_mmdevice_obj =
+ my_naming_client_->resolve (server_mmdevice_name);
+
+ this->server_mmdevice_ =
+ AVStreams::MMDevice::_narrow (server_mmdevice_obj.in ());
+
+ if (CORBA::is_nil (this->server_mmdevice_.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " could not resolve Server_Mmdevice in Naming service <%s>\n"),
+ -1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Client::bind_to_server\n");
+ return -1;
+ }
+ return 0;
+}
+
+int
+Client::init (int argc,char **argv)
+{
+
+ PortableServer::POAManager_var mgr
+ = TAO_AV_CORE::instance ()->poa ()->the_POAManager ();
+
+ mgr->activate ();
+
+ this->argc_ = argc;
+ this->argv_ = argv;
+
+ // Increase the debug_level so that we can see the output
+ this->parse_args (this->argc_, this->argv_);
+
+ if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ this->fp_ = ACE_OS::fopen (this->filename_,"r");
+ if (this->fp_ != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"file opened successfully\n"));
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "ERROR: file %s could not be opened\n",
+ this->filename_), -1);
+ }
+
+ return 0;
+}
+
+int
+Client::run (void)
+{
+ try
+ {
+ char flow_protocol_str [BUFSIZ];
+ if (this->use_sfp_)
+ ACE_OS::strcpy (flow_protocol_str,"sfp:1.0");
+ else
+ ACE_OS::strcpy (flow_protocol_str,"");
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+ AVStreams::flowSpec flow_spec (1);
+
+ ACE_INET_Addr addr (this->address_);
+ ACE_NEW_RETURN (this->flowname_,
+ char [BUFSIZ],
+ 0);
+ ACE_OS::sprintf (this->flowname_,
+ "Data_%s",
+ this->protocol_);
+ TAO_Forward_FlowSpec_Entry entry (this->flowname_,
+ "IN",
+ "USER_DEFINED",
+ flow_protocol_str,
+ this->protocol_,
+ &addr);
+ flow_spec.length (1);
+ flow_spec [0] = entry.entry_to_string ();
+ ACE_DEBUG ((LM_DEBUG, "(%N,%l) Flowspec: %s\n", entry.entry_to_string() ));
+
+ AVStreams::MMDevice_var client_mmdevice
+ = this->client_mmdevice_._this ();
+
+ CORBA::Boolean result =
+ this->streamctrl_.bind_devs (client_mmdevice.in (),
+ AVStreams::MMDevice::_nil (),
+ the_qos.inout (),
+ flow_spec);
+ if (this->bind_to_server ("Server_MMDevice1") == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+ result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec);
+ if (this->bind_to_server ("Server_MMDevice2") == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+ result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec);
+
+ if (result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1);
+ AVStreams::flowSpec start_spec (1);
+ start_spec.length (1);
+ start_spec [0] = CORBA::string_dup (this->flowname_);
+ this->streamctrl_.start (start_spec);
+ // Schedule a timer for the for the flow handler.
+ //TAO_AV_CORE::instance ()->run ();
+ ACE_Time_Value tv (10000,0);
+
+ TAO_AV_CORE::instance ()->orb ()->run (tv);
+
+ ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "Exited the TAO_AV_Core::run\n"));
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Client::run");
+ return -1;
+ }
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ try
+ {
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv);
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+ PortableServer::POA_var poa
+ = PortableServer::POA::_narrow (obj.in ());
+
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ poa.in ());
+
+ int result = 0;
+ result = CLIENT::instance ()->init (argc,argv);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"client::init failed\n"),1);
+ result = CLIENT::instance ()->run ();
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1);
+ }
+ catch (const CORBA::Exception& ex)
+
+ {
+ ex._tao_print_exception ("Client Failed\n");
+ return -1;
+ }
+
+ CLIENT::close (); // Explicitly finalize the Unmanaged_Singleton.
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */