summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp')
-rw-r--r--trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp512
1 files changed, 512 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp b/trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
new file mode 100644
index 00000000000..bcc4f71458d
--- /dev/null
+++ b/trunk/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
@@ -0,0 +1,512 @@
+// $Id$
+
+#include "distributer.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+#include "orbsvcs/AV/FlowSpec_Entry.h"
+
+#include "tao/Strategies/advanced_resource.h"
+
+typedef ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
+
+// constructor.
+Signal_Handler::Signal_Handler (void)
+{
+}
+
+int
+Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
+{
+ if (signum == SIGINT)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "In the signal handler\n"));
+
+ DISTRIBUTER::instance ()->done (1);
+
+ }
+ return 0;
+}
+
+int
+Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name,
+ TAO_AV_Callback *&callback)
+{
+ /// Create and return the sender application callback to AVStreams
+ /// for further upcalls.
+ callback = &this->callback_;
+
+ ACE_CString fname = flow_name;
+
+ this->callback_.flowname (fname);
+
+ return 0;
+}
+
+int
+Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object)
+{
+ Connection_Manager &connection_manager =
+ DISTRIBUTER::instance ()->connection_manager ();
+
+ /// Add to the map of protocol objects.
+ connection_manager.protocol_objects ().bind (flowname,
+ object);
+
+ /// Store the related streamctrl.
+ connection_manager.add_streamctrl (flowname,
+ this);
+
+ return 0;
+}
+
+int
+Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name,
+ TAO_AV_Callback *&callback)
+{
+ /// Create and return the receiver application callback to AVStreams
+ /// for further upcalls.
+ callback = &this->callback_;
+
+ ACE_CString flowname (flow_name);
+ this->callback_.flowname (flowname);
+
+ return 0;
+}
+
+int
+Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *)
+{
+ /// Increment the stream count.
+ DISTRIBUTER::instance ()->stream_created ();
+
+ return 0;
+}
+
+CORBA::Boolean
+Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec
+ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ //if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n"));
+
+ Connection_Manager &connection_manager =
+ DISTRIBUTER::instance ()->connection_manager ();
+
+ /// Check to see if the flow already exists. If it does then close the
+ /// old connection and setup a new one with the new sender.
+
+ for (CORBA::ULong i = 0;
+ i < flowspec.length ();
+ i++)
+ {
+ TAO_Forward_FlowSpec_Entry entry;
+ entry.parse (flowspec[i]);
+
+ //if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Handle Conection Requested flowname %s \n",
+ entry.flowname ()));
+
+ ACE_CString flowname (entry.flowname ());
+
+ int result =
+ connection_manager.streamctrls ().find (flowname);
+
+ /// If the flowname is found.
+ if (result == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "\nDistributer switching senders handle connection requested\n\n"));
+
+ ///Destroy old stream with the same flowname.
+ connection_manager.destroy (flowname);
+
+ }
+
+ /// Store the related streamctrl.
+ connection_manager.add_streamctrl (flowname.c_str (),
+ this);
+
+ }
+ return 1;
+
+}
+
+
+Distributer_Receiver_Callback::Distributer_Receiver_Callback (void)
+ : frame_count_ (1)
+{
+}
+
+ACE_CString &
+Distributer_Receiver_Callback::flowname (void)
+{
+ return this->flowname_;
+}
+
+void
+Distributer_Receiver_Callback::flowname (const ACE_CString &flowname)
+{
+ this->flowname_ = flowname;
+}
+
+
+int
+Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ /// Upcall from the AVStreams when there is data to be received from
+ /// the sender.
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::receive_frame for frame %d\n",
+ this->frame_count_++));
+
+ Connection_Manager::Protocol_Objects &protocol_objects =
+ DISTRIBUTER::instance ()->connection_manager ().protocol_objects ();
+
+ /// Send frame to all receivers.
+ for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
+ iterator != protocol_objects.end ();
+ ++iterator)
+ {
+ int result =
+ (*iterator).int_id_->send_frame (frame);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send failed:%p",
+ "Sender::pace_data send\n"),
+ -1);
+ }
+
+ return 0;
+}
+
+int
+Distributer_Receiver_Callback::handle_destroy (void)
+{
+ /// Called when the sender requests the stream to be shutdown.
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Receiver_Callback::end_stream\n"));
+
+ DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
+
+ /// Decrement the stream count.
+ DISTRIBUTER::instance ()->stream_destroyed ();
+
+ return 0;
+}
+
+ACE_CString &
+Distributer_Sender_Callback::flowname (void)
+{
+ return this->flowname_;
+}
+
+void
+Distributer_Sender_Callback::flowname (const ACE_CString &flowname)
+{
+ this->flowname_ = flowname;
+}
+
+int
+Distributer_Sender_Callback::handle_destroy (void)
+{
+ /// Called when the sender requests the stream to be shutdown.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Sender_Callback::end_stream\n"));
+
+ DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());
+
+ DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
+
+ DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());
+
+ return 0;
+}
+
+Distributer::Distributer (void)
+ : sender_name_ ("sender"),
+ distributer_name_ ("distributer"),
+ done_ (0),
+ stream_count_ (0)
+{
+}
+
+Distributer::~Distributer (void)
+{
+}
+
+void
+Distributer::stream_created (void)
+{
+ this->stream_count_++;
+}
+
+void
+Distributer::stream_destroyed (void)
+{
+ this->stream_count_--;
+
+ if (this->stream_count_ == 0)
+ this->done_ = 1;
+}
+
+
+Connection_Manager &
+Distributer::connection_manager (void)
+{
+ return this->connection_manager_;
+}
+
+int
+Distributer::parse_args (int argc,
+ char **argv)
+{
+ /// Parse command line arguments
+ ACE_Get_Opt opts (argc, argv, "s:r:");
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 's':
+ this->sender_name_ = opts.opt_arg ();
+ break;
+ case 'r':
+ this->distributer_name_ = opts.opt_arg ();
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+int
+Distributer::init (int argc,
+ char ** argv
+ ACE_ENV_ARG_DECL)
+{
+ /// Initialize the connection class.
+ int result =
+ this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
+ if (result != 0)
+ return result;
+
+ /// Initialize the endpoint strategy with the orb and poa.
+ result =
+ this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ result =
+ this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ /// Parse the command line arguments
+ result =
+ this->parse_args (argc,
+ argv);
+ if (result != 0)
+ return result;
+
+ ACE_Reactor *reactor =
+ TAO_AV_CORE::instance ()->reactor ();
+
+ if (reactor->register_handler (SIGINT,
+ &this->signal_handler_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in handler register\n"),
+ -1);
+ /// Register the signal handler for clean termination of the process.
+
+ ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
+ TAO_MMDevice (&this->sender_endpoint_strategy_),
+ -1);
+
+ /// Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_sender_mmdevice =
+ this->distributer_sender_mmdevice_;
+
+ AVStreams::MMDevice_var distributer_sender_mmdevice =
+ this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
+ TAO_MMDevice (&this->receiver_endpoint_strategy_),
+ -1);
+
+ /// Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_receiver_mmdevice =
+ this->distributer_receiver_mmdevice_;
+
+ AVStreams::MMDevice_var distributer_receiver_mmdevice =
+ this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+
+ /// Bind to sender.
+ this->connection_manager_.bind_to_sender (this->sender_name_,
+ this->distributer_name_,
+ distributer_receiver_mmdevice.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ /// Connect to sender.
+ this->connection_manager_.connect_to_sender (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ /// Bind to receivers.
+ this->connection_manager_.bind_to_receivers (this->distributer_name_,
+ distributer_sender_mmdevice.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ /// Connect to receivers
+ this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
+Distributer::done (void) const
+{
+ return this->done_;
+}
+
+void
+Distributer::shut_down (ACE_ENV_SINGLE_ARG_DECL)
+{
+ ACE_TRY
+ {
+ AVStreams::MMDevice_var receiver_mmdevice =
+ this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_,
+ this->distributer_name_,
+ receiver_mmdevice.in ());
+ AVStreams::MMDevice_var sender_mmdevice =
+ this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_,
+ sender_mmdevice.in ());
+
+ // DISTRIBUTER::instance ()->connection_manager ().destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ // ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Distributer::shut_down");
+ }
+ ACE_ENDTRY;
+}
+
+void
+Distributer::done (int done)
+{
+ this->done_ = done;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ /// Initialize the ORB first.
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc,
+ argv,
+ 0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ /// Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ /// Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ /// Initialize the Distributer
+ int result =
+ DISTRIBUTER::instance ()->init (argc,
+ argv
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (result != 0)
+ return result;
+
+ while (!DISTRIBUTER::instance ()->done ())
+ {
+ CORBA::Boolean wp = orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (wp)
+ {
+ orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ ACE_TRY_CHECK;
+ }
+ }
+
+ DISTRIBUTER::instance ()->shut_down (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+// orb->shutdown(1 ACE_ENV_ARG_PARAMETER);
+// ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton.
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */