diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp | 512 |
1 files changed, 0 insertions, 512 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp deleted file mode 100644 index bcc4f71458d..00000000000 --- a/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp +++ /dev/null @@ -1,512 +0,0 @@ -// $Id$ - -#include "distributer.h" -#include "tao/debug.h" -#include "ace/Get_Opt.h" -#include "orbsvcs/AV/Protocol_Factory.h" -#include "orbsvcs/AV/FlowSpec_Entry.h" - -#include "tao/Strategies/advanced_resource.h" - -typedef ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER; - -// constructor. -Signal_Handler::Signal_Handler (void) -{ -} - -int -Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*) -{ - if (signum == SIGINT) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "In the signal handler\n")); - - DISTRIBUTER::instance ()->done (1); - - } - return 0; -} - -int -Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name, - TAO_AV_Callback *&callback) -{ - /// Create and return the sender application callback to AVStreams - /// for further upcalls. - callback = &this->callback_; - - ACE_CString fname = flow_name; - - this->callback_.flowname (fname); - - return 0; -} - -int -Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname, - TAO_AV_Protocol_Object *object) -{ - Connection_Manager &connection_manager = - DISTRIBUTER::instance ()->connection_manager (); - - /// Add to the map of protocol objects. - connection_manager.protocol_objects ().bind (flowname, - object); - - /// Store the related streamctrl. - connection_manager.add_streamctrl (flowname, - this); - - return 0; -} - -int -Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name, - TAO_AV_Callback *&callback) -{ - /// Create and return the receiver application callback to AVStreams - /// for further upcalls. - callback = &this->callback_; - - ACE_CString flowname (flow_name); - this->callback_.flowname (flowname); - - return 0; -} - -int -Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *, - TAO_AV_Protocol_Object *) -{ - /// Increment the stream count. - DISTRIBUTER::instance ()->stream_created (); - - return 0; -} - -CORBA::Boolean -Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec - ACE_ENV_ARG_DECL_NOT_USED) -{ - //if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n")); - - Connection_Manager &connection_manager = - DISTRIBUTER::instance ()->connection_manager (); - - /// Check to see if the flow already exists. If it does then close the - /// old connection and setup a new one with the new sender. - - for (CORBA::ULong i = 0; - i < flowspec.length (); - i++) - { - TAO_Forward_FlowSpec_Entry entry; - entry.parse (flowspec[i]); - - //if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Handle Conection Requested flowname %s \n", - entry.flowname ())); - - ACE_CString flowname (entry.flowname ()); - - int result = - connection_manager.streamctrls ().find (flowname); - - /// If the flowname is found. - if (result == 0) - { - ACE_DEBUG ((LM_DEBUG, "\nDistributer switching senders handle connection requested\n\n")); - - ///Destroy old stream with the same flowname. - connection_manager.destroy (flowname); - - } - - /// Store the related streamctrl. - connection_manager.add_streamctrl (flowname.c_str (), - this); - - } - return 1; - -} - - -Distributer_Receiver_Callback::Distributer_Receiver_Callback (void) - : frame_count_ (1) -{ -} - -ACE_CString & -Distributer_Receiver_Callback::flowname (void) -{ - return this->flowname_; -} - -void -Distributer_Receiver_Callback::flowname (const ACE_CString &flowname) -{ - this->flowname_ = flowname; -} - - -int -Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame, - TAO_AV_frame_info *, - const ACE_Addr &) -{ - /// Upcall from the AVStreams when there is data to be received from - /// the sender. - ACE_DEBUG ((LM_DEBUG, - "Distributer_Callback::receive_frame for frame %d\n", - this->frame_count_++)); - - Connection_Manager::Protocol_Objects &protocol_objects = - DISTRIBUTER::instance ()->connection_manager ().protocol_objects (); - - /// Send frame to all receivers. - for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin (); - iterator != protocol_objects.end (); - ++iterator) - { - int result = - (*iterator).int_id_->send_frame (frame); - - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "send failed:%p", - "Sender::pace_data send\n"), - -1); - } - - return 0; -} - -int -Distributer_Receiver_Callback::handle_destroy (void) -{ - /// Called when the sender requests the stream to be shutdown. - ACE_DEBUG ((LM_DEBUG, - "Distributer_Receiver_Callback::end_stream\n")); - - DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ()); - - /// Decrement the stream count. - DISTRIBUTER::instance ()->stream_destroyed (); - - return 0; -} - -ACE_CString & -Distributer_Sender_Callback::flowname (void) -{ - return this->flowname_; -} - -void -Distributer_Sender_Callback::flowname (const ACE_CString &flowname) -{ - this->flowname_ = flowname; -} - -int -Distributer_Sender_Callback::handle_destroy (void) -{ - /// Called when the sender requests the stream to be shutdown. - - ACE_DEBUG ((LM_DEBUG, - "Distributer_Sender_Callback::end_stream\n")); - - DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ()); - - DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ()); - - DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ()); - - return 0; -} - -Distributer::Distributer (void) - : sender_name_ ("sender"), - distributer_name_ ("distributer"), - done_ (0), - stream_count_ (0) -{ -} - -Distributer::~Distributer (void) -{ -} - -void -Distributer::stream_created (void) -{ - this->stream_count_++; -} - -void -Distributer::stream_destroyed (void) -{ - this->stream_count_--; - - if (this->stream_count_ == 0) - this->done_ = 1; -} - - -Connection_Manager & -Distributer::connection_manager (void) -{ - return this->connection_manager_; -} - -int -Distributer::parse_args (int argc, - char **argv) -{ - /// Parse command line arguments - ACE_Get_Opt opts (argc, argv, "s:r:"); - - int c; - while ((c= opts ()) != -1) - { - switch (c) - { - case 's': - this->sender_name_ = opts.opt_arg (); - break; - case 'r': - this->distributer_name_ = opts.opt_arg (); - break; - default: - ACE_DEBUG ((LM_DEBUG,"Unknown Option\n")); - return -1; - } - } - return 0; -} - - -int -Distributer::init (int argc, - char ** argv - ACE_ENV_ARG_DECL) -{ - /// Initialize the connection class. - int result = - this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ()); - if (result != 0) - return result; - - /// Initialize the endpoint strategy with the orb and poa. - result = - this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), - TAO_AV_CORE::instance ()->poa ()); - if (result != 0) - return result; - - result = - this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), - TAO_AV_CORE::instance ()->poa ()); - if (result != 0) - return result; - - /// Parse the command line arguments - result = - this->parse_args (argc, - argv); - if (result != 0) - return result; - - ACE_Reactor *reactor = - TAO_AV_CORE::instance ()->reactor (); - - if (reactor->register_handler (SIGINT, - &this->signal_handler_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error in handler register\n"), - -1); - /// Register the signal handler for clean termination of the process. - - ACE_NEW_RETURN (this->distributer_sender_mmdevice_, - TAO_MMDevice (&this->sender_endpoint_strategy_), - -1); - - /// Servant Reference Counting to manage lifetime - PortableServer::ServantBase_var safe_sender_mmdevice = - this->distributer_sender_mmdevice_; - - AVStreams::MMDevice_var distributer_sender_mmdevice = - this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - ACE_NEW_RETURN (this->distributer_receiver_mmdevice_, - TAO_MMDevice (&this->receiver_endpoint_strategy_), - -1); - - /// Servant Reference Counting to manage lifetime - PortableServer::ServantBase_var safe_receiver_mmdevice = - this->distributer_receiver_mmdevice_; - - AVStreams::MMDevice_var distributer_receiver_mmdevice = - this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - - /// Bind to sender. - this->connection_manager_.bind_to_sender (this->sender_name_, - this->distributer_name_, - distributer_receiver_mmdevice.in () - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - /// Connect to sender. - this->connection_manager_.connect_to_sender (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - /// Bind to receivers. - this->connection_manager_.bind_to_receivers (this->distributer_name_, - distributer_sender_mmdevice.in () - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - /// Connect to receivers - this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - return 0; -} - -int -Distributer::done (void) const -{ - return this->done_; -} - -void -Distributer::shut_down (ACE_ENV_SINGLE_ARG_DECL) -{ - ACE_TRY - { - AVStreams::MMDevice_var receiver_mmdevice = - this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_, - this->distributer_name_, - receiver_mmdevice.in ()); - AVStreams::MMDevice_var sender_mmdevice = - this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_, - sender_mmdevice.in ()); - - // DISTRIBUTER::instance ()->connection_manager ().destroy (ACE_ENV_SINGLE_ARG_PARAMETER); - // ACE_TRY_CHECK; - - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Distributer::shut_down"); - } - ACE_ENDTRY; -} - -void -Distributer::done (int done) -{ - this->done_ = done; -} - -int -main (int argc, - char **argv) -{ - ACE_DECLARE_NEW_CORBA_ENV; - ACE_TRY - { - /// Initialize the ORB first. - CORBA::ORB_var orb = - CORBA::ORB_init (argc, - argv, - 0 - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - CORBA::Object_var obj - = orb->resolve_initial_references ("RootPOA" - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - /// Get the POA_var object from Object_var. - PortableServer::POA_var root_poa = - PortableServer::POA::_narrow (obj.in () - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - PortableServer::POAManager_var mgr - = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - /// Initialize the AVStreams components. - TAO_AV_CORE::instance ()->init (orb.in (), - root_poa.in () - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - /// Initialize the Distributer - int result = - DISTRIBUTER::instance ()->init (argc, - argv - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - if (result != 0) - return result; - - while (!DISTRIBUTER::instance ()->done ()) - { - CORBA::Boolean wp = orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - if (wp) - { - orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER); - - ACE_TRY_CHECK; - } - } - - DISTRIBUTER::instance ()->shut_down (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - -// orb->shutdown(1 ACE_ENV_ARG_PARAMETER); -// ACE_TRY_CHECK; - - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main"); - return -1; - } - ACE_ENDTRY; - ACE_CHECK_RETURN (-1); - - DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton. - - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) -template ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex>::singleton_; -#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ |