summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp527
1 files changed, 527 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp b/TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp
new file mode 100644
index 00000000000..63a5a315342
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Multicast_Full_Profile/ftp.cpp
@@ -0,0 +1,527 @@
+// $Id$
+
+#include "ftp.h"
+
+// FTP_Client_Callback::FTP_Client_Callback (FTP_Client_Flow_Handler *handler)
+// :handler_ (handler)
+// {
+// }
+
+// int
+// FTP_Client_Callback::handle_start (void)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::handle_start"));
+// return this->handler_->start ();
+// }
+
+// int
+// FTP_Client_Callback::handle_stop (void)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::handle_stop"));
+// return this->handler_->stop ();
+// }
+
+
+// FTP_Client_Flow_Handler::FTP_Client_Flow_Handler (TAO_ORB_Manager *orb_manager,
+// ACE_Time_Value &timeout)
+// :TAO_FlowProducer ("Data",CLIENT::instance ()->protocols (),CLIENT::instance ()->format ()),
+// orb_manager_ (orb_manager),
+// count_ (0),
+// timeout_ (timeout)
+// {
+// }
+
+// int
+// FTP_Client_Flow_Handler::get_callback (const char *flowname,
+// TAO_AV_Callback *&callback)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_Flow_Handler::get_callback\n"));
+// ACE_NEW_RETURN (callback,
+// FTP_Client_Callback (this),
+// -1);
+// return 0;
+// }
+
+// int
+// FTP_Client_Flow_Handler::start (void)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_Flow_Handler::start"));
+// ACE_Time_Value delta = ACE_Time_Value::zero;
+// this->timer_id_ =
+// TAO_AV_CORE::instance ()->reactor ()->schedule_timer (this,
+// 0,
+// delta,
+// this->timeout_);
+// return 0;
+// }
+
+// int
+// FTP_Client_Flow_Handler::stop (void)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_Flow_Handler::stop"));
+// int result = TAO_AV_CORE::instance ()->reactor ()->cancel_timer (this->timer_id_);
+// if (result < 0)
+// ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::stop cancel timer failed\n"),-1);
+// return 0;
+// }
+
+// int
+// FTP_Client_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
+// const void *arg)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_StreamEndPoint::handle_timeout"));
+// ACE_Message_Block mb (BUFSIZ);
+// char *buf = mb.rd_ptr ();
+// cerr << "message block size" << mb.size () << endl;
+// int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ());
+// if (n < 0)
+// {
+// TAO_AV_CORE::instance ()->reactor ()->cancel_timer (this->timer_id_);
+// ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
+// }
+// if (n == 0)
+// {
+// if (::feof (CLIENT::instance ()->file ()))
+// {
+// // wait for sometime for the data to be flushed to the other side.
+// this->count_++;
+// if (this->count_ == 2)
+// {
+// ACE_DECLARE_NEW_CORBA_ENV;
+// ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n"));
+// AVStreams::flowSpec stop_spec (1);
+// // stop_spec.length (1);
+// // stop_spec [0] = CORBA::string_dup (CLIENT::instance ()->flowname ());
+// CLIENT::instance ()->streamctrl ()->stop (stop_spec,ACE_TRY_ENV);
+// ACE_CHECK_RETURN (-1);
+// return 0;
+// }
+// else
+// return 0;
+// }
+// else
+// ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread error\n"),-1);
+// }
+// cerr << "read bytes = " << n << endl;
+// mb.wr_ptr (n);
+// int result = this->protocol_object_->send_frame (&mb);
+// if (result < 0)
+// ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send \n"),-1);
+// ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent succesfully\n"));
+// }
+
+//------------------------------------------------------------
+// FTP_Client_FDev
+//------------------------------------------------------------
+
+// FTP_Client_FDev::FTP_Client_FDev (TAO_ORB_Manager *orb_manager)
+// :TAO_FDev (CORBA::string_dup ("Data")),
+// orb_manager_ (orb_manager)
+// {
+// }
+
+// AVStreams::FlowProducer_ptr
+// FTP_Client_FDev::make_producer (AVStreams::FlowConnection_ptr the_requester,
+// AVStreams::QoS & the_qos,
+// CORBA::Boolean_out met_qos,
+// char *& named_fdev,
+// CORBA::Environment &ACE_TRY_ENV)
+// {
+// ACE_DEBUG ((LM_DEBUG,"FTP_Client_FDev::make_producer\n"));
+// FTP_Client_Flow_Handler *handler;
+// ACE_Time_Value timeout (2);
+// ACE_NEW_RETURN (handler,
+// FTP_Client_Flow_Handler (this->orb_manager_,
+// timeout),
+// 0);
+// AVStreams::FlowProducer_ptr producer = handler->_this (ACE_TRY_ENV);
+// ACE_CHECK_RETURN (0);
+// return producer;
+// }
+
+FTP_Client_Callback::FTP_Client_Callback (void)
+ // :handler_ (handler),
+ :count_ (0)
+{
+}
+
+int
+FTP_Client_Callback::handle_end_stream (void)
+{
+ TAO_AV_CORE::instance ()->stop_run ();
+ return 0;
+}
+
+void
+FTP_Client_Callback::get_timeout (ACE_Time_Value *&tv,
+ void *&arg)
+{
+ ACE_Time_Value *timeout;
+ ACE_NEW (timeout,
+ ACE_Time_Value(2));
+ tv = timeout;
+}
+
+int
+FTP_Client_Callback::handle_timeout (void *arg)
+{
+ ACE_Message_Block mb (BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::get_frame"));
+ char *buf = mb.rd_ptr ();
+ cerr << "message block size" << mb.size () << endl;
+ int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ());
+ if (n < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
+ }
+ if (n == 0)
+ {
+ if (feof (CLIENT::instance ()->file ()))
+ {
+ // wait for sometime for the data to be flushed to the other side.
+ this->count_++;
+ if (this->count_ == 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n"));
+ AVStreams::flowSpec stop_spec (1);
+ ACE_DECLARE_NEW_CORBA_ENV;
+ CLIENT::instance ()->streamctrl ()->stop (stop_spec,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ CLIENT::instance ()->streamctrl ()->destroy (stop_spec,ACE_TRY_ENV);
+ TAO_AV_CORE::instance ()->stop_run ();
+ }
+ else
+ return 0;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread error\n"),-1);
+ }
+ cerr << "read bytes = " << n << endl;
+ mb.wr_ptr (n);
+ int result = this->protocol_object_->send_frame (&mb);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send \n"),-1);
+ ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent succesfully\n"));
+ return 0;
+}
+
+FTP_Client_Producer::FTP_Client_Producer (void)
+ :TAO_FlowProducer ("Data",CLIENT::instance ()->protocols (),CLIENT::instance ()->format ())
+{
+}
+
+int
+FTP_Client_Producer::set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object)
+{
+ this->callback_->set_protocol_object (object);
+ return 0;
+}
+
+int
+FTP_Client_Producer::get_callback (const char *flowname,
+ TAO_AV_Callback *&callback)
+{
+ ACE_NEW_RETURN (this->callback_,
+ FTP_Client_Callback,
+ -1);
+ callback = this->callback_;
+}
+
+Client::parse_args (int argc,
+ char **argv)
+{
+ ACE_Get_Opt opts (argc,argv,"f:a:p:sd");
+
+ this->use_sfp_ = 0;
+ char c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ this->filename_ = ACE_OS::strdup (opts.optarg);
+ break;
+ case 'a':
+ this->address_ = ACE_OS::strdup (opts.optarg);
+ break;
+ case 'p':
+ this->protocol_ = ACE_OS::strdup (opts.optarg);
+ break;
+ case 's':
+ this->use_sfp_ = 1;
+ break;
+ case 'd':
+ TAO_debug_level++;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown option\n"));
+ return -1;
+ break;
+ }
+ }
+ return 0;
+}
+
+FILE *
+Client::file (void)
+{
+ return this->fp_;
+}
+
+char*
+Client::flowname (void)
+{
+ return this->flowname_;
+}
+
+AVStreams::protocolSpec
+Client::protocols (void)
+{
+ AVStreams::protocolSpec protocols (1);
+ protocols.length (1);
+ char buf [BUFSIZ];
+ ACE_OS::sprintf (buf,"%s=%s",this->protocol_,this->address_);
+ protocols [0] = CORBA::string_dup (buf);
+ return protocols;
+}
+
+const char *
+Client::format (void)
+{
+ return "UNS:ftp";
+}
+
+const char *
+Client::address (void)
+
+{
+ return this->address_;
+}
+
+TAO_StreamCtrl*
+Client::streamctrl (void)
+{
+ return &this->streamctrl_;
+}
+
+Client::Client (void)
+ :orb_manager_ (TAO_AV_CORE::instance ()->orb_manager ()),
+ endpoint_strategy_ (orb_manager_),
+ client_mmdevice_ (&endpoint_strategy_),
+ fdev_ (0),
+ fp_ (0),
+ protocol_ (ACE_OS::strdup ("UDP"))
+{
+}
+
+
+int
+Client::bind_to_server (const char *name)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+
+ ACE_TRY
+ {
+ // Initialize the naming services
+ if (my_naming_client_.init (this->orb_manager_->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ CosNaming::Name server_mmdevice_name (1);
+ server_mmdevice_name.length (1);
+ server_mmdevice_name [0].id = name;
+ CORBA::Object_var server_mmdevice_obj =
+ my_naming_client_->resolve (server_mmdevice_name,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ this->server_mmdevice_ =
+ AVStreams::MMDevice::_narrow (server_mmdevice_obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (this->server_mmdevice_.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " could not resolve Server_Mmdevice in Naming service <%s>\n"),
+ -1);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Command_Handler::resolve_reference");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+ return 0;
+}
+
+int
+Client::init (int argc,char **argv)
+{
+ this->argc_ = argc;
+ this->argv_ = argv;
+
+ // Increase the debug_level so that we can see the output
+ // TAO_debug_level++;
+ CORBA::String_var ior;
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ TAO_AV_CORE::instance ()->init (argc,
+ argv,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->orb_manager_ = TAO_AV_CORE::instance ()->orb_manager ();
+ this->orb_manager_->init_child_poa (this->argc_,
+ this->argv_,
+ "child_poa",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->parse_args (this->argc_, this->argv_);
+ // activate the client video mmdevice under the child poa.
+ ior = this->orb_manager_->activate (&this->client_mmdevice_,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ this->orb_manager_->activate_poa_manager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ ACE_NEW_RETURN (this->fdev_,
+ // FTP_Client_FDev (this->orb_manager_),
+ FTP_Client_FDev,
+ -1);
+
+ ACE_NEW_RETURN (this->flowname_,
+ char [BUFSIZ],
+ 0);
+ ACE_OS::sprintf (this->flowname_,
+ "Data");
+ this->fdev_->flowname (this->flowname ());
+ AVStreams::MMDevice_var mmdevice = this->client_mmdevice_._this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ AVStreams::FDev_var fdev = this->fdev_->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ mmdevice->add_fdev (fdev.in (),
+ ACE_TRY_ENV);
+
+ // Initialize the naming services
+ CORBA::ORB_var orb = orb_manager_->orb ();
+ if (this->my_naming_client_.init (orb.in ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ this->fp_ = ACE_OS::fopen (this->filename_,"r");
+ if (this->fp_ != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"file opened successfully\n"));
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Client::init");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+ return 0;
+}
+
+int
+Client::run (void)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ char flow_protocol_str [BUFSIZ];
+ if (this->use_sfp_)
+ ACE_OS::strcpy (flow_protocol_str,"sfp:1.0");
+ else
+ ACE_OS::strcpy (flow_protocol_str,"");
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+ AVStreams::flowSpec flow_spec (1);
+ // Bind the client and server mmdevices.
+
+ ACE_INET_Addr addr (this->address_);
+ TAO_Forward_FlowSpec_Entry entry (this->flowname_,
+ "IN",
+ "USER_DEFINED",
+ flow_protocol_str,
+ this->protocol_,
+ &addr);
+ flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
+ flow_spec.length (1);
+ CORBA::Boolean result =
+ this->streamctrl_.bind_devs (this->client_mmdevice_._this (ACE_TRY_ENV),
+ AVStreams::MMDevice::_nil (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (this->bind_to_server ("Server_MMDevice1") == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+ result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (this->bind_to_server ("Server_MMDevice2") == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+ result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
+ this->server_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1);
+ AVStreams::flowSpec start_spec (1);
+ start_spec.length (1);
+ start_spec [0] = CORBA::string_dup (this->flowname_);
+ this->streamctrl_.start (start_spec,ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ // Schedule a timer for the for the flow handler.
+ TAO_AV_CORE::instance ()->run ();
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Client::run");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+ return 0;
+}
+
+int
+main (int argc,
+ char *argv[])
+{
+ int result = 0;
+ result = CLIENT::instance ()->init (argc,argv);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"client::init failed\n"),1);
+ result = CLIENT::instance ()->run ();
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1);
+
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Singleton <Client,ACE_Null_Mutex>;
+template class TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<FTP_Client_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */