summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-14 04:33:33 +0000
committernaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-14 04:33:33 +0000
commit0e79f27d378445f3e9cc2c9b3de448fd71366705 (patch)
treefdcba786fc4207e95d976c564fa3547e873c44b7
parenta47b948b64946edd023f7a8ad6d2679c5424c9f7 (diff)
downloadATCD-0e79f27d378445f3e9cc2c9b3de448fd71366705.tar.gz
Added code so that the multipoint binding work in the full profile. Now
the users can use full profile object like FDevs inside MMDevice to create flow producers and flow consumers. Right now the case of one multipoint source and multiple multipoint sinks has been tested with the Full profile.
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp749
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h17
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/MCast.cpp1
3 files changed, 535 insertions, 232 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
index a7e3310349e..7b19070f8bc 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
@@ -487,6 +487,8 @@ TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec,
{
TAO_Basic_StreamCtrl::stop (flow_spec,ACE_TRY_ENV);
ACE_TRY_CHECK;
+ if (this->flow_connection_map_.current_size () > 0)
+ return;
MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
MMDevice_Map::ENTRY *entry = 0;
for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
@@ -522,6 +524,8 @@ TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec,
{
TAO_Basic_StreamCtrl::start (flow_spec,ACE_TRY_ENV);
ACE_TRY_CHECK;
+ if (this->flow_connection_map_.current_size () > 0)
+ return;
MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
MMDevice_Map::ENTRY *entry = 0;
for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
@@ -558,6 +562,8 @@ TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec,
{
TAO_Basic_StreamCtrl::destroy (flow_spec,ACE_TRY_ENV);
ACE_TRY_CHECK;
+ if (this->flow_connection_map_.current_size () > 0)
+ return;
MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
MMDevice_Map::ENTRY *entry = 0;
for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
@@ -637,6 +643,13 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG,"(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
+ // Define ourselves as the related_streamctrl property of the sep.
+ CORBA::Any streamctrl_any;
+ streamctrl_any <<= this->streamctrl_.in ();
+ this->sep_a_->define_property ("Related_StreamCtrl",
+ streamctrl_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
// add the mmdevice,sep and vdev to the map.
MMDevice_Map_Entry map_entry;
MMDevice_Map_Hash_Key key (a_party);
@@ -683,6 +696,14 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in (),
ACE_TRY_ENV)));
ACE_TRY_CHECK;
+ // Define ourselves as the related_streamctrl property of the sep.
+ CORBA::Any streamctrl_any;
+ streamctrl_any <<= this->streamctrl_.in ();
+ this->sep_b_->define_property ("Related_StreamCtrl",
+ streamctrl_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
// add the mmdevice,sep and vdev to the map.
MMDevice_Map_Entry map_entry;
MMDevice_Map_Hash_Key key (b_party);
@@ -697,7 +718,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
}
}
- if (CORBA::is_nil (b_party))
+// In the full profile case there's no VDev.
+ if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
{
if (!this->mcastconfigif_)
{
@@ -719,14 +741,17 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
if (CORBA::is_nil (a_party))
{
- // Multicast sink being added.
- if (!this->mcastconfigif_)
- ACE_ERROR_RETURN ((LM_ERROR,"first add a source and then a sink\n"),0);
- this->mcastconfigif_->set_peer (this->vdev_b_.in (),
- the_qos,
- the_flows,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ if (!CORBA::is_nil (this->vdev_b_.in ()))
+ {
+ // Multicast sink being added.
+ if (!this->mcastconfigif_)
+ ACE_ERROR_RETURN ((LM_ERROR,"first add a source and then a sink\n"),0);
+ this->mcastconfigif_->set_peer (this->vdev_b_.in (),
+ the_qos,
+ the_flows,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
int connect_leaf_success = 0;
ACE_TRY_EX (connect_leaf)
@@ -984,7 +1009,9 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
flowname_any.in () >>= flowname;
ACE_TRY_EX (flow_connection)
{
- flow_connection_obj = this->get_flow_connection (flowname,ACE_TRY_ENV);
+ flow_connection_obj =
+ this->get_flow_connection (flowname,
+ ACE_TRY_ENV);
ACE_TRY_CHECK_EX (flow_connection);
flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in (),
ACE_TRY_ENV);
@@ -1972,111 +1999,159 @@ TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
CORBA::Environment &ACE_TRY_ENV)
{
ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint::multiconnect\n"));
- int result = 0;
- TAO_AV_QoS qos (stream_qos);
- for (u_int i=0;i< flow_spec.length ();i++)
+ ACE_TRY
{
- TAO_Forward_FlowSpec_Entry *forward_entry;
- ACE_NEW_RETURN (forward_entry,
- TAO_Forward_FlowSpec_Entry,
- 0);
- forward_entry->parse (flow_spec[i]);
- TAO_String_Hash_Key mcast_key (forward_entry->flowname ());
- AVStreams::FlowEndPoint_ptr flow_endpoint = AVStreams::FlowEndPoint::_nil ();
- if (this->fep_map_.find (mcast_key,flow_endpoint) == 0)
+ int result = 0;
+ TAO_AV_QoS qos (stream_qos);
+ for (u_int i=0;i< flow_spec.length ();i++)
{
- ACE_TRY_EX (narrow)
+ TAO_Forward_FlowSpec_Entry *forward_entry;
+ ACE_NEW_RETURN (forward_entry,
+ TAO_Forward_FlowSpec_Entry,
+ 0);
+ forward_entry->parse (flow_spec[i]);
+ TAO_String_Hash_Key mcast_key (forward_entry->flowname ());
+ AVStreams::FlowEndPoint_ptr flow_endpoint = AVStreams::FlowEndPoint::_nil ();
+ if (this->fep_map_.find (mcast_key,flow_endpoint) == 0)
{
- AVStreams::QoS flow_qos;
- result = qos.get_flow_qos (forward_entry->flowname (),flow_qos);
- if (result < 0)
- ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry->flowname ()));
- // Narrow it to FlowProducer.
- AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV);
- ACE_TRY_CHECK_EX (narrow);
- // Else narrow succeeeded.
- if (!CORBA::is_nil (producer.in ()))
+ ACE_TRY_EX (narrow)
+ {
+ AVStreams::QoS flow_qos;
+ result = qos.get_flow_qos (forward_entry->flowname (),flow_qos);
+ if (result < 0)
+ ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry->flowname ()));
+ // Narrow it to FlowProducer.
+ AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (narrow);
+ // Else narrow succeeeded.
+ if (!CORBA::is_nil (producer.in ()))
+ {
+ AVStreams::FlowConnection_var flow_connection;
+ ACE_TRY_EX (flow_connection)
+ {
+ if (CORBA::is_nil (this->streamctrl_.in ()))
+ {
+ CORBA::Any_var streamctrl_any;
+ streamctrl_any = this->get_property_value ("Related_StreamCtrl",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ AVStreams::StreamCtrl_ptr streamctrl;
+ streamctrl_any.in () >>= streamctrl;
+ this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
+ }
+
+ CORBA::Object_var flow_connection_obj =
+ this->streamctrl_->get_flow_connection (forward_entry->flowname (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ }
+ ACE_CATCHANY
+ {
+ TAO_FlowConnection *flowConnection;
+ ACE_NEW_RETURN (flowConnection,
+ TAO_FlowConnection,
+ 0);
+ flowConnection->set_mcast_addr (this->mcast_addr_,this->mcast_port_++);
+ flow_connection = flowConnection->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->streamctrl_->set_flow_connection (forward_entry->flowname (),
+ flow_connection,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+ result = flow_connection->add_producer (producer.in (),
+ flow_qos,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"),0);
+ }
+ }
+ ACE_CATCHANY
{
- CORBA::Boolean is_met (0);
- CORBA::String_var address =
- producer->connect_mcast (flow_qos,
- is_met,
- "",
- "");
+ // Narrow failed and since its not a flowproducer its an error.
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"FlowProducer::_narrow");
+ ACE_ERROR_RETURN ((LM_ERROR,"sep_a doesn't contain a flowproducer"),0);
}
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
}
- ACE_CATCHANY
+ ACE_INET_Addr *mcast_addr;
+ TAO_FlowSpec_Entry *entry = 0;
+ result = this->mcast_entry_map_.find (mcast_key,entry);
+ if (result == 0)
{
- // Narrow failed and since its not a flowproducer its an error.
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"FlowProducer::_narrow");
- ACE_ERROR_RETURN ((LM_ERROR,"sep_a doesn't contain a flowproducer"),0);
+ mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,entry->get_local_addr ());
+ char str_addr [BUFSIZ];
+ result = mcast_addr->addr_to_string (str_addr,BUFSIZ);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint::multiconnect ::addr_to_string failed\n"),0);
+ ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint_A::multiconnect:%s\n",str_addr));
+ TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
+ entry->direction_str (),
+ entry->format (),
+ entry->flow_protocol_str (),
+ entry->carrier_protocol_str (),
+ entry->get_local_addr ());
+ flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
}
- ACE_ENDTRY;
- ACE_CHECK_RETURN (0);
- }
- ACE_INET_Addr *mcast_addr;
- TAO_FlowSpec_Entry *entry = 0;
- result = this->mcast_entry_map_.find (mcast_key,entry);
- if (result == 0)
- {
- mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,entry->get_local_addr ());
- char str_addr [BUFSIZ];
- result = mcast_addr->addr_to_string (str_addr,BUFSIZ);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint::multiconnect ::addr_to_string failed\n"),0);
- ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint_A::multiconnect:%s\n",str_addr));
- TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
- entry->direction_str (),
- entry->format (),
- entry->flow_protocol_str (),
- entry->carrier_protocol_str (),
- entry->get_local_addr ());
- flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
- }
- else
- {
- switch (forward_entry->direction ())
+ else
{
- case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
- {
- ACE_NEW_RETURN (mcast_addr,
- ACE_INET_Addr,
- 0);
- mcast_addr->set (this->mcast_port_++,this->mcast_addr_);
- char buf[BUFSIZ];
- mcast_addr->addr_to_string (buf,BUFSIZ);
- ACE_DEBUG ((LM_DEBUG,"%s\n",buf));
- TAO_Forward_FlowSpec_Entry *new_entry;
- ACE_NEW_RETURN (new_entry,
- TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
- forward_entry->direction_str (),
- forward_entry->format (),
- forward_entry->flow_protocol_str (),
- forward_entry->carrier_protocol_str (),
- mcast_addr),
- 0);
- flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
-
- this->forward_flow_spec_set.insert (new_entry);
- TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
- result = acceptor_registry->open (this,
- TAO_AV_CORE::instance (),
- this->forward_flow_spec_set);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"Acceptor_Registry::open failed\n"),0);
- result = this->mcast_entry_map_.bind (mcast_key,new_entry);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"mcast_entry::bind failed"),0);
- }
- break;
- case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
- // OUT implies we're the sink.
- break;
- default:
- break;
+ switch (forward_entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
+ {
+ ACE_NEW_RETURN (mcast_addr,
+ ACE_INET_Addr,
+ 0);
+ mcast_addr->set (this->mcast_port_++,this->mcast_addr_);
+ char buf[BUFSIZ];
+ mcast_addr->addr_to_string (buf,BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,"%s\n",buf));
+ TAO_Forward_FlowSpec_Entry *new_entry;
+ ACE_NEW_RETURN (new_entry,
+ TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
+ forward_entry->direction_str (),
+ forward_entry->format (),
+ forward_entry->flow_protocol_str (),
+ forward_entry->carrier_protocol_str (),
+ mcast_addr),
+ 0);
+ flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
+
+ this->forward_flow_spec_set.insert (new_entry);
+ TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
+ result = acceptor_registry->open (this,
+ TAO_AV_CORE::instance (),
+ this->forward_flow_spec_set);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"Acceptor_Registry::open failed\n"),0);
+ result = this->mcast_entry_map_.bind (mcast_key,new_entry);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"mcast_entry::bind failed"),0);
+ }
+ break;
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ // OUT implies we're the sink.
+ break;
+ default:
+ break;
+ }
}
}
}
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_A::multiconnect");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
return 1;
}
@@ -2115,62 +2190,131 @@ TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
}
CORBA::Boolean
-TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &the_qos,
+TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
AVStreams::flowSpec &flow_spec,
CORBA::Environment &ACE_TRY_ENV)
{
ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint_B::multiconnect\n"));
- int result = 0;
- for (u_int i=0;i< flow_spec.length ();i++)
+ ACE_TRY
{
- TAO_Forward_FlowSpec_Entry *forward_entry;
- ACE_NEW_RETURN (forward_entry,
- TAO_Forward_FlowSpec_Entry,
- 0);
- forward_entry->parse (flow_spec[i]);
- TAO_String_Hash_Key mcast_key (forward_entry->flowname ());
- TAO_FlowSpec_Entry *mcast_entry = 0;
- ACE_INET_Addr *mcast_addr;
- mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,forward_entry->address ());
- if (mcast_addr == 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"),0);
- result = this->mcast_entry_map_.find (mcast_key,mcast_entry);
- if (result == 0)
- {
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::handler already found\n"),0);
- }
- else
+ int result = 0;
+ TAO_AV_QoS qos (stream_qos);
+ for (u_int i=0;i< flow_spec.length ();i++)
{
- switch (forward_entry->direction ())
+ TAO_Forward_FlowSpec_Entry *forward_entry;
+ ACE_NEW_RETURN (forward_entry,
+ TAO_Forward_FlowSpec_Entry,
+ 0);
+ forward_entry->parse (flow_spec[i]);
+ TAO_String_Hash_Key mcast_key (forward_entry->flowname ());
+ AVStreams::FlowEndPoint_var flow_endpoint;
+ if (this->fep_map_.find (mcast_key,flow_endpoint.out ()) == 0)
{
- case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
- {
- // IN means we're the sink.
- // @@ We have to take care of this.
-// result = this->make_dgram_mcast_flow_handler (mcast_dgram);
-// if (result < 0)
-// return 0;
-
- this->forward_flow_spec_set.insert (forward_entry);
- TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
- result = connector_registry->open (this,
- TAO_AV_CORE::instance (),
- this->forward_flow_spec_set);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"connector_registry::open failed\n"),0);
- result = this->mcast_entry_map_.bind (mcast_key,forward_entry);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"dgram_mcast_handler::bind failed"),0);
- }
- break;
- case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
- // OUT implies we're the source.,which is an error.
- break;
- default:
- break;
+ AVStreams::FlowConsumer_var consumer;
+ ACE_TRY_EX (narrow)
+ {
+ consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (narrow);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"FlowConsumer::_narrow");
+ ACE_ERROR_RETURN ((LM_ERROR,"sep_b doesn't contain a flowconsumer"),0);
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+ AVStreams::QoS flow_qos;
+ result = qos.get_flow_qos (forward_entry->flowname (),flow_qos);
+ if (result < 0)
+ ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry->flowname ()));
+ AVStreams::FlowConnection_var flow_connection;
+ ACE_TRY_EX (flow_connection)
+ {
+ if (CORBA::is_nil (this->streamctrl_.in ()))
+ {
+ CORBA::Any_var streamctrl_any;
+ streamctrl_any = this->get_property_value ("Related_StreamCtrl",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ AVStreams::StreamCtrl_ptr streamctrl;
+ streamctrl_any.in () >>= streamctrl;
+ this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
+ }
+ CORBA::Object_var flow_connection_obj =
+ this->streamctrl_->get_flow_connection (forward_entry->flowname (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+ result = flow_connection->add_consumer (consumer.in (),
+ flow_qos,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"),0);
+ }
+ else
+ {
+ TAO_FlowSpec_Entry *mcast_entry = 0;
+ ACE_INET_Addr *mcast_addr;
+ mcast_addr = ACE_dynamic_cast (ACE_INET_Addr *,forward_entry->address ());
+ if (mcast_addr == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"),0);
+ result = this->mcast_entry_map_.find (mcast_key,mcast_entry);
+ if (result == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_StreamEndPoint_B::multiconnect::handler already found\n"),0);
+ }
+ else
+ {
+ switch (forward_entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
+ {
+ // IN means we're the sink.
+ // @@ We have to take care of this.
+ // result = this->make_dgram_mcast_flow_handler (mcast_dgram);
+ // if (result < 0)
+ // return 0;
+
+ this->forward_flow_spec_set.insert (forward_entry);
+ TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
+ result = connector_registry->open (this,
+ TAO_AV_CORE::instance (),
+ this->forward_flow_spec_set);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"connector_registry::open failed\n"),0);
+ result = this->mcast_entry_map_.bind (mcast_key,forward_entry);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"dgram_mcast_handler::bind failed"),0);
+ }
+ break;
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ // OUT implies we're the source.,which is an error.
+ break;
+ default:
+ break;
+ }
+ }
}
}
}
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_B::multiconnect");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
return 1;
}
@@ -2862,6 +3006,14 @@ TAO_FlowConnection::TAO_FlowConnection (void)
{
}
+int
+TAO_FlowConnection::set_mcast_addr (ACE_UINT32 mcast_addr, u_short mcast_port)
+{
+ this->mcast_addr_ = mcast_addr;
+ this->mcast_port_ = mcast_port;
+ return 0;
+}
+
// stop this flow.
void
TAO_FlowConnection::stop (CORBA::Environment &ACE_TRY_ENV)
@@ -2869,10 +3021,24 @@ TAO_FlowConnection::stop (CORBA::Environment &ACE_TRY_ENV)
{
ACE_TRY
{
- this->flow_producer_->stop (ACE_TRY_ENV);
- ACE_TRY_CHECK;
- this->flow_consumer_->stop (ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
+ ();
+ for (FlowProducer_SetItor producer_end =
+ this->flow_producer_set_.end ();
+ producer_begin != producer_end; ++producer_begin)
+ {
+ (*producer_begin)->stop (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
+ ();
+ for (FlowConsumer_SetItor consumer_end =
+ this->flow_consumer_set_.end ();
+ consumer_begin != consumer_end; ++consumer_begin)
+ {
+ (*consumer_begin)->stop (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
}
ACE_CATCHANY
{
@@ -2890,10 +3056,24 @@ TAO_FlowConnection::start (CORBA::Environment &ACE_TRY_ENV)
{
ACE_TRY
{
- this->flow_consumer_->start (ACE_TRY_ENV);
- ACE_TRY_CHECK;
- this->flow_producer_->start (ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
+ ();
+ for (FlowConsumer_SetItor consumer_end =
+ this->flow_consumer_set_.end ();
+ consumer_begin != consumer_end; ++consumer_begin)
+ {
+ (*consumer_begin)->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
+ ();
+ for (FlowProducer_SetItor producer_end =
+ this->flow_producer_set_.end ();
+ producer_begin != producer_end; ++producer_begin)
+ {
+ (*producer_begin)->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
}
ACE_CATCHANY
{
@@ -2908,9 +3088,33 @@ TAO_FlowConnection::start (CORBA::Environment &ACE_TRY_ENV)
void
TAO_FlowConnection::destroy (CORBA::Environment &ACE_TRY_ENV)
{
- this->flow_producer_->destroy (ACE_TRY_ENV);
- ACE_CHECK;
- this->flow_consumer_->destroy (ACE_TRY_ENV);
+ ACE_TRY
+ {
+ FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
+ ();
+ for (FlowProducer_SetItor producer_end =
+ this->flow_producer_set_.end ();
+ producer_begin != producer_end; ++producer_begin)
+ {
+ (*producer_begin)->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
+ ();
+ for (FlowConsumer_SetItor consumer_end =
+ this->flow_consumer_set_.end ();
+ consumer_begin != consumer_end; ++consumer_begin)
+ {
+ (*consumer_begin)->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::destroy");
+ return;
+ }
+ ACE_ENDTRY;
ACE_CHECK;
int result = deactivate_servant (this);
if (result < 0)
@@ -2940,10 +3144,26 @@ TAO_FlowConnection::use_flow_protocol (const char * fp_name,
{
this->fp_name_ = fp_name;
this->fp_settings_ = fp_settings;
- this->flow_producer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV);
- ACE_CHECK_RETURN (0);
- this->flow_consumer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV);
- ACE_CHECK_RETURN (0);
+ FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
+ ();
+ for (FlowProducer_SetItor producer_end =
+ this->flow_producer_set_.end ();
+ producer_begin != producer_end; ++producer_begin)
+ {
+ (*producer_begin)->use_flow_protocol
+ (fp_name,fp_settings,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ }
+ FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
+ ();
+ for (FlowConsumer_SetItor consumer_end =
+ this->flow_consumer_set_.end ();
+ consumer_begin != consumer_end; ++consumer_begin)
+ {
+ (*consumer_begin)->use_flow_protocol
+ (fp_name,fp_settings,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ }
return 1;
}
@@ -3005,8 +3225,8 @@ TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
// connect the producer and the consumer
CORBA::Boolean
-TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr flow_producer,
- AVStreams::FlowConsumer_ptr flow_consumer,
+TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
+ AVStreams::FlowConsumer_ptr consumer,
AVStreams::QoS & the_qos,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -3016,35 +3236,39 @@ TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr flow_producer,
{
ACE_TRY
{
- this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer);
- this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer);
+ AVStreams::FlowProducer_ptr flow_producer =
+ AVStreams::FlowProducer::_duplicate (producer);
+ AVStreams::FlowConsumer_ptr flow_consumer =
+ AVStreams::FlowConsumer::_duplicate (consumer);
+ this->flow_producer_set_.insert (flow_producer);
+ this->flow_consumer_set_.insert (flow_consumer);
AVStreams::FlowConnection_var flowconnection =
this->_this (ACE_TRY_ENV);
- this->flow_producer_->set_peer (flowconnection.in (),
- this->flow_consumer_.in (),
- the_qos,
- ACE_TRY_ENV);
+ flow_producer->set_peer (flowconnection.in (),
+ flow_consumer,
+ the_qos,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- this->flow_consumer_->set_peer (flowconnection.in (),
- this->flow_producer_.in (),
- the_qos,
- ACE_TRY_ENV);
+ flow_consumer->set_peer (flowconnection.in (),
+ flow_producer,
+ the_qos,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
char *consumer_address =
- this->flow_consumer_->go_to_listen (the_qos,
- 0,// false for is_mcast
- this->flow_producer_.in (),
- this->fp_name_.inout (),
- ACE_TRY_ENV);
+ flow_consumer->go_to_listen (the_qos,
+ 0,// false for is_mcast
+ flow_producer,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- this->flow_producer_->connect_to_peer (the_qos,
- consumer_address,
- this->fp_name_.inout (),
- ACE_TRY_ENV);
+ flow_producer->connect_to_peer (the_qos,
+ consumer_address,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
@@ -3066,7 +3290,7 @@ TAO_FlowConnection::disconnect (CORBA::Environment &ACE_TRY_ENV)
}
CORBA::Boolean
-TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr flow_producer,
+TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
AVStreams::QoS & the_qos,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -3075,25 +3299,49 @@ TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr flow_producer,
{
ACE_TRY
{
- this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer);
- CORBA::Boolean met_qos;
- char *mcast_address = "";
- char *address = this->flow_producer_->connect_mcast (the_qos,
- met_qos,
- mcast_address,
- this->fp_name_.in (),
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- TAO_Forward_FlowSpec_Entry entry ("","","","",address);
- if (entry.address () != 0)
+ if (this->producer_address_.in () == 0)
{
- // Internet multicasting is in use.
- this->producer_address_ = address;
- }
- else
- {
- // ATM Multicasting is in use.
- this->ip_multicast_ = 0;
+ AVStreams::FlowProducer_ptr flow_producer =
+ AVStreams::FlowProducer::_duplicate (producer);
+ this->flow_producer_set_.insert (flow_producer);
+ CORBA::Boolean met_qos;
+ ACE_INET_Addr mcast_addr (this->mcast_port_,
+ this->mcast_addr_);
+ char mcast_address[BUFSIZ];
+ mcast_addr.addr_to_string (mcast_address,BUFSIZ);
+ char *address = flow_producer->connect_mcast (the_qos,
+ met_qos,
+ mcast_address,
+ this->fp_name_.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ TAO_Forward_FlowSpec_Entry entry ("","","","",address);
+ if (entry.address () != 0)
+ {
+ // Internet multicasting is in use.
+ this->producer_address_ = address;
+ }
+ else
+ {
+ // ATM Multicasting is in use.
+ this->ip_multicast_ = 0;
+ }
+ // set the multicast peer.
+ if (CORBA::is_nil (this->mcastconfigif_.in ()))
+ {
+ ACE_NEW_RETURN (this->mcastconfigif_i_,
+ TAO_MCastConfigIf,
+ 0);
+ this->mcastconfigif_ = this->mcastconfigif_i_->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ AVStreams::FlowConnection_var flowconnection = this->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ flow_producer->set_Mcast_peer (flowconnection.in (),
+ this->mcastconfigif_,
+ the_qos,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
}
}
ACE_CATCHANY
@@ -3107,7 +3355,7 @@ TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr flow_producer,
}
CORBA::Boolean
-TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr flow_consumer,
+TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
AVStreams::QoS & the_qos,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -3115,40 +3363,63 @@ TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr flow_consumer,
{
ACE_TRY
{
- this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer);
+ AVStreams::FlowConsumer_ptr flow_consumer =
+ AVStreams::FlowConsumer::_duplicate (consumer);
+ this->flow_consumer_set_.insert (flow_consumer);
+ FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
+ // @@Lets take that the first entry as the only producer. We're
+ // not sure if we can have multiple flow producers in a
+ // flowconnection.
+ AVStreams::FlowProducer_ptr flow_producer = (*begin);
+
AVStreams::protocolSpec protocols (1);
protocols.length (1);
protocols [0] = CORBA::string_dup (this->producer_address_);
if (!this->ip_multicast_)
{
- this->flow_consumer_->set_protocol_restriction (protocols,
+ flow_consumer->set_protocol_restriction (protocols,
ACE_TRY_ENV);
ACE_TRY_CHECK;
char * address =
- this->flow_consumer_->go_to_listen (the_qos,
- 1,
- this->flow_producer_.in (),
- this->fp_name_.inout (),
- ACE_TRY_ENV);
+ flow_consumer->go_to_listen (the_qos,
+ 1,
+ flow_producer,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
CORBA::Boolean is_met;
- this->flow_producer_->connect_mcast (the_qos,
- is_met,
- address,
- this->fp_name_.inout (),
- ACE_TRY_ENV);
+ flow_producer->connect_mcast (the_qos,
+ is_met,
+ address,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
else
{
+ // The spec says go_to_listen is called with the multicast
+ // address returned from the connect_mcast call called
+ // during add_producer. But go_to_listen doesn't have a
+ // address parameter. I guess it should be connect_to_peer.
// IP Multicasting.
- this->flow_consumer_->go_to_listen (the_qos,
- 1,
- this->flow_producer_.in (),
- this->fp_name_.inout (),
- ACE_TRY_ENV);
+ flow_consumer->connect_to_peer (the_qos,
+ this->producer_address_.in (),
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
+ if (CORBA::is_nil (this->mcastconfigif_.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"),0);
+ // @@ Is this the right place to do set_peer?
+ AVStreams::flowSpec flow_spec;
+ AVStreams::streamQoS stream_qos (1);
+ stream_qos.length (1);
+ stream_qos [0] = the_qos;
+ this->mcastconfigif_->set_peer (flow_consumer,
+ stream_qos,
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
}
ACE_CATCHANY
{
@@ -3552,7 +3823,7 @@ TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr /* the_fc */,
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::QoSRequestFailed))
{
- this->mcast_peer_ = mcast_peer;
+ this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
return 0;
}
@@ -3695,16 +3966,40 @@ TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
char *
TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */,
CORBA::Boolean_out /* is_met */,
- const char * /* address */,
- const char * /* use_flow_protocol */,
- CORBA::Environment &/* ACE_TRY_ENV */)
+ const char *address,
+ const char * use_flow_protocol,
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::failedToConnect,
AVStreams::notSupported,
AVStreams::FPError,
AVStreams::QoSRequestFailed))
{
- return 0;
+ // Depending on our available protocols we can create one
+ // of the multicast protocol objects. Right now we'll create
+ // only UDP multicast addresses.
+ // The address variable gives the multicast address to subscribe to.
+ for (u_int i=0;i<this->protocols_.length ();i++)
+ {
+ // choose the protocol which supports multicast.
+ }
+ char full_address [BUFSIZ];
+ ACE_OS::sprintf (full_address,"%s=%s","UDP",address);
+ TAO_Forward_FlowSpec_Entry entry (this->flowname_.in (),
+ "",
+ this->format_.in (),
+ use_flow_protocol,
+ full_address);
+ TAO_AV_FlowSpecSet mcast_set;
+ mcast_set.insert (&entry);
+ TAO_AV_Acceptor_Registry *acceptor_registry =
+ TAO_AV_CORE::instance ()->acceptor_registry ();
+ int result = acceptor_registry->open (this,
+ TAO_AV_CORE::instance (),
+ mcast_set);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"),0);
+ return CORBA::string_dup (full_address);
}
// gets the reverse channel for feedback.
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
index df497142d30..abcbf9e302b 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
@@ -754,6 +754,7 @@ protected:
TAO_AV_FlowSpecSet reverse_flow_spec_set;
AVStreams::StreamEndPoint_var peer_sep_;
AVStreams::SFPStatus *sfp_status_;
+ AVStreams::StreamCtrl_var streamctrl_;
};
@@ -1141,11 +1142,15 @@ public:
AVStreams::notConnected));
// drops a flow endpoint from the flow.
+ int set_mcast_addr (ACE_UINT32 addr,u_short port);
protected:
- AVStreams::FlowProducer_var flow_producer_;
- // The producer of this flow.
- AVStreams::FlowConsumer_var flow_consumer_;
- // The consumer of this flow
+ typedef ACE_Unbounded_Set<AVStreams::FlowProducer_ptr> FlowProducer_Set;
+ typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowProducer_ptr> FlowProducer_SetItor;
+ typedef ACE_Unbounded_Set<AVStreams::FlowConsumer_ptr> FlowConsumer_Set;
+ typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowConsumer_ptr> FlowConsumer_SetItor;
+
+ FlowProducer_Set flow_producer_set_;
+ FlowConsumer_Set flow_consumer_set_;
CORBA::String_var fp_name_;
CORBA::Any fp_settings_;
CORBA::String_var producer_address_;
@@ -1153,6 +1158,10 @@ protected:
int ip_multicast_;
// IP Multicasting is used.
+ TAO_MCastConfigIf *mcastconfigif_i_;
+ AVStreams::MCastConfigIf_var mcastconfigif_;
+ u_short mcast_port_;
+ ACE_UINT32 mcast_addr_;
};
class TAO_ORBSVCS_Export TAO_FlowEndPoint :
diff --git a/TAO/orbsvcs/orbsvcs/AV/MCast.cpp b/TAO/orbsvcs/orbsvcs/AV/MCast.cpp
index e83bbf0b08d..aee6c4e8147 100644
--- a/TAO/orbsvcs/orbsvcs/AV/MCast.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/MCast.cpp
@@ -187,7 +187,6 @@ TAO_AV_UDP_MCast_Flow_Handler::TAO_AV_UDP_MCast_Flow_Handler (TAO_AV_Callback *c
{
ACE_NEW (transport_,
TAO_AV_UDP_MCast_Transport (this));
-
}
ACE_HANDLE