diff options
author | naga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-07-13 16:58:48 +0000 |
---|---|---|
committer | naga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-07-13 16:58:48 +0000 |
commit | 23e6a0eca5f0679947156d52edefa2fe89efef43 (patch) | |
tree | 08935d85c836483423b6cc5a602f6397b196e436 | |
parent | 445b87a0bbfebb97785c07c24cdf714f186236aa (diff) | |
download | ATCD-23e6a0eca5f0679947156d52edefa2fe89efef43.tar.gz |
Fixed a few a error in the multipoint bind_devs case. It works now with
one multicast source and multiple multicast sinks.
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp | 290 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h | 36 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/MCast.cpp | 10 |
3 files changed, 221 insertions, 115 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp index 4cee9f39950..a7e3310349e 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp @@ -125,12 +125,6 @@ TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec, } } } - if (CORBA::is_nil (this->sep_a_.in ())) - return; - - // Make the upcall into the application - this->sep_a_->stop (flow_spec, ACE_TRY_ENV); - ACE_TRY_CHECK; } ACE_CATCHANY { @@ -180,12 +174,6 @@ TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec, } } } - if (CORBA::is_nil (this->sep_a_.in ())) - return; - - // Make the upcall into the application - this->sep_a_->start (flow_spec, ACE_TRY_ENV); - ACE_TRY_CHECK; } ACE_CATCHANY { @@ -226,21 +214,15 @@ TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec, FlowConnection_Map_Entry *entry = 0; for (;iterator.next (entry) != 0;iterator.advance ()) { - entry->int_id_->start (ACE_TRY_ENV); + entry->int_id_->destroy (ACE_TRY_ENV); ACE_TRY_CHECK; } } } - if (CORBA::is_nil (this->sep_a_.in ())) - return; - - // Make the upcall into the application - this->sep_a_->destroy (flow_spec, ACE_TRY_ENV); - ACE_TRY_CHECK; } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_Basic_StreamCtrl::start"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_Basic_StreamCtrl::destroy"); return; } ACE_ENDTRY; @@ -493,6 +475,112 @@ TAO_StreamCtrl::~TAO_StreamCtrl (void) delete this->mcastconfigif_; } +// Stop the transfer of data of the stream +// Empty the_spec means apply operation to all flows +void +TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)) +{ + ACE_TRY + { + TAO_Basic_StreamCtrl::stop (flow_spec,ACE_TRY_ENV); + ACE_TRY_CHECK; + MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_); + MMDevice_Map::ENTRY *entry = 0; + for (;a_iterator.next (entry)!= 0;a_iterator.advance ()) + { + entry->int_id_.sep_->stop (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_); + for (;b_iterator.next (entry)!= 0;b_iterator.advance ()) + { + entry->int_id_.sep_->stop (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_Basic_StreamCtrl::stop"); + return; + } + ACE_ENDTRY; + ACE_CHECK; +} + +// Start the transfer of data in the stream. +// Empty the_spec means apply operation to all flows +void +TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)) +{ + ACE_TRY + { + TAO_Basic_StreamCtrl::start (flow_spec,ACE_TRY_ENV); + ACE_TRY_CHECK; + MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_); + MMDevice_Map::ENTRY *entry = 0; + for (;a_iterator.next (entry)!= 0;a_iterator.advance ()) + { + entry->int_id_.sep_->start (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_); + for (;b_iterator.next (entry)!= 0;b_iterator.advance ()) + { + entry->int_id_.sep_->start (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_Basic_StreamCtrl::start"); + return; + } + ACE_ENDTRY; + ACE_CHECK; +} + +// Tears down the stream. This will close the connection, and delete +// the streamendpoint and vdev associated with this stream +// Empty the_spec means apply operation to all flows +void +TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)) +{ + ACE_TRY + { + TAO_Basic_StreamCtrl::destroy (flow_spec,ACE_TRY_ENV); + ACE_TRY_CHECK; + MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_); + MMDevice_Map::ENTRY *entry = 0; + for (;a_iterator.next (entry)!= 0;a_iterator.advance ()) + { + entry->int_id_.sep_->destroy (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_); + for (;b_iterator.next (entry)!= 0;b_iterator.advance ()) + { + entry->int_id_.sep_->destroy (flow_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_Basic_StreamCtrl::destroy"); + return; + } + ACE_ENDTRY; + ACE_CHECK; +} + // request the two MMDevices to create vdev and stream endpoints. save // the references returned. @@ -611,7 +699,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, if (CORBA::is_nil (b_party)) { - if (this->mcastconfigif_ != 0) + if (!this->mcastconfigif_) { ACE_NEW_RETURN (this->mcastconfigif_, TAO_MCastConfigIf, @@ -632,7 +720,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party, if (CORBA::is_nil (a_party)) { // Multicast sink being added. - if (this->mcastconfigif_ != 0) + if (!this->mcastconfigif_) ACE_ERROR_RETURN ((LM_ERROR,"first add a source and then a sink\n"),0); this->mcastconfigif_->set_peer (this->vdev_b_.in (), the_qos, @@ -1303,11 +1391,11 @@ TAO_StreamEndPoint::TAO_StreamEndPoint (void) mcast_port_ (ACE_DEFAULT_MULTICAST_PORT) { this->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR); + ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %ud",this->mcast_addr_)); // this->handle_open (); } - CORBA::Boolean TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder, AVStreams::streamQoS &qos, @@ -1877,49 +1965,6 @@ TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void) ACE_DEBUG ((LM_DEBUG,"(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n")); } -void -TAO_StreamEndPoint_A::start (const AVStreams::flowSpec &flow_spec, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - AVStreams::noSuchFlow)) -{ - ACE_TRY - { - TAO_StreamEndPoint::start (flow_spec,ACE_TRY_ENV); - ACE_TRY_CHECK; - if (CORBA::is_nil (this->peer_sep_.in ())) - { - CORBA::Any_var sep_any; - sep_any = this->get_property_value ("PeerAdapter", - ACE_TRY_ENV); - ACE_TRY_CHECK; - AVStreams::StreamEndPoint_B_ptr sep; - sep_any.in () >>= sep; - this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (sep); - } - this->peer_sep_->start (flow_spec,ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_A::start"); - } - ACE_ENDTRY; - ACE_CHECK; -} - -void -TAO_StreamEndPoint_A::stop (const AVStreams::flowSpec &flow_spec, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException, - AVStreams::noSuchFlow)) -{ - TAO_StreamEndPoint::stop (flow_spec,ACE_TRY_ENV); - ACE_CHECK; - this->peer_sep_->stop (flow_spec,ACE_TRY_ENV); - ACE_CHECK; -} - // IP Multicast style connect. CORBA::Boolean TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos, @@ -1931,18 +1976,21 @@ TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos, TAO_AV_QoS qos (stream_qos); for (u_int i=0;i< flow_spec.length ();i++) { - TAO_Forward_FlowSpec_Entry forward_entry; - forward_entry.parse (flow_spec[i]); - TAO_String_Hash_Key mcast_key (forward_entry.flowname ()); + TAO_Forward_FlowSpec_Entry *forward_entry; + ACE_NEW_RETURN (forward_entry, + TAO_Forward_FlowSpec_Entry, + 0); + forward_entry->parse (flow_spec[i]); + TAO_String_Hash_Key mcast_key (forward_entry->flowname ()); AVStreams::FlowEndPoint_ptr flow_endpoint = AVStreams::FlowEndPoint::_nil (); if (this->fep_map_.find (mcast_key,flow_endpoint) == 0) { ACE_TRY_EX (narrow) { AVStreams::QoS flow_qos; - result = qos.get_flow_qos (forward_entry.flowname (),flow_qos); + result = qos.get_flow_qos (forward_entry->flowname (),flow_qos); if (result < 0) - ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry.flowname ())); + ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry->flowname ())); // Narrow it to FlowProducer. AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV); ACE_TRY_CHECK_EX (narrow); @@ -1966,39 +2014,59 @@ TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos, ACE_ENDTRY; ACE_CHECK_RETURN (0); } - TAO_AV_UDP_MCast_Flow_Handler *mcast_dgram = 0; - ACE_INET_Addr mcast_addr; - result = this->dgram_mcast_handler_map_.find (mcast_key,mcast_dgram); + ACE_INET_Addr *mcast_addr; + TAO_FlowSpec_Entry *entry = 0; + result = this->mcast_entry_map_.find (mcast_key,entry); if (result == 0) { - mcast_dgram->get_local_addr (mcast_addr); + mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,entry->get_local_addr ()); char str_addr [BUFSIZ]; - result = mcast_addr.addr_to_string (str_addr,BUFSIZ); + result = mcast_addr->addr_to_string (str_addr,BUFSIZ); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint::multiconnect ::addr_to_string failed\n"),0); - char temp_entry [BUFSIZ]; - ACE_OS::sprintf (temp_entry, - "%s\\\\UDP=%s",forward_entry.flowname (),str_addr); - flow_spec[i] = temp_entry; + ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint_A::multiconnect:%s\n",str_addr)); + TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (), + entry->direction_str (), + entry->format (), + entry->flow_protocol_str (), + entry->carrier_protocol_str (), + entry->get_local_addr ()); + flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ()); } else { - switch (forward_entry.direction ()) + switch (forward_entry->direction ()) { case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: { - // IN means we're the source. - mcast_addr.set (this->mcast_addr_++,this->mcast_port_); - // @@ We have to take care of this. -// result = this->make_dgram_mcast_flow_handler (mcast_dgram); -// if (result < 0) -// return 0; - result = mcast_dgram->subscribe (mcast_addr); + ACE_NEW_RETURN (mcast_addr, + ACE_INET_Addr, + 0); + mcast_addr->set (this->mcast_port_++,this->mcast_addr_); + char buf[BUFSIZ]; + mcast_addr->addr_to_string (buf,BUFSIZ); + ACE_DEBUG ((LM_DEBUG,"%s\n",buf)); + TAO_Forward_FlowSpec_Entry *new_entry; + ACE_NEW_RETURN (new_entry, + TAO_Forward_FlowSpec_Entry (forward_entry->flowname (), + forward_entry->direction_str (), + forward_entry->format (), + forward_entry->flow_protocol_str (), + forward_entry->carrier_protocol_str (), + mcast_addr), + 0); + flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ()); + + this->forward_flow_spec_set.insert (new_entry); + TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry (); + result = acceptor_registry->open (this, + TAO_AV_CORE::instance (), + this->forward_flow_spec_set); if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"%p\n","subscribe"),0); - result = this->dgram_mcast_handler_map_.bind (mcast_key,mcast_dgram); + ACE_ERROR_RETURN ((LM_ERROR,"Acceptor_Registry::open failed\n"),0); + result = this->mcast_entry_map_.bind (mcast_key,new_entry); if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"dgram_mcast_handler::bind failed"),0); + ACE_ERROR_RETURN ((LM_ERROR,"mcast_entry::bind failed"),0); } break; case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: @@ -2055,22 +2123,25 @@ TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &the_qos, int result = 0; for (u_int i=0;i< flow_spec.length ();i++) { - TAO_Forward_FlowSpec_Entry forward_entry; - forward_entry.parse (flow_spec[i]); - TAO_String_Hash_Key mcast_key (forward_entry.flowname ()); - TAO_AV_UDP_MCast_Flow_Handler *mcast_dgram = 0; + TAO_Forward_FlowSpec_Entry *forward_entry; + ACE_NEW_RETURN (forward_entry, + TAO_Forward_FlowSpec_Entry, + 0); + forward_entry->parse (flow_spec[i]); + TAO_String_Hash_Key mcast_key (forward_entry->flowname ()); + TAO_FlowSpec_Entry *mcast_entry = 0; ACE_INET_Addr *mcast_addr; - mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,forward_entry.address ()); + mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,forward_entry->address ()); if (mcast_addr == 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"),0); - result = this->dgram_mcast_handler_map_.find (mcast_key,mcast_dgram); + result = this->mcast_entry_map_.find (mcast_key,mcast_entry); if (result == 0) { ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::handler already found\n"),0); } else { - switch (forward_entry.direction ()) + switch (forward_entry->direction ()) { case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: { @@ -2080,10 +2151,14 @@ TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &the_qos, // if (result < 0) // return 0; - result = mcast_dgram->subscribe (*mcast_addr); + this->forward_flow_spec_set.insert (forward_entry); + TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry (); + result = connector_registry->open (this, + TAO_AV_CORE::instance (), + this->forward_flow_spec_set); if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"%p\n","subscribe"),0); - result = this->dgram_mcast_handler_map_.bind (mcast_key,mcast_dgram); + ACE_ERROR_RETURN ((LM_ERROR,"connector_registry::open failed\n"),0); + result = this->mcast_entry_map_.bind (mcast_key,forward_entry); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"dgram_mcast_handler::bind failed"),0); } @@ -4161,7 +4236,18 @@ TAO_FlowSpec_Entry::set_protocol (void) if (ACE_OS::strcmp (this->carrier_protocol_,"TCP") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_TCP; else if (ACE_OS::strcmp (this->carrier_protocol_,"UDP") == 0) - this->protocol_ = TAO_AV_Core::TAO_AV_UDP; + { + this->protocol_ = TAO_AV_Core::TAO_AV_UDP; + if (this->address_ != 0) + { + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->address_); + char buf[BUFSIZ]; + inet_addr->addr_to_string (buf,BUFSIZ); + ACE_DEBUG ((LM_DEBUG,"TAO_FlowSpec_Entry::set_protocol:%s\n",buf)); + if (IN_CLASSD (inet_addr->get_ip_address ())) + this->protocol_ = TAO_AV_Core::TAO_AV_UDP_MCAST; + } + } else if (ACE_OS::strcmp (this->carrier_protocol_,"AAL5") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_AAL5; else if (ACE_OS::strcmp (this->carrier_protocol_,"AAL3_4") == 0) diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h index f4a6263afc0..df497142d30 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h @@ -344,6 +344,28 @@ public: virtual ~TAO_StreamCtrl (void); // virtual destructor. + virtual void stop (const AVStreams::flowSpec &the_spec, + CORBA::Environment &env = CORBA::Environment::default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)); + // Stop the transfer of data of the stream + // Empty the_spec means apply operation to all flows + + virtual void start (const AVStreams::flowSpec &the_spec, + CORBA::Environment &env = CORBA::Environment::default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)); + // Start the transfer of data in the stream. + // Empty the_spec means apply operation to all flows + + virtual void destroy (const AVStreams::flowSpec &the_spec, + CORBA::Environment &env = CORBA::Environment::default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException, + AVStreams::noSuchFlow)); + // Tears down the stream. This will close the connection, and delete + // the streamendpoint and vdev associated with this stream + // Empty the_spec means apply operation to all flows + virtual CORBA::Boolean bind_devs (AVStreams::MMDevice_ptr a_party, AVStreams::MMDevice_ptr b_party, AVStreams::streamQoS &the_qos, @@ -727,7 +749,7 @@ protected: u_short mcast_port_; ACE_UINT32 mcast_addr_; - ACE_Hash_Map_Manager <TAO_String_Hash_Key, TAO_AV_UDP_MCast_Flow_Handler *,ACE_Null_Mutex> dgram_mcast_handler_map_; + ACE_Hash_Map_Manager <TAO_String_Hash_Key, TAO_FlowSpec_Entry*,ACE_Null_Mutex> mcast_entry_map_; TAO_AV_FlowSpecSet forward_flow_spec_set; TAO_AV_FlowSpecSet reverse_flow_spec_set; AVStreams::StreamEndPoint_var peer_sep_; @@ -746,18 +768,6 @@ public: TAO_StreamEndPoint_A (void); // Constructor - virtual void stop (const AVStreams::flowSpec &the_spec, - CORBA::Environment &env = CORBA::Environment::default_environment ()) - ACE_THROW_SPEC ((CORBA::SystemException, - AVStreams::noSuchFlow)); - // Stop the stream. Empty the_spec means, for all the flows - - virtual void start (const AVStreams::flowSpec &the_spec, - CORBA::Environment &env = CORBA::Environment::default_environment ()) - ACE_THROW_SPEC ((CORBA::SystemException, - AVStreams::noSuchFlow)); - // Start the stream, Empty the_spec means, for all the flows - virtual CORBA::Boolean multiconnect (AVStreams::streamQoS &the_qos, AVStreams::flowSpec &the_spec, CORBA::Environment &env = CORBA::Environment::default_environment ()) diff --git a/TAO/orbsvcs/orbsvcs/AV/MCast.cpp b/TAO/orbsvcs/orbsvcs/AV/MCast.cpp index 436e7df322d..e83bbf0b08d 100644 --- a/TAO/orbsvcs/orbsvcs/AV/MCast.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/MCast.cpp @@ -57,6 +57,11 @@ TAO_AV_UDP_MCast_Acceptor::open (TAO_Base_StreamEndPoint *endpoint, return result; entry->handler (handler); + ACE_INET_Addr *local_addr = 0; + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*mcast_addr), + -1); + entry->set_local_addr (local_addr); return 0; } @@ -147,6 +152,11 @@ TAO_AV_UDP_MCast_Connector::connect (TAO_FlowSpec_Entry *entry, return result; entry->handler (handler); transport = handler->transport (); + ACE_INET_Addr *local_addr = 0; + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*mcast_addr), + -1); + entry->set_local_addr (local_addr); return 0; } |