diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Setup_Helper.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Setup_Helper.cpp | 550 |
1 files changed, 550 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Setup_Helper.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Setup_Helper.cpp new file mode 100644 index 00000000000..23faf19977e --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Setup_Helper.cpp @@ -0,0 +1,550 @@ +//$Id$ +#include "Connection_Setup_Helper.h" + +Connection_Setup_Helper::Connection_Setup_Helper (void) +{ +} + +Connection_Setup_Helper::~Connection_Setup_Helper (void) +{ +} + +int +Connection_Setup_Helper::init (CORBA::ORB_ptr orb, PortableServer::POA_ptr poa) +{ + this->orb_ = orb; + this->poa_ = poa; + + // Initialize the naming services + if (this->my_naming_client_.init (this->orb_.in ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize " + "the TAO_Naming_Client. \n"), + -1); + return 0; +} + + +int +Connection_Setup_Helper::create_recv_obj_ref_list (CosNaming::BindingList &bl, + ACE_CString *sender_device_name, + Recv_Obj_Ref_Set &recv_obj_ref_set, + CORBA::Environment& ACE_TRY_ENV) +{ + for (CORBA::ULong i = 0; i < bl.length (); i++) + { + Recv_Obj_Ref_Item *item; + + // Create the recv_onj_ref_item + ACE_NEW_RETURN (item, + Recv_Obj_Ref_Item, + -1); + + // get the device name from the binding list + item->name = bl [i].binding_name [0].id.in (); + + + // Create the full name of the receiver for resolving from NS + // correctly + CosNaming::Name mmdevice_name (1); + mmdevice_name.length (1); + mmdevice_name [0].id = CORBA::string_dup (sender_device_name->c_str ()); + + mmdevice_name.length (2); + mmdevice_name [1].id = CORBA::string_dup ("Receivers"); + + mmdevice_name.length (3); + mmdevice_name [2].id = CORBA::string_dup (item->name.c_str ()); + + // Resolve the reference of the receiver from the NS. + CORBA::Object_var obj = my_naming_client_->resolve (mmdevice_name, + ACE_TRY_ENV); + + // Store the reference in the item + item->ref = AVStreams::MMDevice::_narrow (obj.in ()); + + // Add the object reference item corresponding to this receuver + // in the set + recv_obj_ref_set.insert (item); + } + return 0; +} + + +int +Connection_Setup_Helper::bind_to_receivers (ACE_CString *sender_device_name, + AVStreams::MMDevice_ptr sender_mmdevice_obj, + Recv_Obj_Ref_Set &recv_obj_ref_set, + CORBA::Environment &ACE_TRY_ENV) +{ + + // Create name with the name of the sender + CosNaming::Name sender_mmdevice_name (1); + sender_mmdevice_name.length (1); + sender_mmdevice_name [0].id = CORBA::string_dup (sender_device_name->c_str ()); + + + ACE_TRY_EX (bind_naming_context) + { + // Try binding the sender context to the NS + this->my_naming_client_->bind_new_context (sender_mmdevice_name, + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (bind_naming_context); + + // Sender context does not exist + + // Create the name for storing the receivers + sender_mmdevice_name.length (2); + sender_mmdevice_name [1].id = CORBA::string_dup ("Receivers"); + + // Try binding the receivers context under the sender + this->my_naming_client_->bind_new_context (sender_mmdevice_name, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + + } + ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex) + { + + // The sender is already bound, probably by the receiver. + + sender_mmdevice_name.length (2); + sender_mmdevice_name [1].id = CORBA::string_dup ("Receivers"); + + // Try to resolev the receivers + CORBA::Object_var rc_obj = this->my_naming_client_->resolve (sender_mmdevice_name, + ACE_TRY_ENV); + + CosNaming::NamingContext_var rc = CosNaming::NamingContext::_narrow (rc_obj.in ()); + + + CosNaming::BindingIterator_var it; + CosNaming::BindingList_var bl; + const CORBA::ULong CHUNK = 100; + + // Get the list of receivers registered for this server + rc->list (CHUNK, + bl, + it); + + // Add the receivers found in the bindinglist to the recv_obj_ref_set + create_recv_obj_ref_list (bl, + sender_device_name, + recv_obj_ref_set, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (!CORBA::is_nil (it.in ())) + { + CORBA::Boolean more = 1; + // Check to see if there are more receivers listed. + while (more) + { + more = it->next_n (CHUNK, bl); + + create_recv_obj_ref_list (bl, + sender_device_name, + recv_obj_ref_set, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + } + } + } + ACE_ENDTRY; + + + sender_mmdevice_name.length (2); + sender_mmdevice_name [1].id = CORBA::string_dup (sender_device_name->c_str ()); + + // Register the sender object with the naming server. + this->my_naming_client_->rebind (sender_mmdevice_name, + sender_mmdevice_obj, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + + + return 0; +} + +int +Connection_Setup_Helper::connect_to_receivers (ACE_CString *sender_device_name, + ACE_CString *flow_protocol_str, + ACE_CString *protocol, + ACE_CString *host_address, + AVStreams::MMDevice_ptr sender_mmdevice_obj, + Recv_Obj_Ref_Set *recv_obj_ref_set, + StreamCtrl_Map &streamctrl_map, + CORBA::Environment& ACE_TRY_ENV) +{ + + if (recv_obj_ref_set->size () == 0) + return 0; + + ACE_CString address (host_address->c_str ()); + + // Set the address of the of the distributer receiver endpoint. + ACE_INET_Addr sender_addr (address.c_str ()); + + char hostname [BUFSIZ]; + + int port = sender_addr.get_port_number (); + sender_addr.get_host_name (hostname, + BUFSIZ); + + ACE_DEBUG ((LM_DEBUG, + "Hostname:Port is %s:%d\n", + hostname, + port)); + + // Connect to all the receivers bound to + Recv_Obj_Ref_Set_Itor end = recv_obj_ref_set->end (); + for (Recv_Obj_Ref_Set_Itor begin = recv_obj_ref_set->begin (); + begin != end; ++begin) + { + // Initialize the QoS + AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); + + char buf [BUFSIZ]; + + address = hostname; + address += ":"; + address += ACE_OS::itoa (port++, buf,10); + + sender_addr.set (address.c_str ()); + + ACE_CString flowname (sender_device_name->c_str ()); + flowname += "_"; + flowname += (*begin)->name; + + // Create the forward flow specification to describe the flow. + TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (), + "IN", + "USER_DEFINED", + flow_protocol_str->c_str (), + protocol->c_str (), + &sender_addr); + + // Set the flow specification for the stream between receiver and distributer + AVStreams::flowSpec flow_spec (1); + flow_spec.length (1); + flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ()); + + StreamCtrl_Item *item; + ACE_NEW_RETURN (item, + StreamCtrl_Item, + -1); + + // Create the stream control for this stream. + TAO_StreamCtrl* streamctrl; + ACE_NEW_RETURN (streamctrl, + TAO_StreamCtrl, + -1); + + // Servant Reference Counting to manage lifetime + PortableServer::ServantBase_var safe_streamctrl = streamctrl; + + + item->streamctrl = streamctrl; + item->flowname = flowname; + + TAO_String_Hash_Key flowname_key (CORBA::string_dup (flowname.c_str ())); + + // Bind the flowname and the corresponding stream controller to the stream controller map + if (streamctrl_map.bind (flowname_key, item) != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Bind to Stream Ctrl Map failed\n"), + -1); + } + + // Bind/Connect the sender and receiver MMDevices. + int result = streamctrl->bind_devs (sender_mmdevice_obj, + (*begin)->ref.in (), + the_qos.inout (), + flow_spec, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in binding sender and receiver devices\n"), + -1); + + } + return 0; +} + +int +Connection_Setup_Helper::bind_to_sender (ACE_CString *sender_device_name, + ACE_CString *receiver_device_name, + AVStreams::MMDevice_ptr &sender_mmdevice_obj, + AVStreams::MMDevice_ptr receiver_mmdevice_obj, + CORBA::Environment &ACE_TRY_ENV) +{ + CosNaming::Name mmdevice_naming_name (1); + + ACE_TRY_EX (resolve_sender) + { + + // Create name with the name of the sender + mmdevice_naming_name.length (1); + mmdevice_naming_name [0].id = CORBA::string_dup (sender_device_name->c_str ()); + + mmdevice_naming_name.length (2); + mmdevice_naming_name [1].id = CORBA::string_dup (sender_device_name->c_str ()); + + // Resolve the sender mmdevice object reference from the Naming Service + CORBA::Object_var mmdevice_obj = + this->my_naming_client_->resolve (mmdevice_naming_name, + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (resolve_sender); + + sender_mmdevice_obj = + AVStreams::MMDevice::_narrow (mmdevice_obj.in (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (CORBA::is_nil (sender_mmdevice_obj)) + ACE_ERROR_RETURN ((LM_ERROR, + "Could not resolve MMdevice in Naming service \n"), + -1); + } + ACE_CATCH (CosNaming::NamingContext::NotFound, al_ex) + { + // Sender does not exist + ACE_TRY_EX (bind_sender_context) + { + mmdevice_naming_name.length (1); + mmdevice_naming_name [0].id = CORBA::string_dup (sender_device_name->c_str ()); + + // Create the sender context + this->my_naming_client_->bind_new_context (mmdevice_naming_name, + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (bind_sender_context); + + mmdevice_naming_name.length (2); + mmdevice_naming_name [1].id = CORBA::string_dup ("Receivers"); + + // Craete the receivers context under the sender's context + this->my_naming_client_->bind_new_context (mmdevice_naming_name, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + } + ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex) + { + } + ACE_ENDTRY; + } + ACE_ENDTRY; + + mmdevice_naming_name.length (2); + mmdevice_naming_name [1].id = CORBA::string_dup ("Receivers"); + + mmdevice_naming_name.length (3); + mmdevice_naming_name [2].id = CORBA::string_dup (receiver_device_name->c_str ()); + + + // Register the receiver object under the sender context with the naming server. + this->my_naming_client_->rebind (mmdevice_naming_name, + receiver_mmdevice_obj, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + return 0; +} + + +int +Connection_Setup_Helper::connect_to_sender (ACE_CString *sender_device_name, + ACE_CString *receiver_device_name, + ACE_CString *flow_protocol_str, + ACE_CString *protocol, + ACE_CString *host_address, + AVStreams::MMDevice_ptr &sender_mmdevice_obj, + AVStreams::MMDevice_ptr receiver_mmdevice_obj, + StreamCtrl_Map &streamctrl_map, + CORBA::Environment& ACE_TRY_ENV) +{ + + if (CORBA::is_nil (sender_mmdevice_obj)) + return 0; + + + // Initialize the QoS + AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); + + // Set the address of the sender. + ACE_INET_Addr sender_addr (host_address->c_str ()); + + ACE_CString flowname (receiver_device_name->c_str ()); + flowname += "_"; + flowname += sender_device_name->c_str (); + + // Create the forward flow specification to describe the flow. + TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (), + "IN", + "USER_DEFINED", + flow_protocol_str->c_str (), + protocol->c_str (), + &sender_addr); + + + // Set the flow specification for the stream between sender and distributer + AVStreams::flowSpec flow_spec (1); + flow_spec.length (1); + flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ()); + + StreamCtrl_Item *item; + ACE_NEW_RETURN (item, + StreamCtrl_Item, + -1); + + // Create teh stream control for the stream + TAO_StreamCtrl* streamctrl; + ACE_NEW_RETURN (streamctrl, + TAO_StreamCtrl, + -1); + + // Servant Reference Counting to manage lifetime + PortableServer::ServantBase_var safe_streamctrl = streamctrl; + + item->streamctrl = streamctrl; + item->flowname = flowname; + + TAO_String_Hash_Key flowname_key (CORBA::string_dup (flowname.c_str ())); + + // Bind the streamcontroller and stream name to the Stream Controller hash map. + if (streamctrl_map.bind (flowname_key, item) != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Bind to Stream Ctrl Map failed\n"), + -1); + } + + // Bind/Connect the sender and receiver MMDevices. + CORBA::Boolean res = + streamctrl->bind_devs (sender_mmdevice_obj, + receiver_mmdevice_obj, + the_qos.inout (), + flow_spec, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (res == 0) + ACE_ERROR_RETURN ((LM_ERROR,"Streamctrl::bind_devs failed\n"),-1); + + // Start the data sending + AVStreams::flowSpec start_spec; + streamctrl->start (start_spec,ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + +return 0; +} + +int +Connection_Setup_Helper::disconnect (StreamEndpoint_Set *endpoint_set, + StreamCtrl_Map *streamctrl_map, + CORBA::Environment& ACE_TRY_ENV) +{ + AVStreams::flowSpec stop_spec; + + // Disconnect every stream whose endpoints are in the endpoint_set. + StreamEndpoint_Set_Itor end = endpoint_set->end (); + for (StreamEndpoint_Set_Itor begin = endpoint_set->begin (); + begin != end; ++begin) + { + ACE_CString flowname = (*begin)->flowname; + TAO_String_Hash_Key flow_name_key (flowname.c_str ()); + StreamCtrl_Map_Entry *streamctrl_entry = 0; + // Check if this flow has been initiated by this device. If so then + if (streamctrl_map->find (flow_name_key, + streamctrl_entry) == 0) + { + // The flow has been initiated by this device and hence + // call destroy on the stream controller. + streamctrl_entry->int_id_->streamctrl->destroy (stop_spec, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + } + else + { + // The flow was initiated by the peer device. + + // Get the stream controller for this stream. This is the stream controller of the peer device + CORBA::Any_ptr streamctrl_any = (*begin)->endpoint->get_property_value ("Related_StreamCtrl", + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + AVStreams::StreamCtrl_ptr streamctrl; + + *streamctrl_any >>= streamctrl; + + //Destroy the stream + streamctrl->destroy (stop_spec, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + } + } + + return 0; + +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Hash<TAO_String_Hash_Key>; +template class ACE_Equal_To<TAO_String_Hash_Key>; + +template class ACE_Hash_Map_Entry<TAO_String_Hash_Key, StreamCtrl_Item*>; +template class ACE_Hash_Map_Manager<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; + +template class ACE_Hash_Map_Iterator<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; + +template class ACE_Node <Recv_Obj_Ref_Item*>; +template class ACE_Unbounded_Set <Recv_Obj_Ref_Item*>; +template class ACE_Unbounded_Set_Iterator <Recv_Obj_Ref_Item*>; + +template class ACE_Node <Protocol_Object_Item*>; +template class ACE_Unbounded_Set <Protocol_Object_Item*>; +template class ACE_Unbounded_Set_Iterator <Protocol_Object_Item*>; + +template class ACE_Node <StreamEndpoint_Item*>; +template class ACE_Unbounded_Set <StreamEndpoint_Item*>; +template class ACE_Unbounded_Set_Iterator <StreamEndpoint_Item*>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiation ACE_Hash<TAO_String_Hash_Key>; +#pragma instantiation ACE_Equal_To<TAO_String_Hash_Key>; + +#pragma instantiation ACE_Hash_Map_Entry<TAO_String_Hash_Key, StreamCtrl_Item*>; +#pragma instantiation ACE_Hash_Map_Manager<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +#pragma instantiation ACE_Hash_Map_Manager_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; + +#pragma instantiation ACE_Hash_Map_Iterator<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +#pragma instantiation ACE_Hash_Map_Iterator_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; +#pragma instantiation ACE_Hash_Map_Iterator_Base_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; +#pragma instantiation ACE_Hash_Map_Reverse_Iterator<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Null_Mutex>; +#pragma instantiation ACE_Hash_Map_Reverse_Iterator_Ex<TAO_String_Hash_Key, StreamCtrl_Item*, ACE_Hash<TAO_String_Hash_Key>, ACE_Equal_To<TAO_String_Hash_Key>, ACE_Null_Mutex>; + +#pragma instantiation ACE_Node <Recv_Obj_Ref_Item*>; +#pragma instantiation ACE_Unbounded_Set <Recv_Obj_Ref_Item*>; +#pragma instantiation ACE_Unbounded_Set_Iterator <Recv_Obj_Ref_Item*>; + +#pragma instantiation ACE_Node <Protocol_Object_Item*>; +#pragma instantiation ACE_Unbounded_Set <Protocol_Object_Item*>; +#pragma instantiation ACE_Unbounded_Set_Iterator <Protocol_Object_Item*>; + +#pragma instantiation ACE_Node <StreamEndpoint_Item*>; +#pragma instantiation ACE_Unbounded_Set <StreamEndpoint_Item*>; +#pragma instantiation ACE_Unbounded_Set_Iterator <StreamEndpoint_Item*>; + + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |