summaryrefslogtreecommitdiff
path: root/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp')
-rw-r--r--ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp341
1 files changed, 341 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp b/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
new file mode 100644
index 00000000000..5d4fd10e67f
--- /dev/null
+++ b/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
@@ -0,0 +1,341 @@
+// $Id$
+
+#include "distributer.h"
+#include "ace/Get_Opt.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+// An Unmanaged_Singleton is used to avoid static object destruction
+// order related problems since the underlying singleton object
+// contains references to static TypeCodes.
+
+typedef ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
+
+int
+Distributer_Sender_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the sender application callback to AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Distributer_Sender_StreamEndPoint::set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the protocol object corresponding to the transport protocol
+ // selected.
+ DISTRIBUTER::instance ()->sender_protocol_object (object);
+
+ return 0;
+}
+
+int
+Distributer_Receiver_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the receiver application callback to AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+Distributer_Receiver_Callback::Distributer_Receiver_Callback (void)
+ : frame_count_ (1)
+{
+}
+
+int
+Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // Upcall from the AVStreams when there is data to be received from
+ // the sender.
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::receive_frame for frame %d\n",
+ this->frame_count_++));
+
+ // Get the protocol object corresponding to the receiver stream and
+ // send the data received from sender to the receiver.
+ int result =
+ DISTRIBUTER::instance ()->sender_protocol_object ()->send_frame (frame);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Distributer_Callback::receive_frame send failed\n"),
+ -1);
+
+ return 0;
+}
+
+int
+Distributer_Receiver_Callback::handle_destroy (void)
+{
+ // Called when the sender requests the stream to be shutdown.
+ try
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::end_stream\n"));
+
+ // Destroy the receiver stream
+ AVStreams::flowSpec stop_spec;
+ DISTRIBUTER::instance ()->receiver_streamctrl ()->destroy (stop_spec);
+
+ // We can close down now.
+ DISTRIBUTER::instance ()->done (1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception (
+ "Distributer_Callback::handle_destroy Failed\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+Distributer::sender_protocol_object (TAO_AV_Protocol_Object *object)
+{
+ // Set the corresponding protocol objects for the different streams created.
+ this->sender_protocol_object_ = object;
+
+ return 0;
+}
+
+TAO_AV_Protocol_Object *
+Distributer::sender_protocol_object (void)
+{
+ return this->sender_protocol_object_;
+}
+
+Distributer::Distributer (void)
+ : distributer_receiver_mmdevice_ (0),
+ distributer_sender_mmdevice_ (0),
+ sender_protocol_object_ (0),
+ receiver_streamctrl_ (0),
+ protocol_ ("UDP"),
+ done_ (0)
+{
+}
+
+Distributer::~Distributer (void)
+{
+}
+
+void
+Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
+ const ACE_CString &mmdevice_name)
+{
+ CosNaming::Name name (1);
+ name.length (1);
+ name [0].id =
+ CORBA::string_dup (mmdevice_name.c_str ());
+
+ // Resolve the mmdevice object reference from the Naming Service
+ CORBA::Object_var mmdevice_obj =
+ this->naming_client_->resolve (name);
+
+ mmdevice =
+ AVStreams::MMDevice::_narrow (mmdevice_obj.in ());
+}
+
+int
+Distributer::init (int /*argc*/,
+ ACE_TCHAR *[] /*argv*/
+ )
+{
+ // Initialize the naming services
+ int result =
+ this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
+ if (result != 0)
+ return result;
+
+ // Initialize the endpoint strategy with the orb and poa.
+ result =
+ this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ result =
+ this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Bind to the receiver mmdevice
+ ACE_CString mmdevice_name ("Receiver");
+ this->bind_to_mmdevice (this->receiver_mmdevice_.out (),
+ mmdevice_name);
+
+ // Bind to the sender mmdevice
+ mmdevice_name = "Sender";
+ this->bind_to_mmdevice (this->sender_mmdevice_.out (),
+ mmdevice_name);
+
+ // Initialize the QoS
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry receiver_entry ("Data_Receiver",
+ "IN",
+ "USER_DEFINED",
+ "",
+ this->protocol_.c_str (),
+ 0);
+
+ // Set the flow specification for the stream between receiver and distributer
+ AVStreams::flowSpec flow_spec (1);
+ flow_spec.length (1);
+ flow_spec [0] =
+ CORBA::string_dup (receiver_entry.entry_to_string ());
+
+ ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
+ TAO_MMDevice (&this->sender_endpoint_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_sender_mmdevice =
+ this->distributer_sender_mmdevice_;
+
+ AVStreams::MMDevice_var distributer_sender_mmdevice =
+ this->distributer_sender_mmdevice_->_this ();
+
+ ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
+ TAO_MMDevice (&this->receiver_endpoint_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_receiver_mmdevice =
+ this->distributer_receiver_mmdevice_;
+
+ AVStreams::MMDevice_var distributer_receiver_mmdevice =
+ this->distributer_receiver_mmdevice_->_this ();
+
+ ACE_NEW_RETURN (this->receiver_streamctrl_,
+ TAO_StreamCtrl,
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_receiver_streamctrl =
+ this->receiver_streamctrl_;
+
+ // Bind/Connect the distributer and receiver MMDevices.
+ result =
+ this->receiver_streamctrl_->bind_devs (distributer_sender_mmdevice.in (),
+ this->receiver_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec);
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry sender_entry ("Data_Sender",
+ "OUT",
+ "USER_DEFINED",
+ "", // Flowname
+ this->protocol_.c_str (),
+ 0);
+
+ TAO_StreamCtrl* sender_streamctrl = 0;
+ // Video stream controller for the stream between sender and distributer
+
+ ACE_NEW_RETURN (sender_streamctrl,
+ TAO_StreamCtrl,
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_sender_streamctrl =
+ sender_streamctrl;
+
+ // Set the flow specification for the stream between sender and distributer
+ flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ());
+
+ // Bind/Connect the sender and distributer MMDevices.
+ CORBA::Boolean res =
+ sender_streamctrl->bind_devs (sender_mmdevice_.in (),
+ distributer_receiver_mmdevice.in (),
+ the_qos.inout (),
+ flow_spec);
+
+ if (res == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"Streamctrl::bind_devs failed\n"),-1);
+
+ return 0;
+}
+
+TAO_StreamCtrl *
+Distributer::receiver_streamctrl (void)
+{
+ return this->receiver_streamctrl_;
+}
+
+int
+Distributer::done (void) const
+{
+ return this->done_;
+}
+
+void
+Distributer::done (int done)
+{
+ this->done_ = done;
+}
+
+int
+ACE_TMAIN (int argc,
+ ACE_TCHAR *argv[])
+{
+ try
+ {
+ // Initialize the ORB first.
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager ();
+
+ mgr->activate ();
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ());
+
+ // Initialize the Distributer
+ int result =
+ DISTRIBUTER::instance ()->init (argc,
+ argv);
+
+ if (result != 0)
+ return result;
+
+ while (!DISTRIBUTER::instance ()->done ())
+ {
+ orb->perform_work ();
+ }
+
+ // Hack for now....
+ ACE_OS::sleep (1);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("main");
+ return -1;
+ }
+
+ DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton.
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */