From 0fa7172c89be4041d90519a662c7586f8a719b23 Mon Sep 17 00:00:00 2001 From: naga Date: Mon, 12 Jul 1999 01:06:00 +0000 Subject: Added code for the FlowEndPoint to make use of the Pluggable Data Protocols. Also fixed a few bugs in the full profile TAO_StreamCtrl::bind and other full profile classes like TAO_FlowConnection. Now the AV application can make use of FDev in MMDevices for each flow and automagically the appropriate protocols will be chosen depending on the common protocol between 2 flow endpoints. --- TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp | 1254 ++++++++++++++++++++++---------- TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h | 231 +++--- 2 files changed, 1030 insertions(+), 455 deletions(-) diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp index a5203aa47c4..45b10888710 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp @@ -64,6 +64,8 @@ get_flowname (const char *flow_spec_entry_str) ACE_CString flow_name; if (slash_pos != flow_spec_entry.npos) flow_name = flow_spec_entry.substring (0,slash_pos); + else + flow_name = flow_spec_entry_str; return CORBA::string_dup (flow_name.c_str ()); } @@ -81,15 +83,7 @@ AV_Null_MediaCtrl::AV_Null_MediaCtrl (void) // Constructor TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void) - :vdev_a_ (AVStreams::VDev::_nil ()), - vdev_b_ (AVStreams::VDev::_nil ()), - sep_a_ (AVStreams::StreamEndPoint_A::_nil ()), - sep_b_ (AVStreams::StreamEndPoint_B::_nil ()), - flow_count_ (0) -{ -} - -TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void) + :flow_count_ (0) { } @@ -112,7 +106,7 @@ TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec, flow_connection_entry->int_id_->stop (ACE_TRY_ENV); } - if (CORBA::is_nil (this->sep_a_)) + if (CORBA::is_nil (this->sep_a_.in ())) return; // Make the upcall into the application @@ -138,7 +132,7 @@ TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec, flow_connection_entry->int_id_->start (ACE_TRY_ENV); } - if (CORBA::is_nil (this->sep_a_)) + if (CORBA::is_nil (this->sep_a_.in ())) return; // Make the upcall into the application @@ -165,7 +159,7 @@ TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec, flow_connection_entry->int_id_->destroy (ACE_TRY_ENV); } - if (CORBA::is_nil (this->sep_a_)) + if (CORBA::is_nil (this->sep_a_.in ())) return; // Make the upcall into the application @@ -220,7 +214,7 @@ TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec, AVStreams::FPError)) { - if (!CORBA::is_nil (this->sep_a_)) + if (!CORBA::is_nil (this->sep_a_.in ())) { this->sep_a_->set_FPStatus (flow_spec,fp_name,fp_settings,ACE_TRY_ENV); ACE_CHECK; @@ -238,7 +232,7 @@ TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name, TAO_String_Hash_Key flow_name_key (flow_name); FlowConnection_Map::ENTRY *flow_connection_entry = 0; if (this->flow_connection_map_.find (flow_name_key,flow_connection_entry) == 0) - return flow_connection_entry->int_id_; + return AVStreams::FlowConnection::_duplicate (flow_connection_entry->int_id_); else ACE_THROW_RETURN (AVStreams::noSuchFlow (),CORBA::Object::_nil ()); } @@ -266,12 +260,20 @@ TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name, ACE_ENDTRY; ACE_CHECK; // add the flowname and the flowconnection to the hashtable. + this->flows_.length (this->flow_count_ + 1); this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name); TAO_String_Hash_Key flow_name_key (flow_name); if (this->flow_connection_map_.bind (flow_name_key,flow_connection) != 0) ACE_THROW (AVStreams::noSuchFlow ());// is this right? } +TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void) +{ + FlowConnection_Map_Iterator iterator (this->flow_connection_map_); + FlowConnection_Map_Entry *entry = 0; + for (;iterator.next (entry) != 0;iterator.advance ()) + CORBA::release (entry->int_id_); +} // ---------------------------------------------------------------------- // TAO_Negotiator @@ -389,13 +391,25 @@ MMDevice_Map_Hash_Key::hash (void) const // ---------------------------------------------------------------------- TAO_StreamCtrl::TAO_StreamCtrl (void) - :mcastconfigif_ (0), - mcastconfigif_ptr_ (0) + :mcastconfigif_ (0) { + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->streamctrl_ = this->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamCtrl::TAO_StreamCtrl"); + } + ACE_ENDTRY; + ACE_CHECK; } TAO_StreamCtrl::~TAO_StreamCtrl (void) { + delete this->mcastconfigif_; } // request the two MMDevices to create vdev and stream endpoints. save @@ -444,8 +458,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, else { this->sep_a_ = - a_party-> create_A (this->_this (ACE_TRY_ENV), - this->vdev_a_, + a_party-> create_A (this->streamctrl_.in (), + this->vdev_a_.out (), the_qos, met_qos, named_vdev.inout (), @@ -457,8 +471,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, // add the mmdevice,sep and vdev to the map. MMDevice_Map_Entry map_entry; MMDevice_Map_Hash_Key key (a_party); - map_entry.sep_ = this->sep_a_; - map_entry.vdev_ = this->vdev_a_; + map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ()); + map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ()); map_entry.flowspec_ = the_flows; map_entry.qos_ = the_qos; result = @@ -484,8 +498,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, { this->sep_b_ = - b_party-> create_B (this->_this (ACE_TRY_ENV), - this->vdev_b_, + b_party-> create_B (this->streamctrl_.in (), + this->vdev_b_.out (), the_qos, met_qos, named_vdev.inout (), @@ -497,14 +511,14 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, ACE_DEBUG ((LM_DEBUG, "\n(%P|%t)stream_endpoint_b_ = %s", - TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_, + TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in (), ACE_TRY_ENV))); ACE_TRY_CHECK; // add the mmdevice,sep and vdev to the map. MMDevice_Map_Entry map_entry; - MMDevice_Map_Hash_Key key (a_party); - map_entry.sep_ = this->sep_a_; - map_entry.vdev_ = this->vdev_a_; + MMDevice_Map_Hash_Key key (b_party); + map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ()); + map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ()); map_entry.flowspec_ = the_flows; map_entry.qos_ = the_qos; int result = @@ -525,8 +539,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, ACE_TRY_CHECK; } // Multicast source being added. - CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->_this (ACE_TRY_ENV), - this->mcastconfigif_ptr_, + CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (), + this->mcastconfigif_ptr_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -539,7 +553,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, // Multicast sink being added. if (this->mcastconfigif_ != 0) ACE_ERROR_RETURN ((LM_ERROR,"first add a source and then a sink\n"),0); - this->mcastconfigif_->set_peer (this->vdev_b_, + this->mcastconfigif_->set_peer (this->vdev_b_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -548,7 +562,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, int connect_leaf_success = 0; ACE_TRY_EX (connect_leaf) { - connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_, + connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -578,18 +592,18 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party)) { - if (!CORBA::is_nil (this->vdev_a_) && !CORBA::is_nil (this->vdev_b_)) + if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ())) { // Tell the 2 VDev's about one another - this->vdev_a_->set_peer (this->_this (ACE_TRY_ENV), - this->vdev_b_, + this->vdev_a_->set_peer (this->streamctrl_.in (), + this->vdev_b_.in (), the_qos, the_flows, ACE_TRY_ENV); ACE_TRY_CHECK; - this->vdev_b_->set_peer (this->_this (ACE_TRY_ENV), - this->vdev_a_, + this->vdev_b_->set_peer (this->streamctrl_.in (), + this->vdev_a_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -599,7 +613,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, // Now connect the streams together. This will // establish the connection CORBA::Boolean result = - this->sep_a_->connect (this->sep_b_, + this->sep_a_->connect (this->sep_b_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -611,8 +625,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, { // Its full profile // we have feps in the sep then dont call connect instead call bind on the streamctrl. - this->bind (this->sep_a_, - this->sep_b_, + this->bind (this->sep_a_.in (), + this->sep_b_.in (), the_qos, the_flows, ACE_TRY_ENV); @@ -656,17 +670,32 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a, "a_party or b_party null!"), 0); + // Define each other as their peers. + CORBA::Any sep_any; + sep_any <<= sep_b; + sep_a->define_property ("PeerAdapter", + sep_any, + ACE_TRY_ENV); + ACE_TRY_CHECK; + sep_any <<= sep_a; + sep_b->define_property ("PeerAdapter", + sep_any, + ACE_TRY_ENV); + ACE_TRY_CHECK; // since its full profile we do the viable stream setup algorithm. // get the flows for the A streamendpoint. // the flows spec is empty and hence we do a exhaustive match. - AVStreams::flowSpec *a_flows = 0,*b_flows = 0; - CORBA::Any_ptr flows_any; + AVStreams::flowSpec a_flows,b_flows; + CORBA::Any_var flows_any; flows_any = sep_a->get_property_value ("Flows",ACE_TRY_ENV); ACE_TRY_CHECK; - *flows_any >>= a_flows; + AVStreams::flowSpec_ptr temp_flows; + flows_any.in () >>= temp_flows; + a_flows = *temp_flows; flows_any = sep_b->get_property_value ("Flows",ACE_TRY_ENV); ACE_TRY_CHECK; - *flows_any >>= b_flows; + flows_any.in () >>= temp_flows; + b_flows = *temp_flows; u_int i; FlowEndPoint_Map *a_fep_map; FlowEndPoint_Map *b_fep_map; @@ -676,17 +705,17 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a, ACE_NEW_RETURN (b_fep_map, FlowEndPoint_Map, 0); - for (i=0;ilength ();i++) + for (i=0;iget_fep (flowname, - ACE_TRY_ENV); + CORBA::Object_var fep_obj = + sep_a->get_fep (flowname, + ACE_TRY_ENV); ACE_TRY_CHECK; - AVStreams::FlowEndPoint_ptr fep; - fep = AVStreams::FlowEndPoint::_narrow (fep_obj, - ACE_TRY_ENV); + AVStreams::FlowEndPoint_ptr fep = + AVStreams::FlowEndPoint::_narrow (fep_obj.in (), + ACE_TRY_ENV); ACE_TRY_CHECK; TAO_String_Hash_Key fep_key (flowname); result = a_fep_map->bind (fep_key,fep); @@ -694,17 +723,17 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a, ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind failed for %s",flowname)); } // get the flowendpoints for streamendpoint_b - for (i=0;ilength ();i++) + for (i=0;iget_fep (flowname, - ACE_TRY_ENV); + CORBA::Object_var fep_obj = + sep_b->get_fep (flowname, + ACE_TRY_ENV); ACE_TRY_CHECK; - AVStreams::FlowEndPoint_ptr fep; - fep = AVStreams::FlowEndPoint::_narrow (fep_obj, - ACE_TRY_ENV); + AVStreams::FlowEndPoint_ptr fep = + AVStreams::FlowEndPoint::_narrow (fep_obj.in (), + ACE_TRY_ENV); ACE_TRY_CHECK; TAO_String_Hash_Key fep_key (flowname); result = b_fep_map->bind (fep_key,fep); @@ -719,26 +748,40 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a, } else { - FlowEndPoint_Map *spec_fep_map; - ACE_NEW_RETURN (spec_fep_map, + FlowEndPoint_Map *spec_fep_map_a,*spec_fep_map_b; + ACE_NEW_RETURN (spec_fep_map_a, + FlowEndPoint_Map, + 0); + ACE_NEW_RETURN (spec_fep_map_b, FlowEndPoint_Map, 0); for (u_int i=0; i< flow_spec.length ();i++) { - TAO_String_Hash_Key fep_key (flow_spec [i]); + TAO_Forward_FlowSpec_Entry *entry; + ACE_NEW_RETURN (entry, + TAO_Forward_FlowSpec_Entry, + 0); + entry->parse (flow_spec[i].in ()); + TAO_String_Hash_Key fep_key (entry->flowname ()); AVStreams::FlowEndPoint_ptr fep; result = a_fep_map->find (fep_key,fep); if (result == -1) - { - result = b_fep_map->find (fep_key,fep); - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR,"Fep not found for flowname: %s",flow_spec[i].in ()),0); - } - result = spec_fep_map->bind (fep_key,fep); + ACE_ERROR_RETURN ((LM_ERROR,"Fep not found on A side for flowname: %s",flow_spec[i].in ()),0); + + result = spec_fep_map_a->bind (fep_key,fep); + if (result == -1) + ACE_DEBUG ((LM_DEBUG,"Bind faile for %s",flow_spec[i].in ())); + + result = b_fep_map->find (fep_key,fep); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR,"Fep not found on B side for flowname: %s",flow_spec[i].in ()),0); + + result = spec_fep_map_b->bind (fep_key,fep); if (result == -1) ACE_DEBUG ((LM_DEBUG,"Bind faile for %s",flow_spec[i].in ())); } - map_a = map_b = spec_fep_map; + map_a = spec_fep_map_a; + map_b = spec_fep_map_b; } TAO_AV_QoS qos (stream_qos); @@ -746,85 +789,115 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a, // uses the first match policy. FlowEndPoint_Map_Iterator a_feps_iterator (*map_a), b_feps_iterator (*map_b); FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry; - for (;a_feps_iterator.next (a_feps_entry) != 0;a_feps_iterator.advance ()) + ACE_TRY_EX (flow_connect) { - for (;b_feps_iterator.next (b_feps_entry) != 0; b_feps_iterator.advance ()) + + for (;a_feps_iterator.next (a_feps_entry) != 0;a_feps_iterator.advance ()) { - AVStreams::FlowEndPoint_ptr fep_a = a_feps_entry->int_id_; - AVStreams::FlowEndPoint_ptr fep_b = b_feps_entry->int_id_; - AVStreams::FlowConnection_ptr flow_connection; - if (fep_b->get_connected_fep () != 0) + for (;b_feps_iterator.next (b_feps_entry) != 0; b_feps_iterator.advance ()) { - if (fep_a->is_fep_compatible (fep_b, - ACE_TRY_ENV) == 1) + AVStreams::FlowEndPoint_ptr fep_a = a_feps_entry->int_id_; + AVStreams::FlowEndPoint_ptr fep_b = b_feps_entry->int_id_; + AVStreams::FlowConnection_var flow_connection; + if (CORBA::is_nil (fep_b->get_connected_fep ())) { - // assume that flow names are same so that we - // can use either of them. - CORBA::Object_ptr flow_connection_obj; - - if (!CORBA::is_nil ((flow_connection_obj = this->get_flow_connection ((*a_flows)[i],ACE_TRY_ENV)))) - { - flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj,ACE_TRY_ENV); - ACE_TRY_CHECK; - } - else - { - TAO_FlowConnection *flowConnection; - ACE_NEW_RETURN (flowConnection, - TAO_FlowConnection, - 0); - flow_connection = flowConnection->_this (ACE_TRY_ENV); - ACE_TRY_CHECK; - } - // make sure that a_feps is flow_producer and b_feps is flow_consumer - // There should be a way to find which flow endpoint is producer and which is consumer. - AVStreams::FlowProducer_ptr producer = AVStreams::FlowProducer::_nil (); - AVStreams::FlowConsumer_ptr consumer = AVStreams::FlowConsumer::_nil (); - ACE_TRY_EX (producer_check) - { - producer = AVStreams::FlowProducer::_narrow (fep_a,ACE_TRY_ENV); - ACE_TRY_CHECK_EX (producer_check); - consumer = - AVStreams::FlowConsumer::_narrow (fep_b,ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCHANY + if (fep_a->is_fep_compatible (fep_b, + ACE_TRY_ENV) == 1) { - producer = AVStreams::FlowProducer::_narrow (fep_b,ACE_TRY_ENV); - ACE_TRY_CHECK; - consumer = AVStreams::FlowConsumer::_narrow (fep_a,ACE_TRY_ENV); - ACE_TRY_CHECK - } - ACE_ENDTRY; - ACE_CHECK_RETURN (0); - CORBA::Any_ptr flowname_any; - char *fep_a_name,*fep_b_name; - flowname_any = fep_a->get_property_value ("FlowName",ACE_TRY_ENV); - *flowname_any >>= fep_a_name; - flowname_any = fep_b->get_property_value ("FlowName",ACE_TRY_ENV); - *flowname_any >>= fep_b_name; - AVStreams::QoS flow_qos; - flow_qos.QoSType = fep_a_name; - flow_qos.QoSParams.length (0); - result = qos.get_flow_qos (fep_a_name,flow_qos); - if (result == -1) - { - flow_qos.QoSType = fep_b_name; - result = qos.get_flow_qos (fep_b_name,flow_qos); + ACE_TRY_CHECK_EX (flow_connect); + // assume that flow names are same so that we + // can use either of them. + CORBA::Object_var flow_connection_obj; + CORBA::Any_var flowname_any = fep_a->get_property_value ("Flow", + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); + char *flowname = 0; + flowname_any.in () >>= flowname; + ACE_TRY_EX (flow_connection) + { + flow_connection_obj = this->get_flow_connection (flowname,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connection); + flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connection); + } + ACE_CATCHANY + { + TAO_FlowConnection *flowConnection; + ACE_NEW_RETURN (flowConnection, + TAO_FlowConnection, + 0); + flow_connection = flowConnection->_this (ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); + this->set_flow_connection (flowname, + flow_connection, + ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); + + // make sure that a_feps is flow_producer and b_feps is flow_consumer + // There should be a way to find which flow endpoint is producer and which is consumer. + AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_nil (); + AVStreams::FlowConsumer_var consumer = AVStreams::FlowConsumer::_nil (); + + ACE_TRY_EX (producer_check) + { + producer = AVStreams::FlowProducer::_narrow (fep_a,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (producer_check); + consumer = + AVStreams::FlowConsumer::_narrow (fep_b,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (producer_check); + } + ACE_CATCHANY + { + producer = AVStreams::FlowProducer::_narrow (fep_b,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); + consumer = AVStreams::FlowConsumer::_narrow (fep_a,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); + CORBA::String_var fep_a_name,fep_b_name; + CORBA::String temp_name; + flowname_any = fep_a->get_property_value ("FlowName",ACE_TRY_ENV); + flowname_any.in () >>= temp_name; + fep_a_name = CORBA::string_dup (temp_name); + flowname_any = fep_b->get_property_value ("FlowName",ACE_TRY_ENV); + flowname_any.in () >>= temp_name; + fep_b_name = CORBA::string_dup (temp_name); + AVStreams::QoS flow_qos; + flow_qos.QoSType = fep_a_name; + flow_qos.QoSParams.length (0); + result = qos.get_flow_qos (fep_a_name.in (),flow_qos); if (result == -1) - ACE_DEBUG ((LM_DEBUG,"No QoS Specified for this flow")); + { + flow_qos.QoSType = fep_b_name; + result = qos.get_flow_qos (fep_b_name.in (),flow_qos); + if (result == -1) + ACE_DEBUG ((LM_DEBUG,"No QoS Specified for this flow")); + } + flow_connection->connect (producer,consumer,flow_qos,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow_connect); } - flow_connection->connect (producer,consumer,flow_qos,ACE_TRY_ENV); } } } } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamCtrl::bind:flow_connect block"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); } ACE_CATCHANY { // error was thrown because one of the streamendpoints is light profile. // Now connect the streams together - this->sep_a_->connect (this->sep_b_, + this->sep_a_->connect (this->sep_b_.in (), stream_qos, flow_spec, ACE_TRY_ENV); @@ -879,8 +952,8 @@ TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev, if (result < 0) return AVStreams::VDev::_nil (); } - sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_); - return AVStreams::VDev::_duplicate (entry.vdev_); + sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ()); + return AVStreams::VDev::_duplicate (entry.vdev_.in ()); } CORBA::Boolean @@ -964,7 +1037,7 @@ TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configurati } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MCastConfigIf::set_format"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MCastConfigIf::set_configure"); return; } ACE_ENDTRY; @@ -1101,30 +1174,27 @@ TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &the_spec, return 0; } -// // factory method to make a new TCP flow handler. -// int -// TAO_Base_StreamEndPoint::make_tcp_flow_handler (TAO_AV_TCP_Flow_Handler *&sh) -// { -// // ACE_NEW_RETURN (sh, -// // TAO_AV_TCP_Flow_Handler, -// // -1); -// return 0; -// } - -// // factory method to make a new TCP flow handler. -// int -// TAO_Base_StreamEndPoint::make_udp_flow_handler (TAO_AV_UDP_Flow_Handler *&sh) -// { -// return 0; -// } - -// int -// TAO_Base_StreamEndPoint::make_dgram_mcast_flow_handler (TAO_AV_UDP_MCast_Flow_Handler *&sh) -// { -// ACE_NEW_RETURN (sh, -// TAO_AV_UDP_MCast_Flow_Handler, -// -1); -// } +// The following function is for backward compatibility. +CORBA::Boolean +TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &) +{ + return 1; +} + +// The following function is for backward compatibility. +CORBA::Boolean +TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &) +{ + return 1; +} + +// The following function is for backward compatibility. +CORBA::Boolean +TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &, + CORBA::Environment &) +{ + return 1; +} int TAO_Base_StreamEndPoint::set_protocol_object (const char *flowname, @@ -1147,8 +1217,8 @@ TAO_Base_StreamEndPoint::get_callback (const char *flowname, // constructor. TAO_StreamEndPoint::TAO_StreamEndPoint (void) - :flow_count_ (1), - negotiator_ (AVStreams::Negotiator::_nil ()), + :flow_count_ (0), + flow_num_ (0), mcast_port_ (ACE_DEFAULT_MULTICAST_PORT) { this->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR); @@ -1156,27 +1226,6 @@ TAO_StreamEndPoint::TAO_StreamEndPoint (void) } -// The following function is for backward compatibility. -CORBA::Boolean -TAO_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &) -{ - return 1; -} - -// The following function is for backward compatibility. -CORBA::Boolean -TAO_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &) -{ - return 1; -} - -// The following function is for backward compatibility. -CORBA::Boolean -TAO_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &, - CORBA::Environment &) -{ - return 1; -} CORBA::Boolean TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder, @@ -1185,20 +1234,22 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder, CORBA::Environment &ACE_TRY_ENV) { CORBA::Boolean retv = 0; - this->peer_sep_ = responder; + this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder); ACE_TRY_EX (negotiate) { - if (!CORBA::is_nil (this->negotiator_)) + if (!CORBA::is_nil (this->negotiator_.in ())) { - CORBA::Any_ptr negotiator_any = responder->get_property_value ("Negotiator"); + CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator"); if (negotiator_any != 0) { AVStreams::Negotiator_ptr peer_negotiator; - *negotiator_any >>= peer_negotiator; + negotiator_any.in () >>= peer_negotiator; if (!CORBA::is_nil (peer_negotiator)) { CORBA::Boolean result = - this->negotiator_->negotiate (peer_negotiator,qos,ACE_TRY_ENV); + this->negotiator_->negotiate (peer_negotiator, + qos, + ACE_TRY_ENV); ACE_TRY_CHECK_EX (negotiate); if (!result) ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint::Connect (): negotiate failed\n")); @@ -1216,25 +1267,25 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder, ACE_TRY_EX (available_protocols) { // choose protocols based on what the remote endpoint can support. - CORBA::Any_ptr protocols_any = + CORBA::Any_var protocols_any = responder->get_property_value ("AvailableProtocols",ACE_TRY_ENV); ACE_TRY_CHECK_EX (available_protocols); - if (protocols_any != 0) + AVStreams::protocolSpec peer_protocols; + AVStreams::protocolSpec_ptr temp_protocols; + protocols_any.in () >>= temp_protocols; + peer_protocols = *temp_protocols; + for (u_int i=0;i>= peer_protocols; - for (u_int i=0;ilength ();i++) - { - if (this->protocol_ != 0) - break; - for (u_int j=0;jprotocols_.length ();j++) - if (ACE_OS::strcmp ((*peer_protocols)[i],this->protocols_[j]) == 0) - { + if (this->protocol_ != 0) + break; + for (u_int j=0;jprotocols_.length ();j++) + if (ACE_OS::strcmp (peer_protocols [i], + this->protocols_[j]) == 0) + { // we'll agree upon the first protocol that matches. - this->protocol_ = CORBA::string_copy ((*peer_protocols) [i]); - break; - } - } + this->protocol_ = CORBA::string_copy (peer_protocols [i]); + break; + } } } ACE_CATCHANY @@ -1277,7 +1328,9 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder, if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_forward_flows failed\n"),0); - retv = responder->request_connection (this->_this (ACE_TRY_ENV), + AVStreams::StreamEndPoint_var streamendpoint = this->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + retv = responder->request_connection (streamendpoint, 0, network_qos, flow_spec, @@ -1557,17 +1610,36 @@ TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj, { // exception implies the flow name is not defined and is system generated. ACE_OS::sprintf (flow_name,"flow%d",flow_num_++); + ACE_TRY_EX (flow) + { + // exception implies the flow name is not defined and is system generated. + ACE_OS::sprintf (flow_name,"flow%d",flow_num_++); + CORBA::Any flowname_any; + flowname_any <<= flow_name; + fep->define_property ("Flow",flowname_any,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint::add_fep"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); } ACE_ENDTRY; - // Add it to the sequence of flowNames supported. - // put the flowname and the flowendpoint in a hashtable. - TAO_String_Hash_Key fep_name_key (flow_name); - if (this->fep_map_.bind (fep_name_key,fep) != 0) - { - ACE_THROW_RETURN (AVStreams::streamOpFailed (),0); - } - ACE_TRY_EX (flows) + ACE_CHECK_RETURN (0); + ACE_TRY { + fep->lock (ACE_TRY_ENV); + ACE_TRY_CHECK; + // Add it to the sequence of flowNames supported. + // put the flowname and the flowendpoint in a hashtable. + TAO_String_Hash_Key fep_name_key (CORBA::string_dup (flow_name)); + if (this->fep_map_.bind (fep_name_key,fep) != 0) + { + ACE_THROW_RETURN (AVStreams::streamOpFailed (),0); + } // increment the flow count. this->flow_count_++; this->flows_.length (this->flow_count_); @@ -1578,7 +1650,7 @@ TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj, this->define_property ("Flows", flows_any, ACE_TRY_ENV); - ACE_TRY_CHECK_EX (flows); + ACE_TRY_CHECK; } ACE_CATCHANY { @@ -1698,6 +1770,20 @@ TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &the_qos, TAO_StreamEndPoint::~TAO_StreamEndPoint (void) { //this->handle_close (); + TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin (); + TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end (); + for ( ; begin != end; ++begin) + { + TAO_FlowSpec_Entry *entry = *begin; + delete entry; + } + begin = this->reverse_flow_spec_set.begin (); + end = this->reverse_flow_spec_set.end (); + for (; begin != end; ++begin) + { + TAO_FlowSpec_Entry *entry = *begin; + delete entry; + } } @@ -1716,15 +1802,34 @@ TAO_StreamEndPoint_A::start (const AVStreams::flowSpec &flow_spec, ACE_THROW_SPEC ((CORBA::SystemException, AVStreams::noSuchFlow)) { - TAO_StreamEndPoint::start (flow_spec,ACE_TRY_ENV); - ACE_CHECK; - this->peer_sep_->start (flow_spec,ACE_TRY_ENV); + ACE_TRY + { + TAO_StreamEndPoint::start (flow_spec,ACE_TRY_ENV); + ACE_TRY_CHECK; + if (CORBA::is_nil (this->peer_sep_.in ())) + { + CORBA::Any_var sep_any; + sep_any = this->get_property_value ("PeerAdapter", + ACE_TRY_ENV); + ACE_TRY_CHECK; + AVStreams::StreamEndPoint_B_ptr sep; + sep_any.in () >>= sep; + this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (sep); + } + this->peer_sep_->start (flow_spec,ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_A::start"); + } + ACE_ENDTRY; ACE_CHECK; } void TAO_StreamEndPoint_A::stop (const AVStreams::flowSpec &flow_spec, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException, AVStreams::noSuchFlow)) { @@ -1753,16 +1858,15 @@ TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos, { ACE_TRY_EX (narrow) { - AVStreams::QoS flow_qos; result = qos.get_flow_qos (forward_entry.flowname (),flow_qos); if (result < 0) ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry.flowname ())); // Narrow it to FlowProducer. - AVStreams::FlowProducer_ptr producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV); + AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV); ACE_TRY_CHECK_EX (narrow); // Else narrow succeeeded. - if (!CORBA::is_nil (producer)) + if (!CORBA::is_nil (producer.in ())) { CORBA::Boolean is_met (0); CORBA::String_var address = @@ -1967,21 +2071,18 @@ TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl, ACE_TRY_CHECK; - this->streamctrl_ = the_ctrl; - this->peer_ = the_peer_dev; + this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl); + this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev); - CORBA::Any_ptr anyptr; + CORBA::Any_var anyptr; CORBA::String media_ctrl_ior; anyptr = this->peer_->get_property_value ("Related_MediaCtrl", ACE_TRY_ENV); ACE_TRY_CHECK; - if (anyptr != 0) - { - *anyptr >>= media_ctrl_ior; - ACE_DEBUG ((LM_DEBUG,"(%P|%t)The Media Control IOR is %s\n", - media_ctrl_ior)); - } + anyptr.in () >>= media_ctrl_ior; + ACE_DEBUG ((LM_DEBUG,"(%P|%t)The Media Control IOR is %s\n", + media_ctrl_ior)); CORBA::Object_ptr media_ctrl_obj = TAO_ORB_Core_instance ()->orb ()->string_to_object (media_ctrl_ior,ACE_TRY_ENV); @@ -2022,7 +2123,7 @@ TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr /* the_ctrl */, AVStreams::QoSRequestFailed, AVStreams::streamOpFailed)) { - this->mcast_peer_ = mcast_peer; + this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer); return 1; } @@ -2125,7 +2226,10 @@ TAO_VDev::~TAO_VDev (void) TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy) - : endpoint_strategy_ (endpoint_strategy) + : endpoint_strategy_ (endpoint_strategy), + flow_count_ (0), + flow_num_ (0), + stream_ctrl_ (0) { } @@ -2146,17 +2250,18 @@ TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device, ACE_TRY { ACE_UNUSED_ARG (is_met); - TAO_StreamCtrl *stream_ctrl; - ACE_NEW_RETURN (stream_ctrl, + ACE_NEW_RETURN (this->stream_ctrl_, TAO_StreamCtrl, 0); - stream_ctrl->bind_devs (peer_device, - AVStreams::MMDevice::_duplicate (this->_this (ACE_TRY_ENV)), - the_qos, - the_spec, - ACE_TRY_ENV); + AVStreams::MMDevice_var mmdevice = this->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + this->stream_ctrl_->bind_devs (peer_device, + mmdevice.in (), + the_qos, + the_spec, + ACE_TRY_ENV); ACE_TRY_CHECK; - streamctrl = stream_ctrl->_this (ACE_TRY_ENV); + streamctrl = this->stream_ctrl_->_this (ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY @@ -2250,14 +2355,14 @@ TAO_MMDevice::create_A_B (MMDevice_Type type, forward_entry.parse (flow_spec[i]); TAO_String_Hash_Key flow_key (forward_entry.flowname ()); AVStreams::FDev_ptr flow_dev; - AVStreams::FlowConnection_ptr flowconnection = AVStreams::FlowConnection::_nil (); + AVStreams::FlowConnection_var flowconnection = AVStreams::FlowConnection::_nil (); ACE_TRY_EX (flowconnection) { // Get the flowconnection for this flow. CORBA::Object_var flowconnection_obj = streamctrl->get_flow_connection (forward_entry.flowname (),ACE_TRY_ENV); ACE_TRY_CHECK_EX (flowconnection); - if (!CORBA::is_nil (flowconnection_obj)) + if (!CORBA::is_nil (flowconnection_obj.in ())) { flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj,ACE_TRY_ENV); ACE_TRY_CHECK_EX (flowconnection); @@ -2274,7 +2379,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type, if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"fdev_map::find failed\n"),0); CORBA::String_var named_fdev; - AVStreams::FlowEndPoint_ptr flow_endpoint = AVStreams::FlowEndPoint::_nil (); + AVStreams::FlowEndPoint_var flow_endpoint; AVStreams::QoS flow_qos; result = qos.get_flow_qos (forward_entry.flowname (),flow_qos); if (result < 0) @@ -2291,7 +2396,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type, // hence A is the producer for this flow and B is the consumer for this flow. // We have to create a producer from the FDev for this flow. flow_endpoint = - flow_dev->create_producer (flowconnection, + flow_dev->create_producer (flowconnection.in (), flow_qos, met_qos, named_fdev.inout (), @@ -2301,7 +2406,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type, case MMDEVICE_B: { flow_endpoint = - flow_dev->create_consumer (flowconnection, + flow_dev->create_consumer (flowconnection.in (), flow_qos, met_qos, named_fdev.inout (), @@ -2377,7 +2482,7 @@ TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl, const AVStreams::flowSpec &flow_spec, CORBA::Environment &ACE_TRY_ENV) { - AVStreams::StreamEndPoint_A_ptr sep_a = AVStreams::StreamEndPoint_A::_nil (); + AVStreams::StreamEndPoint_A_ptr sep_a; AVStreams::StreamEndPoint_ptr sep; ACE_TRY { @@ -2470,17 +2575,32 @@ TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj, } ACE_CATCHANY { - // exception implies the flow name is not defined and is system generated. - ACE_OS::sprintf (flow_name,"flow%d",flow_num_++); + ACE_TRY_EX (flow) + { + // exception implies the flow name is not defined and is system generated. + ACE_OS::sprintf (flow_name,"flow%d",flow_num_++); + CORBA::Any flowname_any; + flowname_any <<= flow_name; + fdev->define_property ("Flow",flowname_any,ACE_TRY_ENV); + ACE_TRY_CHECK_EX (flow); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MMDevice::add_fdev"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); } ACE_ENDTRY; + ACE_CHECK_RETURN (0); if (CORBA::is_nil (fdev)) return 0; // Add it to the sequence of flowNames supported. // put the flowname and the fdev in a hashtable. - TAO_String_Hash_Key fdev_name_key (flow_name); + TAO_String_Hash_Key fdev_name_key (CORBA::string_dup (flow_name)); if (this->fdev_map_.bind (fdev_name_key,fdev) != 0) ACE_THROW_RETURN (AVStreams::streamOpFailed (),0); @@ -2521,7 +2641,7 @@ TAO_MMDevice::get_fdev (const char *flow_name, TAO_String_Hash_Key fdev_name_key (flow_name); FDev_Map::ENTRY *fdev_entry = 0; if (this->fdev_map_.find (fdev_name_key,fdev_entry) == 0) - return fdev_entry->int_id_; + return AVStreams::FDev::_duplicate (fdev_entry->int_id_); return 0; } @@ -2568,6 +2688,11 @@ TAO_MMDevice::remove_fdev (const char *flow_name, // destructor. TAO_MMDevice::~TAO_MMDevice (void) { + delete this->stream_ctrl_; + FDev_Map_Iterator iterator (fdev_map_); + FDev_Map_Entry *entry = 0; + for (;iterator.next (entry) != 0; iterator.advance ()) + CORBA::release (entry->int_id_); } //------------------------------------------------------------------ @@ -2576,9 +2701,8 @@ TAO_MMDevice::~TAO_MMDevice (void) // default constructor. TAO_FlowConnection::TAO_FlowConnection (void) - :producer_ (0), - consumer_ (0), - fp_name_ (0) + :fp_name_ (CORBA::string_dup ("")), + ip_multicast_ (1) { } @@ -2587,7 +2711,20 @@ void TAO_FlowConnection::stop (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - ACE_UNUSED_ARG (ACE_TRY_ENV); + ACE_TRY + { + this->flow_producer_->stop (ACE_TRY_ENV); + ACE_TRY_CHECK; + this->flow_consumer_->stop (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::stop"); + return; + } + ACE_ENDTRY; + ACE_CHECK; } // start this flow. @@ -2595,13 +2732,30 @@ void TAO_FlowConnection::start (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - ACE_UNUSED_ARG (ACE_TRY_ENV); + ACE_TRY + { + this->flow_consumer_->start (ACE_TRY_ENV); + ACE_TRY_CHECK; + this->flow_producer_->start (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::start"); + return; + } + ACE_ENDTRY; + ACE_CHECK; } // destroy this flow. void TAO_FlowConnection::destroy (CORBA::Environment &ACE_TRY_ENV) { + this->flow_producer_->destroy (ACE_TRY_ENV); + ACE_CHECK; + this->flow_consumer_->destroy (ACE_TRY_ENV); + ACE_CHECK; int result = deactivate_servant (this); if (result < 0) ACE_DEBUG ((LM_DEBUG,"TAO_FlowConnection::destroy failed\n")); @@ -2628,9 +2782,12 @@ TAO_FlowConnection::use_flow_protocol (const char * fp_name, AVStreams::FPError, AVStreams::notSupported)) { - ACE_UNUSED_ARG (fp_settings); - ACE_UNUSED_ARG (ACE_TRY_ENV); - this->fp_name_ = (char *)fp_name; + this->fp_name_ = fp_name; + this->fp_settings_ = fp_settings; + this->flow_producer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + this->flow_consumer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV); + ACE_CHECK_RETURN (0); return 1; } @@ -2646,18 +2803,48 @@ TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event, CORBA::Boolean TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party, AVStreams::FDev_ptr b_party, - AVStreams::QoS & the_qos, + AVStreams::QoS & flow_qos, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException, AVStreams::streamOpFailed, AVStreams::streamOpDenied, AVStreams::QoSRequestFailed)) { - ACE_UNUSED_ARG (a_party); - ACE_UNUSED_ARG (b_party); - ACE_UNUSED_ARG (the_qos); - ACE_UNUSED_ARG (ACE_TRY_ENV); - return 0; + CORBA::Boolean result = 0; + ACE_TRY + { + AVStreams::FlowConnection_var flowconnection = this->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + CORBA::Boolean met_qos; + CORBA::String_var named_fdev ((const char *)""); + AVStreams::FlowProducer_var producer = + a_party->create_producer (flowconnection, + flow_qos, + met_qos, + named_fdev.inout (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + AVStreams::FlowConsumer_var consumer = + b_party->create_consumer (flowconnection, + flow_qos, + met_qos, + named_fdev.inout (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + result = this->connect (producer.in (), + consumer.in (), + flow_qos, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::connect_devs"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); + return result; } // connect the producer and the consumer @@ -2673,33 +2860,35 @@ TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr flow_producer, { ACE_TRY { - this->producer_ = flow_producer; - this->consumer_ = flow_consumer; - - this->producer_->set_peer (this->_this (ACE_TRY_ENV), - this->consumer_, - the_qos, - ACE_TRY_ENV); + this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer); + this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer); + + AVStreams::FlowConnection_var flowconnection = + this->_this (ACE_TRY_ENV); + this->flow_producer_->set_peer (flowconnection.in (), + this->flow_consumer_.in (), + the_qos, + ACE_TRY_ENV); ACE_TRY_CHECK; - this->consumer_->set_peer (this->_this (ACE_TRY_ENV), - this->producer_, - the_qos, - ACE_TRY_ENV); + this->flow_consumer_->set_peer (flowconnection.in (), + this->flow_producer_.in (), + the_qos, + ACE_TRY_ENV); ACE_TRY_CHECK; char *consumer_address = - this->consumer_->go_to_listen (the_qos, - 0,// false for is_mcast - this->producer_, - this->fp_name_, - ACE_TRY_ENV); + this->flow_consumer_->go_to_listen (the_qos, + 0,// false for is_mcast + this->flow_producer_.in (), + this->fp_name_.inout (), + ACE_TRY_ENV); ACE_TRY_CHECK; - this->producer_->connect_to_peer (the_qos, - consumer_address, - this->fp_name_, - ACE_TRY_ENV); + this->flow_producer_->connect_to_peer (the_qos, + consumer_address, + this->fp_name_.inout (), + ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY @@ -2728,9 +2917,36 @@ TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr flow_producer, AVStreams::alreadyConnected, AVStreams::notSupported)) { - ACE_UNUSED_ARG (the_qos); - ACE_UNUSED_ARG (ACE_TRY_ENV); - this->producer_ = flow_producer; + ACE_TRY + { + this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer); + CORBA::Boolean met_qos; + char *mcast_address = ""; + char *address = this->flow_producer_->connect_mcast (the_qos, + met_qos, + mcast_address, + this->fp_name_.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + TAO_Forward_FlowSpec_Entry entry ("","","","",address); + if (entry.address () != 0) + { + // Internet multicasting is in use. + this->producer_address_ = address; + } + else + { + // ATM Multicasting is in use. + this->ip_multicast_ = 0; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::add_producer"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); return 1; } @@ -2741,9 +2957,50 @@ TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr flow_consumer, ACE_THROW_SPEC ((CORBA::SystemException, AVStreams::alreadyConnected)) { - ACE_UNUSED_ARG (the_qos); - ACE_UNUSED_ARG (ACE_TRY_ENV); - this->consumer_ = flow_consumer; + ACE_TRY + { + this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer); + AVStreams::protocolSpec protocols (1); + protocols.length (1); + protocols [0] = CORBA::string_dup (this->producer_address_); + if (!this->ip_multicast_) + { + this->flow_consumer_->set_protocol_restriction (protocols, + ACE_TRY_ENV); + ACE_TRY_CHECK; + char * address = + this->flow_consumer_->go_to_listen (the_qos, + 1, + this->flow_producer_.in (), + this->fp_name_.inout (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + CORBA::Boolean is_met; + this->flow_producer_->connect_mcast (the_qos, + is_met, + address, + this->fp_name_.inout (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + else + { + // IP Multicasting. + this->flow_consumer_->go_to_listen (the_qos, + 1, + this->flow_producer_.in (), + this->fp_name_.inout (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::add_consumer"); + return 0; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (0); return 1; } @@ -2764,9 +3021,69 @@ TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target, //default constructor. TAO_FlowEndPoint::TAO_FlowEndPoint (void) - :related_sep_ (0), - related_flow_connection_ (0) + :lock_ (0) +{ +} + +TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname, + AVStreams::protocolSpec &protocols, + const char *format) +{ + this->open (flowname,protocols,format); +} + +int +TAO_FlowEndPoint::open (const char *flowname, + AVStreams::protocolSpec &protocols, + const char *format) +{ + this->flowname_ = flowname; + this->format_ = format; + + ACE_DEBUG ((LM_DEBUG,"TAO_FlowEndPoint::open\n")); + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + CORBA::Any flowname_any; + flowname_any <<= flowname; + this->define_property ("Flow", + flowname_any, + ACE_TRY_ENV); + ACE_TRY_CHECK; + this->set_format (format, + ACE_TRY_ENV); + ACE_TRY_CHECK; + this->protocol_addresses_ = protocols; + AVStreams::protocolSpec protocol_spec (protocols.length ()); + protocol_spec.length (protocols.length ()); + ACE_DEBUG ((LM_DEBUG,"%N:%l\n")); + for (u_int i=0;iset_protocol_restriction (protocol_spec, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowEndPoint::open"); + return -1; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); + return 0; +} + + +int +TAO_FlowEndPoint::set_flowname (const char *flowname) { + this->flowname_ = flowname; + return 0; } // used by one flowconnection so that multiple connections cant use @@ -2777,7 +3094,10 @@ TAO_FlowEndPoint::lock (CORBA::Environment &ACE_TRY_ENV) { // lock the current flowendpoint ACE_UNUSED_ARG (ACE_TRY_ENV); - return 0; + if (this->lock_) + return 0; + this->lock_ = 1; + return 1; } // unlocks the flowendpoint ,becomes free to be used in another flow. @@ -2786,6 +3106,7 @@ TAO_FlowEndPoint::unlock (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { ACE_UNUSED_ARG (ACE_TRY_ENV); + this->lock_ = 0; } // The start,stop and destroy are to be handled by the application. @@ -2793,14 +3114,14 @@ void TAO_FlowEndPoint::stop (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - ACE_UNUSED_ARG (ACE_TRY_ENV); + this->protocol_object_->stop (); } void TAO_FlowEndPoint::start (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - ACE_UNUSED_ARG (ACE_TRY_ENV); + this->protocol_object_->start (); } void @@ -2817,7 +3138,7 @@ TAO_FlowEndPoint::related_sep (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { ACE_UNUSED_ARG (ACE_TRY_ENV); - return this->related_sep_; + return AVStreams::StreamEndPoint::_duplicate (this->related_sep_); } void @@ -2826,7 +3147,7 @@ TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep, ACE_THROW_SPEC ((CORBA::SystemException)) { ACE_UNUSED_ARG (ACE_TRY_ENV); - this->related_sep_ = related_sep; + this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep); } AVStreams::FlowConnection_ptr @@ -2834,7 +3155,7 @@ TAO_FlowEndPoint::related_flow_connection (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { ACE_UNUSED_ARG (ACE_TRY_ENV); - return this->related_flow_connection_; + return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_); } void @@ -2843,7 +3164,7 @@ TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related ACE_THROW_SPEC ((CORBA::SystemException)) { ACE_UNUSED_ARG (ACE_TRY_ENV); - this->related_flow_connection_ = related_flow_connection; + this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection); } // returns the connected peer for this flow @@ -2892,6 +3213,7 @@ TAO_FlowEndPoint::set_format (const char * format, ACE_THROW_SPEC ((CORBA::SystemException, AVStreams::notSupported)) { + this->format_ = format; ACE_TRY { // make this a property so that is_fep_compatible can query this and @@ -2918,6 +3240,7 @@ TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_set AVStreams::PropertyException, AVStreams::streamOpFailed)) { + this->dev_params_ = new_settings; ACE_TRY { CORBA::Any DevParams_property; @@ -2943,12 +3266,29 @@ TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & prot { ACE_TRY { + ACE_DEBUG ((LM_DEBUG,"%N:%l\n")); + for (u_int i=0;idefine_property ("AvailableProtocols", AvailableProtocols_property, ACE_TRY_ENV); ACE_TRY_CHECK; + AVStreams::protocolSpec *temp_spec; + CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols", + ACE_TRY_ENV); + ACE_TRY_CHECK; + temp_any.in () >>= temp_spec; + ACE_DEBUG ((LM_DEBUG,"%N:%l\n")); + for (i=0;ilength ();i++) + { + const char *protocol = (*temp_spec)[i].in (); + ACE_DEBUG ((LM_DEBUG,"%s\n",protocol)); + } this->protocols_ = protocols; } ACE_CATCHANY @@ -2971,58 +3311,48 @@ TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep, // check whether the passed flowendpoint is compatible with this flowendpoint. // should we check for the availableFormats and choose one format. // get my format value - CORBA::Any_ptr format_ptr; - CORBA::String my_format,peer_format; + CORBA::Any_var format_ptr; + CORBA::String_var my_format,peer_format; + CORBA::String temp_format; format_ptr = this->get_property_value ("Format", ACE_TRY_ENV); ACE_TRY_CHECK; - if (format_ptr != 0) - *format_ptr >>= my_format; - else - // property is not defined - ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0); - + format_ptr.in () >>= temp_format; + my_format = CORBA::string_dup (temp_format); // get my peer's format value - format_ptr = peer_fep->get_property_value ("Format", ACE_TRY_ENV); ACE_TRY_CHECK; - if (format_ptr != 0) - *format_ptr >>= peer_format; - else - // property is not defined - ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0); - - if (ACE_OS::strcmp (my_format,peer_format) != 0) + format_ptr.in () >>= temp_format; + peer_format = CORBA::string_dup (temp_format); + if (ACE_OS::strcmp (my_format.in (), + peer_format.in ()) != 0) return 0; // since formats are same, check for a common protocol - CORBA::Any* AvailableProtocols_ptr; - AVStreams::protocolSpec *my_protocol_spec,*peer_protocol_spec; - + CORBA::Any_var AvailableProtocols_ptr; + AVStreams::protocolSpec my_protocol_spec,peer_protocol_spec; + AVStreams::protocolSpec_ptr temp_protocols;; AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols", ACE_TRY_ENV); ACE_TRY_CHECK; - if (AvailableProtocols_ptr != 0) - *AvailableProtocols_ptr >>= my_protocol_spec; - else - ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0); - + AvailableProtocols_ptr.in () >>= temp_protocols; + my_protocol_spec = *temp_protocols; AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols", ACE_TRY_ENV); ACE_TRY_CHECK; - if (AvailableProtocols_ptr != 0) - *AvailableProtocols_ptr >>= peer_protocol_spec; + AvailableProtocols_ptr.in () >>= temp_protocols; + peer_protocol_spec = *temp_protocols; int protocol_match = 0; - for (u_int i=0;i<(*my_protocol_spec).length ();i++) + for (u_int i=0;ihandle_go_to_listen (the_qos,is_mcast,peer,flowProtocol,ACE_TRY_ENV); -} + AVStreams::protocolSpec my_protocol_spec,peer_protocol_spec; + AVStreams::protocolSpec_ptr temp_protocols; + CORBA::Any_var AvailableProtocols_ptr = + peer_fep->get_property_value ("AvailableProtocols", + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + AvailableProtocols_ptr.in () >>= temp_protocols; + peer_protocol_spec = *temp_protocols; + AvailableProtocols_ptr = + this->get_property_value ("AvailableProtocols", + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + AvailableProtocols_ptr.in () >>= temp_protocols; + my_protocol_spec = *temp_protocols; + int protocol_match = 0; + CORBA::String_var listen_protocol; + for (u_int i=0;iprotocol_addresses_.length ();j++) + if (ACE_OS::strncmp (this->protocol_addresses_ [i],listen_protocol,ACE_OS::strlen (listen_protocol)) == 0) + { + // Now listen on that protocol. + TAO_Forward_FlowSpec_Entry *entry; + ACE_NEW_RETURN (entry, + TAO_Forward_FlowSpec_Entry (this->flowname_.in (), + CORBA::string_dup (""), + this->format_.in (), + flowProtocol, + this->protocol_addresses_ [i]), + 0); + + TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry (); + TAO_AV_FlowSpecSet flow_spec_set; + flow_spec_set.insert (entry); + int result = acceptor_registry->open (this, + TAO_AV_CORE::instance (), + flow_spec_set); + if (result < 0) + return 0; + char *listen_address = entry->get_local_addr_str (); + char *address; + ACE_NEW_RETURN (address, + char [BUFSIZ], + 0); + ACE_OS::sprintf (address,"%s=%s",listen_protocol.in (),listen_address); + return address; + } return 0; } + CORBA::Boolean TAO_FlowEndPoint::connect_to_peer (AVStreams::QoS & the_qos, const char * address, @@ -3104,25 +3491,34 @@ TAO_FlowEndPoint::connect_to_peer (AVStreams::QoS & the_qos, AVStreams::FPError, AVStreams::QoSRequestFailed)) { - // Right now since the A/V framework doesnt bother about the - // protocols we leave it to the application to handle the connection - // to its peer. When A/V Streams implements common protocol - // interaction like UDP and TCP this will be handled by the - // framework. - - return this->handle_connect_to_peer (the_qos,address,use_flow_protocol,ACE_TRY_ENV); + TAO_Forward_FlowSpec_Entry *entry; + ACE_NEW_RETURN (entry, + TAO_Forward_FlowSpec_Entry (this->flowname_.in (), + CORBA::string_dup (""), + this->format_.in (), + CORBA::string_dup (""), + address), + 0); + TAO_AV_FlowSpecSet flow_spec_set; + flow_spec_set.insert (entry); + TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry (); + int result = connector_registry->open (this, + TAO_AV_CORE::instance (), + flow_spec_set); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_FlowEndPoint::connector_registry::open failed\n"),0); + return 1; } -CORBA::Boolean -TAO_FlowEndPoint::handle_connect_to_peer (AVStreams::QoS & /* the_qos */, - const char * /* address */, - const char * /* use_flow_protocol */, - CORBA::Environment &/* ACE_TRY_ENV */) +int +TAO_FlowEndPoint::set_protocol_object (const char *flowname, + TAO_AV_Protocol_Object *object) { + ACE_DEBUG ((LM_DEBUG,"TAO_FlowEndPoint::set_protocol_object\n")); + this->protocol_object_ = object; return 0; } - // ------------------------------------------------------------ // TAO_FlowProducer class // ------------------------------------------------------------ @@ -3132,6 +3528,13 @@ TAO_FlowProducer::TAO_FlowProducer (void) { } +TAO_FlowProducer::TAO_FlowProducer (const char *flowname, + AVStreams::protocolSpec protocols, + const char *format) +{ + this->open (flowname,protocols,format); +} + // multicast is currently not supported char * TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */, @@ -3199,6 +3602,13 @@ TAO_FlowConsumer::TAO_FlowConsumer (void) { } +TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname, + AVStreams::protocolSpec protocols, + const char *format) +{ + this->open (flowname,protocols,format); +} + // ------------------------------------------------------------ // TAO_FDev // ------------------------------------------------------------ @@ -3208,6 +3618,27 @@ TAO_FDev::TAO_FDev (void) { } +TAO_FDev::TAO_FDev (const char *flowname) + :flowname_ (flowname) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + CORBA::Any flowname_any; + flowname_any <<= flowname; + this->define_property ("Flow", + flowname_any, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FDev::TAO_FDev"); + } + ACE_ENDTRY; + ACE_CHECK; +} + AVStreams::FlowProducer_ptr TAO_FDev::create_producer (AVStreams::FlowConnection_ptr the_requester, AVStreams::QoS & the_qos, @@ -3384,9 +3815,16 @@ TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter) TAO_Tokenizer::~TAO_Tokenizer () { - // CORBA::string_free (this->string_); +// ACE_Array_Iterator iterator (this->token_array_); +// char **entry = 0; +// for (; iterator.next (entry) != 0; iterator.advance ()) +// { +// ACE_DEBUG ((LM_DEBUG,"%s\n",*entry)); +// CORBA::string_free (*entry); +// } } + int TAO_Tokenizer::parse (const char *string,char delimiter) { @@ -3447,7 +3885,7 @@ char* TAO_Tokenizer::token (void) { if (count_ < num_tokens_) - return this->token_array_[this->count_++]; + return CORBA::string_dup (this->token_array_[this->count_++]); else return 0; } @@ -3463,7 +3901,7 @@ TAO_Tokenizer::operator [] (size_t index) const { if (index >= this->num_tokens_) return 0; - return this->token_array_[index]; + return CORBA::string_dup (this->token_array_[index]); } //------------------------------------------------------------ @@ -3491,10 +3929,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void) // constructor. TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, - char *direction, - char *format_name, - char *flow_protocol, - char *carrier_protocol, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *carrier_protocol, ACE_Addr *address) :address_ (address), address_str_ (0), @@ -3512,7 +3950,35 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, protocol_object_ (0) { this->set_protocol (); - this->set_direction (direction); + this->set_direction (this->direction_str_); +} + +TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *address) + :address_ (0), + address_str_ (CORBA::string_dup (address)), + format_ (CORBA::string_dup (format_name)), + direction_str_ (CORBA::string_dup (direction)), + flowname_ (CORBA::string_dup (flowname)), + carrier_protocol_ (0), + flow_protocol_ (CORBA::string_dup (flow_protocol)), + use_flow_protocol_ (0), + entry_ (0), + peer_addr_ (0), + local_addr_ (0), + transport_ (0), + handler_ (0), + protocol_object_ (0) +{ + ACE_CString cstring(this->address_str_,0,0); + int colon_pos = cstring.find (':'); + if (colon_pos != cstring.npos) + cstring[colon_pos] = ';'; + this->address_str_ = cstring.rep (); + this->parse_address (this->address_str_); } // Destructor. @@ -3522,7 +3988,6 @@ TAO_FlowSpec_Entry::~TAO_FlowSpec_Entry (void) CORBA::string_free (this->direction_str_); CORBA::string_free (this->flowname_); CORBA::string_free (this->carrier_protocol_); - CORBA::string_free (this->flowname_); } int @@ -3726,6 +4191,31 @@ TAO_FlowSpec_Entry::get_local_addr (void) return this->local_addr_; } +char * +TAO_FlowSpec_Entry::get_local_addr_str (void) +{ + switch (this->local_addr_->get_type ()) + { + case AF_INET: + { + char *buf; + ACE_NEW_RETURN (buf, + char [BUFSIZ], + 0); + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr *,this->local_addr_); + inet_addr->addr_to_string (buf,BUFSIZ); + ACE_CString cstring (buf,0,0); + int colon_pos = cstring.find (':'); + if (colon_pos != cstring.npos) + cstring[colon_pos] = ';'; + return cstring.rep (); + } + break; + default: + ACE_ERROR_RETURN ((LM_ERROR,"Address family not supported"),0); + } +} + TAO_AV_Transport* TAO_FlowSpec_Entry::transport (void) { @@ -3773,10 +4263,10 @@ TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (void) // constructor to construct the entry from the arguments. TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname, - char *direction, - char *format_name , - char *flow_protocol , - char *carrier_protocol , + const char *direction, + const char *format_name , + const char *flow_protocol , + const char *carrier_protocol , ACE_Addr *address ) :TAO_FlowSpec_Entry (flowname, direction, @@ -3787,6 +4277,20 @@ TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname, { } +// constructor to construct the entry from the arguments. +TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name , + const char *flow_protocol , + const char *address ) + :TAO_FlowSpec_Entry (flowname, + direction, + format_name, + flow_protocol, + address) +{ +} + int TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry) { @@ -3876,11 +4380,11 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (void) // constructor to construct an entry from the arguments. TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname, - char *direction, - char *format_name , - char *flow_protocol , - char *carrier_protocol , - ACE_Addr *address ) + const char *direction, + const char *format_name , + const char *flow_protocol , + const char *carrier_protocol, + ACE_Addr *address) :TAO_FlowSpec_Entry (flowname, direction, format_name, @@ -3890,6 +4394,20 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname, { } +// constructor to construct an entry from the arguments. +TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name , + const char *flow_protocol , + const char *address) + :TAO_FlowSpec_Entry (flowname, + direction, + format_name, + flow_protocol, + address) +{ +} + int TAO_Reverse_FlowSpec_Entry::parse (const char *flowSpec_entry) { @@ -3988,7 +4506,7 @@ TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos) for (u_int j=0;jstream_qos_.length ();j++) { - TAO_String_Hash_Key qos_key (this->stream_qos_[j].QoSType); + TAO_String_Hash_Key qos_key (CORBA::string_dup (this->stream_qos_[j].QoSType)); int result = this->qos_map_.bind (qos_key,this->stream_qos_[j]); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"qos_map::bind failed\n"),-1); diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h index 5777820f257..f4a6263afc0 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h @@ -195,6 +195,9 @@ public: TAO_Basic_StreamCtrl (void); // Default Constructor + virtual ~TAO_Basic_StreamCtrl (void); + // Destructor. + virtual void stop (const AVStreams::flowSpec &the_spec, CORBA::Environment &env = CORBA::Environment::default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException, @@ -258,20 +261,19 @@ public: // Not implemented in the light profile, will raise the notsupported // exception - virtual ~TAO_Basic_StreamCtrl (void); - // Destructor - protected: - AVStreams::VDev_ptr vdev_a_; - AVStreams::VDev_ptr vdev_b_; + AVStreams::VDev_var vdev_a_; + AVStreams::VDev_var vdev_b_; // The Virtual Devices for this stream - AVStreams::StreamEndPoint_A_ptr sep_a_; - AVStreams::StreamEndPoint_B_ptr sep_b_; + AVStreams::StreamEndPoint_A_var sep_a_; + AVStreams::StreamEndPoint_B_var sep_b_; // The Endpoints for this stream typedef ACE_Hash_Map_Manager FlowConnection_Map; + typedef ACE_Hash_Map_Iterator FlowConnection_Map_Iterator; + typedef ACE_Hash_Map_Entry FlowConnection_Map_Entry; FlowConnection_Map flow_connection_map_; // Hash table for the flow names and its corresponding flowconnection object reference. AVStreams::FlowConnection_seq flowConnections_; @@ -339,6 +341,9 @@ public: TAO_StreamCtrl (void); // Default Constructor + virtual ~TAO_StreamCtrl (void); + // virtual destructor. + virtual CORBA::Boolean bind_devs (AVStreams::MMDevice_ptr a_party, AVStreams::MMDevice_ptr b_party, AVStreams::streamQoS &the_qos, @@ -402,23 +407,24 @@ public: // Changes the QoS associated with the stream // Empty the_spec means apply operation to all flows - virtual ~TAO_StreamCtrl (void); - // Destructor. - protected: struct MMDevice_Map_Entry { - AVStreams::StreamEndPoint_ptr sep_; - AVStreams::VDev_ptr vdev_; + AVStreams::StreamEndPoint_var sep_; + AVStreams::VDev_var vdev_; AVStreams::flowSpec flowspec_; AVStreams::streamQoS qos_; }; - ACE_Hash_Map_Manager mmdevice_a_map_; - ACE_Hash_Map_Manager mmdevice_b_map_; + typedef ACE_Hash_Map_Manager MMDevice_Map; + typedef ACE_Hash_Map_Iterator MMDevice_Map_Iterator; + + MMDevice_Map mmdevice_a_map_; + MMDevice_Map mmdevice_b_map_; TAO_MCastConfigIf *mcastconfigif_; - AVStreams::MCastConfigIf_ptr mcastconfigif_ptr_; + AVStreams::MCastConfigIf_var mcastconfigif_ptr_; + AVStreams::StreamCtrl_var streamctrl_; }; class TAO_ORBSVCS_Export TAO_MCastConfigIf @@ -430,7 +436,7 @@ public: struct Peer_Info { - AVStreams::VDev_ptr peer_; + AVStreams::VDev_var peer_; AVStreams::streamQoS qos_; AVStreams::flowSpec flow_spec_; }; @@ -518,14 +524,14 @@ public: CORBA::Environment &env = CORBA::Environment::default_environment ()); // Application needs to define this - virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec) = 0; + virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec); // Application needs to define this - virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec) = 0; + virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec); // Application needs to define this virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec, - CORBA::Environment &env = CORBA::Environment::default_environment ()) = 0; + CORBA::Environment &env = CORBA::Environment::default_environment ()); // Application needs to define this // virtual int make_tcp_flow_handler (TAO_AV_TCP_Flow_Handler *&handler); @@ -673,15 +679,15 @@ public: virtual ~TAO_StreamEndPoint (void); // Destructor - virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec); - // Defined for backward compatibility. +// virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec); +// // Defined for backward compatibility. - virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec); - // Defined for backward compatibility. +// virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec); +// // Defined for backward compatibility. - virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec, - CORBA::Environment &env = CORBA::Environment::default_environment ()); - // Defined for backward compatibility. +// virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec, +// CORBA::Environment &env = CORBA::Environment::default_environment ()); +// // Defined for backward compatibility. CORBA::Boolean multiconnect (AVStreams::streamQoS &the_qos, AVStreams::flowSpec &the_spec, @@ -707,7 +713,7 @@ protected: CORBA::Long source_id_; // source id used for multicast. - AVStreams::Negotiator_ptr negotiator_; + AVStreams::Negotiator_var negotiator_; // our local negotiator for QoS. AVStreams::protocolSpec protocols_; @@ -724,7 +730,7 @@ protected: ACE_Hash_Map_Manager dgram_mcast_handler_map_; TAO_AV_FlowSpecSet forward_flow_spec_set; TAO_AV_FlowSpecSet reverse_flow_spec_set; - AVStreams::StreamEndPoint_ptr peer_sep_; + AVStreams::StreamEndPoint_var peer_sep_; AVStreams::SFPStatus *sfp_status_; }; @@ -1017,6 +1023,8 @@ protected: // current flow number used for system generation of flow names. typedef ACE_Hash_Map_Manager FDev_Map; + typedef ACE_Hash_Map_Iterator FDev_Map_Iterator; + typedef ACE_Hash_Map_Entry FDev_Map_Entry; FDev_Map fdev_map_; // hash table for the flownames and its corresponding flowEndpoint @@ -1024,6 +1032,8 @@ protected: AVStreams::flowSpec flows_; // sequence of supported flow names. + + TAO_StreamCtrl *stream_ctrl_; }; class TAO_FlowConsumer; @@ -1122,18 +1132,24 @@ public: // drops a flow endpoint from the flow. protected: - AVStreams::FlowProducer *producer_; + AVStreams::FlowProducer_var flow_producer_; // The producer of this flow. - AVStreams::FlowConsumer *consumer_; + AVStreams::FlowConsumer_var flow_consumer_; // The consumer of this flow - char * fp_name_; - // name of the flow protocol to be used. + CORBA::String_var fp_name_; + CORBA::Any fp_settings_; + CORBA::String_var producer_address_; + // The multicast address returned by the producer. + + int ip_multicast_; + // IP Multicasting is used. }; class TAO_ORBSVCS_Export TAO_FlowEndPoint : public virtual POA_AVStreams::FlowEndPoint, public virtual TAO_PropertySet, - public virtual PortableServer::RefCountServantBase + public virtual PortableServer::RefCountServantBase, + public virtual TAO_Base_StreamEndPoint { // = DESCRIPTION // This class is used per flow e.g video flow and an audio flow @@ -1144,6 +1160,19 @@ public: TAO_FlowEndPoint (void); //default constructor. + TAO_FlowEndPoint (const char *flowname, + AVStreams::protocolSpec &protocols, + const char *format); + + int open (const char *flowname, + AVStreams::protocolSpec &protocols, + const char *format); + + int set_flowname (const char *flowname); + + virtual int set_protocol_object (const char *flowname, + TAO_AV_Protocol_Object *object); + virtual CORBA::Boolean lock (CORBA::Environment &env = CORBA::Environment::default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); @@ -1264,13 +1293,6 @@ public: AVStreams::QoSRequestFailed)); // connect to the peer endpoint. - virtual CORBA::Boolean handle_connect_to_peer (AVStreams::QoS & the_qos, - const char * address, - const char * use_flow_protocol, - CORBA::Environment &env = - CORBA::Environment::default_environment ()); - // hook method to be overridden by the application to handle the connection request. - virtual char * go_to_listen (AVStreams::QoS & the_qos, CORBA::Boolean is_mcast, AVStreams::FlowProducer_ptr peer, @@ -1284,28 +1306,32 @@ public: // listen request from the peer. - virtual char * handle_go_to_listen (AVStreams::QoS & the_qos, - CORBA::Boolean is_mcast, - AVStreams::FlowProducer_ptr peer, - char *& flowProtocol, - CORBA::Environment &env = CORBA::Environment::default_environment ()); - // applications should override this method. - protected: - AVStreams::StreamEndPoint_ptr related_sep_; + AVStreams::StreamEndPoint_var related_sep_; // The related streamendpoint. - AVStreams::FlowConnection_ptr related_flow_connection_; + AVStreams::FlowConnection_var related_flow_connection_; // The related flow connection reference - AVStreams::FlowEndPoint_ptr peer_fep_; + AVStreams::FlowEndPoint_var peer_fep_; // The peer flowendpoint reference. AVStreams::protocolSpec protocols_; // Available protocols for this flowendpoint. - AVStreams::MCastConfigIf_ptr mcast_peer_; + AVStreams::protocolSpec protocol_addresses_; + // Address information for the protocols. + + AVStreams::MCastConfigIf_var mcast_peer_; // The multicast peer endpoint. + + CORBA::Boolean lock_; + // Lock. + + CORBA::String_var format_; + CORBA::String_var flowname_; + CosPropertyService::Properties dev_params_; + TAO_AV_Protocol_Object *protocol_object_; }; class TAO_ORBSVCS_Export TAO_FlowProducer: @@ -1317,6 +1343,10 @@ public: TAO_FlowProducer (void); // default constructor + TAO_FlowProducer (const char *flowname, + AVStreams::protocolSpec protocols, + const char *format); + virtual char * connect_mcast (AVStreams::QoS & the_qos, CORBA::Boolean_out is_met, const char * address, @@ -1360,7 +1390,9 @@ public: TAO_FlowConsumer (void); // default constructor. - + TAO_FlowConsumer (const char *flowname, + AVStreams::protocolSpec protocols, + const char *format); }; class TAO_ORBSVCS_Export TAO_FDev : @@ -1372,6 +1404,9 @@ public: TAO_FDev (void); // default constructor + TAO_FDev (const char *flowname); + // constructor taking a flowname. + AVStreams::FlowProducer_ptr create_producer (AVStreams::FlowConnection_ptr the_requester, AVStreams::QoS & the_qos, CORBA::Boolean_out met_qos, @@ -1443,6 +1478,7 @@ protected: typedef ACE_DLList_Iterator PRODUCER_LIST_ITERATOR; ACE_DLList consumer_list_; typedef ACE_DLList_Iterator CONSUMER_LIST_ITERATOR; + CORBA::String_var flowname_; }; class TAO_ORBSVCS_Export TAO_MediaControl @@ -1542,59 +1578,69 @@ public: // default constructor. TAO_FlowSpec_Entry (const char *flowname, - char *direction, - char *format_name, - char *flow_protocol, - char *carrier_protocol, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *carrier_protocol, ACE_Addr *address); // constructor to construct an entry from the arguments. + TAO_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *address); + virtual int parse (const char* flowSpec_entry) = 0; // construct the entry from a string specified by the flowSpec grammar. virtual ~TAO_FlowSpec_Entry (void); // virtual destructor. - virtual int direction (void); + int direction (void); // accessor to the direction. - virtual char * direction_str (void); + char * direction_str (void); // string version of direction . - virtual char *flow_protocol_str (void); + char *flow_protocol_str (void); // accesor to the flow protocol string. - virtual ACE_Addr *address (void); + ACE_Addr *address (void); // Address of the carrier protocol. - virtual char * address_str (void); + char * address_str (void); // Address in string format i. hostname:port. - virtual TAO_AV_Core::Protocol carrier_protocol (void); + TAO_AV_Core::Protocol carrier_protocol (void); // carrier protocol i.e TCP,UDP,RTP/UDP. - virtual char * carrier_protocol_str (void); + char * carrier_protocol_str (void); // string version of carrier protocol. - virtual char *format (void); + char *format (void); // format to be used for this flow. - virtual const char *flowname (void); + const char *flowname (void); // name of this flow. virtual char *entry_to_string (void) = 0; // converts the entry to a string. - virtual int set_peer_addr (ACE_Addr *peer_addr); - virtual ACE_Addr *get_peer_addr (void); - virtual int set_local_addr (ACE_Addr *peer_addr); - virtual ACE_Addr *get_local_addr (void); - virtual TAO_AV_Transport *transport (void); - virtual void transport (TAO_AV_Transport *transport); - virtual TAO_AV_Flow_Handler* handler (void); - virtual void handler (TAO_AV_Flow_Handler *handler); - virtual TAO_AV_Protocol_Object* protocol_object (void); - virtual void protocol_object (TAO_AV_Protocol_Object *object); + int set_peer_addr (ACE_Addr *peer_addr); + ACE_Addr *get_peer_addr (void); + int set_local_addr (ACE_Addr *peer_addr); + ACE_Addr *get_local_addr (void); + char *get_local_addr_str (void); + TAO_AV_Transport *transport (void); + void transport (TAO_AV_Transport *transport); + TAO_AV_Flow_Handler* handler (void); + void handler (TAO_AV_Flow_Handler *handler); + TAO_AV_Protocol_Object* protocol_object (void); + void protocol_object (TAO_AV_Protocol_Object *object); + + int parse_address (char *format_string); + // sets the address for this flow. protected: int parse_flow_protocol_string (char *flow_options_string); // parses the flow protocol string with tokens separated by : @@ -1602,8 +1648,6 @@ protected: int set_direction (char *direction_string); // sets the direction flag. - int parse_address (char *format_string); - // sets the address for this flow. int set_protocol (void); // sets the protocol_ enum from the carrier_protocol_ string. @@ -1660,13 +1704,19 @@ public: // default constructor. TAO_Forward_FlowSpec_Entry (const char *flowname, - char *direction = "", - char *format_name = "", - char *flow_protocol = "", - char *carrier_protocol = "", - ACE_Addr *address = 0); + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *carrier_protocol, + ACE_Addr *address); // constructor to construct an entry from the arguments. + TAO_Forward_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *address); + virtual char *entry_to_string (void); // converts the entry to a string. @@ -1688,13 +1738,20 @@ public: // default constructor. TAO_Reverse_FlowSpec_Entry (const char *flowname, - char *direction = "", - char *format_name = "", - char *flow_protocol = "", - char *carrier_protocol = "", - ACE_Addr *address = 0); + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *carrier_protocol, + ACE_Addr *address); // constructor to construct an entry from the arguments. + TAO_Reverse_FlowSpec_Entry (const char *flowname, + const char *direction, + const char *format_name, + const char *flow_protocol, + const char *address); + // Takes the address in protocol=endpoint form. + virtual char *entry_to_string (void); // converts the entry to a string. -- cgit v1.2.1