summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp495
1 files changed, 495 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
new file mode 100644
index 00000000000..a60e82ac9bd
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
@@ -0,0 +1,495 @@
+// $Id$
+
+#include "distributer.h"
+#include "ace/Get_Opt.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+int first = 1;
+
+int
+Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *flow_name,
+ TAO_AV_Protocol_Object */*object*/)
+{
+ // Store the flowname of the stream that this callback belongs to.
+ ACE_CString fname = flow_name;
+ this->callback_.flowname (fname);
+
+ // Increment the stream count.
+ DISTRIBUTER::instance ()->stream_created ();
+ return 0;
+}
+
+int
+Distributer_Receiver_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the application callback and return to the AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flow_name,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the protocol object corresponding to the transport protocol selected.
+ DISTRIBUTER::instance ()->set_protocol_object (flow_name, object);
+
+ // Store the flowname of the stream that this callback belongs to.
+ ACE_CString fname = flow_name;
+ this->callback_.flowname (fname);
+
+ // Store the flowname and the corresponding stream endpoint for the receiver
+ DISTRIBUTER::instance ()->set_endpoint (flow_name,
+ this);
+
+ // Increment the stream count.
+ DISTRIBUTER::instance ()->stream_created ();
+ return 0;
+}
+
+
+void
+Distributer::set_endpoint (const char* flowname, TAO_StreamEndPoint* endpoint)
+{
+ // Set the flowname and teh corresponding endpoint.
+ StreamEndpoint_Item* item;
+
+ ACE_NEW (item,
+ StreamEndpoint_Item);
+
+ item->flowname = flowname;
+
+ item->endpoint = endpoint;
+
+ this->endpoint_set_.insert (item);
+
+}
+
+int
+Distributer_Sender_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the application callback and return to the AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+void
+Distributer_Receiver_Callback::flowname (ACE_CString flowname)
+{
+ this->flowname_ = flowname;
+}
+
+
+ACE_CString
+Distributer_Receiver_Callback::flowname (void)
+{
+ return this->flowname_;
+}
+
+int
+Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // Upcall from the AVStreams when there is data to be received from the
+ // sender.
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::receive_frame\n"));
+
+
+ int result = 0;
+
+ Protocol_Object_Set_Itor end = DISTRIBUTER::instance ()->protocol_object_set ()->end ();
+ for (Protocol_Object_Set_Itor begin = DISTRIBUTER::instance ()->protocol_object_set ()->begin ();
+ begin != end; ++begin)
+ {
+ // Get the protocol object corresponding to the receiver stream
+ // send the data received from sender to the receivers.
+ result = (*begin)->protocol_object->send_frame (frame);
+ }
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send failed:%p","Distributer::Callback\n"),
+ -1);
+
+ return 0;
+}
+
+int
+Distributer_Receiver_Callback::handle_destroy (void)
+{
+ // Called when the sender requests the stream to be shutdown.
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Receiver_Callback::end_stream\n"));
+
+ // Disconnect all receivers the sender is sending to
+ DISTRIBUTER::instance ()->helper ()->disconnect (DISTRIBUTER::instance ()->get_endpoint_set (),
+ DISTRIBUTER::instance ()->get_streamctrl_map (),
+ ACE_TRY_ENV);
+ // Decrement the stream count.
+ DISTRIBUTER::instance ()->stream_destroyed ();
+ return 0;
+}
+
+
+void
+Distributer_Sender_Callback::flowname (ACE_CString flowname)
+{
+ this->flowname_ = flowname;
+}
+
+
+ACE_CString
+Distributer_Sender_Callback::flowname (void)
+{
+ return this->flowname_;
+}
+
+int
+Distributer_Sender_Callback::handle_destroy (void)
+{
+ // Called when the sender requests the stream to be shutdown.
+ // ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Sender_Callback::end_stream\n"));
+
+ // Decrement the stream count.
+ DISTRIBUTER::instance ()->stream_destroyed ();
+
+ return 0;
+}
+
+Protocol_Object_Set*
+Distributer::protocol_object_set (void)
+{
+ return &this->protocol_object_set_;
+}
+
+void
+Distributer::protocol_object_set (Protocol_Object_Set* protocol_object_set)
+{
+ this->protocol_object_set_ = *protocol_object_set;
+}
+
+int
+Distributer::set_protocol_object (const char* flowname, TAO_AV_Protocol_Object* object)
+{
+ // Set the corresponding protocol objects for the different streams created.
+ // and store them in a protocol object list
+ Protocol_Object_Item* item;
+
+ ACE_NEW_RETURN (item,
+ Protocol_Object_Item,
+ -1);
+
+ item->flowname = flowname;
+
+ item->protocol_object = object;
+
+ this->protocol_object_set_.insert (item);
+
+ return 0;
+}
+
+
+Distributer::Distributer (void)
+ : count_ (0),
+ protocol_ ("UDP"),
+ stream_count_ (0),
+ done_ (0),
+ default_port (9000)
+{
+
+ this->mb.size (BUFSIZ);
+}
+
+Distributer::~Distributer (void)
+{
+}
+
+Connection_Setup_Helper*
+Distributer::helper (void)
+{
+ return &this->helper_;
+}
+
+int
+Distributer::parse_args (int argc,
+ char **argv)
+{
+ // Parse command line arguments
+ ACE_Get_Opt opts (argc,argv,"s:r:");
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 's':
+ this->sender_device_name_ = opts.optarg;
+ break;
+ case 'r':
+ this->distributer_device_name_ = opts.optarg;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+int
+Distributer::init (int argc,
+ char ** argv,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->a_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ result =
+ this->b_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Parse the command line arguments
+ result = this->parse_args (argc,
+ argv);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Error in Parse Args \n"),
+ -1);
+
+ // Initialize the helper class.
+ this->helper_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+
+
+ ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
+ TAO_MMDevice (&this->a_endpoint_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_sender_mmdevice =
+ this->distributer_sender_mmdevice_;
+
+ // Register the sender mmdevice object with the ORB
+ this->distributer_sender_mmdevice_obj_ =
+ this->distributer_sender_mmdevice_->_this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN(-1);
+
+
+ ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
+ TAO_MMDevice (&this->b_endpoint_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_receiver_mmdevice =
+ this->distributer_receiver_mmdevice_;
+
+ // Register the receiver mmdevice object with the ORB
+ this->distributer_receiver_mmdevice_obj_ =
+ this->distributer_receiver_mmdevice_->_this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN(-1);
+
+ // Bind to receivers
+ result = this->helper_.bind_to_receivers (&this->distributer_device_name_,
+ this->distributer_sender_mmdevice_obj_.in (),
+ this->recv_obj_ref_set_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+
+ if (result != 0)
+ return result;
+
+ ACE_CString flow_protocol_str ("");
+
+ ACE_CString address;
+ // Get the local host name
+ char buf [BUFSIZ];
+ ACE_OS::hostname (buf,
+ BUFSIZ);
+
+ // Set the address to the local host and port.
+ address = buf;
+ address += ":9000";
+
+
+ // Connect to receivers
+ result = this->helper_.connect_to_receivers (&this->distributer_device_name_,
+ &flow_protocol_str,
+ &this->protocol_,
+ &address,
+ this->distributer_sender_mmdevice_obj_.in (),
+ &this->recv_obj_ref_set_,
+ this->streamctrl_map_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+
+ if (result != 0)
+ return result;
+
+ // Bind to sender
+ result = this->helper_.bind_to_sender (&this->sender_device_name_,
+ &this->distributer_device_name_,
+ this->sender_mmdevice_.inout (),
+ this->distributer_receiver_mmdevice_obj_.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+
+ if (result != 0)
+ return result;
+
+ // Set the address to the local host and port.
+ address = "";
+
+ // Connect to sender
+ result = this->helper_.connect_to_sender (&this->sender_device_name_,
+ &this->distributer_device_name_,
+ &flow_protocol_str,
+ &this->protocol_,
+ &address,
+ this->sender_mmdevice_.inout (),
+ this->distributer_receiver_mmdevice_obj_.in (),
+ this->streamctrl_map_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+
+ if (result != 0)
+ return result;
+
+
+ return 0;
+}
+
+
+StreamCtrl_Map*
+Distributer::get_streamctrl_map (void)
+{
+ return &this->streamctrl_map_;
+}
+
+StreamEndpoint_Set*
+Distributer::get_endpoint_set (void)
+{
+ return &this->endpoint_set_;
+}
+
+void
+Distributer::stream_created (void)
+{
+ this->stream_count_++;
+}
+
+void
+Distributer::stream_destroyed (void)
+{
+ this->stream_count_--;
+
+ if (this->stream_count_ == 0)
+ this->done_ = 1;
+}
+
+int
+Distributer::done (void)
+{
+ return this->done_;
+}
+
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ // Initialize the ORB first.
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv,
+ 0,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the Distributer
+ int result = DISTRIBUTER::instance ()->init (argc,
+ argv,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result != 0)
+ return result;
+
+ // run the orb till the streams are not destroyed.
+ while (!DISTRIBUTER::instance ()->done () || orb->work_pending ())
+ orb->perform_work (ACE_TRY_ENV);
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Singleton <Distributer,ACE_Null_Mutex>;
+template class TAO_AV_Endpoint_Reactive_Strategy_B <Distributer_Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy <Distributer_Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy_A <Distributer_Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy <Distributer_Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Singleton <Distributer,ACE_Null_Mutex>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_B <Distributer_Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy <Distributer_Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A <Distributer_Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy <Distributer_Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */