summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-12 01:06:00 +0000
committernaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-12 01:06:00 +0000
commit0fa7172c89be4041d90519a662c7586f8a719b23 (patch)
treec2665e4ff5ca4d9a0258e7cd56d5a8fd9b853702
parentc9197610145cf40e3ef86c85f4d9b9d722bf9fa4 (diff)
downloadATCD-0fa7172c89be4041d90519a662c7586f8a719b23.tar.gz
Added code for the FlowEndPoint to make use of the Pluggable Data Protocols.
Also fixed a few bugs in the full profile TAO_StreamCtrl::bind and other full profile classes like TAO_FlowConnection. Now the AV application can make use of FDev in MMDevices for each flow and automagically the appropriate protocols will be chosen depending on the common protocol between 2 flow endpoints.
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp1254
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h231
2 files changed, 1030 insertions, 455 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
index a5203aa47c4..45b10888710 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
@@ -64,6 +64,8 @@ get_flowname (const char *flow_spec_entry_str)
ACE_CString flow_name;
if (slash_pos != flow_spec_entry.npos)
flow_name = flow_spec_entry.substring (0,slash_pos);
+ else
+ flow_name = flow_spec_entry_str;
return CORBA::string_dup (flow_name.c_str ());
}
@@ -81,15 +83,7 @@ AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
// Constructor
TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
- :vdev_a_ (AVStreams::VDev::_nil ()),
- vdev_b_ (AVStreams::VDev::_nil ()),
- sep_a_ (AVStreams::StreamEndPoint_A::_nil ()),
- sep_b_ (AVStreams::StreamEndPoint_B::_nil ()),
- flow_count_ (0)
-{
-}
-
-TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
+ :flow_count_ (0)
{
}
@@ -112,7 +106,7 @@ TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec,
flow_connection_entry->int_id_->stop (ACE_TRY_ENV);
}
- if (CORBA::is_nil (this->sep_a_))
+ if (CORBA::is_nil (this->sep_a_.in ()))
return;
// Make the upcall into the application
@@ -138,7 +132,7 @@ TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec,
flow_connection_entry->int_id_->start (ACE_TRY_ENV);
}
- if (CORBA::is_nil (this->sep_a_))
+ if (CORBA::is_nil (this->sep_a_.in ()))
return;
// Make the upcall into the application
@@ -165,7 +159,7 @@ TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec,
flow_connection_entry->int_id_->destroy (ACE_TRY_ENV);
}
- if (CORBA::is_nil (this->sep_a_))
+ if (CORBA::is_nil (this->sep_a_.in ()))
return;
// Make the upcall into the application
@@ -220,7 +214,7 @@ TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
AVStreams::FPError))
{
- if (!CORBA::is_nil (this->sep_a_))
+ if (!CORBA::is_nil (this->sep_a_.in ()))
{
this->sep_a_->set_FPStatus (flow_spec,fp_name,fp_settings,ACE_TRY_ENV);
ACE_CHECK;
@@ -238,7 +232,7 @@ TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name,
TAO_String_Hash_Key flow_name_key (flow_name);
FlowConnection_Map::ENTRY *flow_connection_entry = 0;
if (this->flow_connection_map_.find (flow_name_key,flow_connection_entry) == 0)
- return flow_connection_entry->int_id_;
+ return AVStreams::FlowConnection::_duplicate (flow_connection_entry->int_id_);
else
ACE_THROW_RETURN (AVStreams::noSuchFlow (),CORBA::Object::_nil ());
}
@@ -266,12 +260,20 @@ TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
ACE_ENDTRY;
ACE_CHECK;
// add the flowname and the flowconnection to the hashtable.
+ this->flows_.length (this->flow_count_ + 1);
this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
TAO_String_Hash_Key flow_name_key (flow_name);
if (this->flow_connection_map_.bind (flow_name_key,flow_connection) != 0)
ACE_THROW (AVStreams::noSuchFlow ());// is this right?
}
+TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
+{
+ FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
+ FlowConnection_Map_Entry *entry = 0;
+ for (;iterator.next (entry) != 0;iterator.advance ())
+ CORBA::release (entry->int_id_);
+}
// ----------------------------------------------------------------------
// TAO_Negotiator
@@ -389,13 +391,25 @@ MMDevice_Map_Hash_Key::hash (void) const
// ----------------------------------------------------------------------
TAO_StreamCtrl::TAO_StreamCtrl (void)
- :mcastconfigif_ (0),
- mcastconfigif_ptr_ (0)
+ :mcastconfigif_ (0)
{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ this->streamctrl_ = this->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamCtrl::TAO_StreamCtrl");
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
}
TAO_StreamCtrl::~TAO_StreamCtrl (void)
{
+ delete this->mcastconfigif_;
}
// request the two MMDevices to create vdev and stream endpoints. save
@@ -444,8 +458,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
else
{
this->sep_a_ =
- a_party-> create_A (this->_this (ACE_TRY_ENV),
- this->vdev_a_,
+ a_party-> create_A (this->streamctrl_.in (),
+ this->vdev_a_.out (),
the_qos,
met_qos,
named_vdev.inout (),
@@ -457,8 +471,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
// add the mmdevice,sep and vdev to the map.
MMDevice_Map_Entry map_entry;
MMDevice_Map_Hash_Key key (a_party);
- map_entry.sep_ = this->sep_a_;
- map_entry.vdev_ = this->vdev_a_;
+ map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
+ map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
map_entry.flowspec_ = the_flows;
map_entry.qos_ = the_qos;
result =
@@ -484,8 +498,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
{
this->sep_b_ =
- b_party-> create_B (this->_this (ACE_TRY_ENV),
- this->vdev_b_,
+ b_party-> create_B (this->streamctrl_.in (),
+ this->vdev_b_.out (),
the_qos,
met_qos,
named_vdev.inout (),
@@ -497,14 +511,14 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
ACE_DEBUG ((LM_DEBUG,
"\n(%P|%t)stream_endpoint_b_ = %s",
- TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_,
+ TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in (),
ACE_TRY_ENV)));
ACE_TRY_CHECK;
// add the mmdevice,sep and vdev to the map.
MMDevice_Map_Entry map_entry;
- MMDevice_Map_Hash_Key key (a_party);
- map_entry.sep_ = this->sep_a_;
- map_entry.vdev_ = this->vdev_a_;
+ MMDevice_Map_Hash_Key key (b_party);
+ map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
+ map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
map_entry.flowspec_ = the_flows;
map_entry.qos_ = the_qos;
int result =
@@ -525,8 +539,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
ACE_TRY_CHECK;
}
// Multicast source being added.
- CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->_this (ACE_TRY_ENV),
- this->mcastconfigif_ptr_,
+ CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
+ this->mcastconfigif_ptr_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -539,7 +553,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
// Multicast sink being added.
if (this->mcastconfigif_ != 0)
ACE_ERROR_RETURN ((LM_ERROR,"first add a source and then a sink\n"),0);
- this->mcastconfigif_->set_peer (this->vdev_b_,
+ this->mcastconfigif_->set_peer (this->vdev_b_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -548,7 +562,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
int connect_leaf_success = 0;
ACE_TRY_EX (connect_leaf)
{
- connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_,
+ connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -578,18 +592,18 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
{
- if (!CORBA::is_nil (this->vdev_a_) && !CORBA::is_nil (this->vdev_b_))
+ if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
{
// Tell the 2 VDev's about one another
- this->vdev_a_->set_peer (this->_this (ACE_TRY_ENV),
- this->vdev_b_,
+ this->vdev_a_->set_peer (this->streamctrl_.in (),
+ this->vdev_b_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
ACE_TRY_CHECK;
- this->vdev_b_->set_peer (this->_this (ACE_TRY_ENV),
- this->vdev_a_,
+ this->vdev_b_->set_peer (this->streamctrl_.in (),
+ this->vdev_a_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -599,7 +613,7 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
// Now connect the streams together. This will
// establish the connection
CORBA::Boolean result =
- this->sep_a_->connect (this->sep_b_,
+ this->sep_a_->connect (this->sep_b_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -611,8 +625,8 @@ TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
{
// Its full profile
// we have feps in the sep then dont call connect instead call bind on the streamctrl.
- this->bind (this->sep_a_,
- this->sep_b_,
+ this->bind (this->sep_a_.in (),
+ this->sep_b_.in (),
the_qos,
the_flows,
ACE_TRY_ENV);
@@ -656,17 +670,32 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
"a_party or b_party null!"),
0);
+ // Define each other as their peers.
+ CORBA::Any sep_any;
+ sep_any <<= sep_b;
+ sep_a->define_property ("PeerAdapter",
+ sep_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ sep_any <<= sep_a;
+ sep_b->define_property ("PeerAdapter",
+ sep_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
// since its full profile we do the viable stream setup algorithm.
// get the flows for the A streamendpoint.
// the flows spec is empty and hence we do a exhaustive match.
- AVStreams::flowSpec *a_flows = 0,*b_flows = 0;
- CORBA::Any_ptr flows_any;
+ AVStreams::flowSpec a_flows,b_flows;
+ CORBA::Any_var flows_any;
flows_any = sep_a->get_property_value ("Flows",ACE_TRY_ENV);
ACE_TRY_CHECK;
- *flows_any >>= a_flows;
+ AVStreams::flowSpec_ptr temp_flows;
+ flows_any.in () >>= temp_flows;
+ a_flows = *temp_flows;
flows_any = sep_b->get_property_value ("Flows",ACE_TRY_ENV);
ACE_TRY_CHECK;
- *flows_any >>= b_flows;
+ flows_any.in () >>= temp_flows;
+ b_flows = *temp_flows;
u_int i;
FlowEndPoint_Map *a_fep_map;
FlowEndPoint_Map *b_fep_map;
@@ -676,17 +705,17 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
ACE_NEW_RETURN (b_fep_map,
FlowEndPoint_Map,
0);
- for (i=0;i<a_flows->length ();i++)
+ for (i=0;i<a_flows.length ();i++)
{
- const char *flowname = (*a_flows)[i];
+ const char *flowname = a_flows[i];
// get the flowendpoint references.
- CORBA::Object_ptr fep_obj;
- fep_obj = sep_a->get_fep (flowname,
- ACE_TRY_ENV);
+ CORBA::Object_var fep_obj =
+ sep_a->get_fep (flowname,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- AVStreams::FlowEndPoint_ptr fep;
- fep = AVStreams::FlowEndPoint::_narrow (fep_obj,
- ACE_TRY_ENV);
+ AVStreams::FlowEndPoint_ptr fep =
+ AVStreams::FlowEndPoint::_narrow (fep_obj.in (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
TAO_String_Hash_Key fep_key (flowname);
result = a_fep_map->bind (fep_key,fep);
@@ -694,17 +723,17 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind failed for %s",flowname));
}
// get the flowendpoints for streamendpoint_b
- for (i=0;i<b_flows->length ();i++)
+ for (i=0;i<b_flows.length ();i++)
{
- const char *flowname = (*b_flows)[i];
+ const char *flowname = b_flows[i];
// get the flowendpoint references.
- CORBA::Object_ptr fep_obj;
- fep_obj = sep_b->get_fep (flowname,
- ACE_TRY_ENV);
+ CORBA::Object_var fep_obj =
+ sep_b->get_fep (flowname,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- AVStreams::FlowEndPoint_ptr fep;
- fep = AVStreams::FlowEndPoint::_narrow (fep_obj,
- ACE_TRY_ENV);
+ AVStreams::FlowEndPoint_ptr fep =
+ AVStreams::FlowEndPoint::_narrow (fep_obj.in (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
TAO_String_Hash_Key fep_key (flowname);
result = b_fep_map->bind (fep_key,fep);
@@ -719,26 +748,40 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
}
else
{
- FlowEndPoint_Map *spec_fep_map;
- ACE_NEW_RETURN (spec_fep_map,
+ FlowEndPoint_Map *spec_fep_map_a,*spec_fep_map_b;
+ ACE_NEW_RETURN (spec_fep_map_a,
+ FlowEndPoint_Map,
+ 0);
+ ACE_NEW_RETURN (spec_fep_map_b,
FlowEndPoint_Map,
0);
for (u_int i=0; i< flow_spec.length ();i++)
{
- TAO_String_Hash_Key fep_key (flow_spec [i]);
+ TAO_Forward_FlowSpec_Entry *entry;
+ ACE_NEW_RETURN (entry,
+ TAO_Forward_FlowSpec_Entry,
+ 0);
+ entry->parse (flow_spec[i].in ());
+ TAO_String_Hash_Key fep_key (entry->flowname ());
AVStreams::FlowEndPoint_ptr fep;
result = a_fep_map->find (fep_key,fep);
if (result == -1)
- {
- result = b_fep_map->find (fep_key,fep);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"Fep not found for flowname: %s",flow_spec[i].in ()),0);
- }
- result = spec_fep_map->bind (fep_key,fep);
+ ACE_ERROR_RETURN ((LM_ERROR,"Fep not found on A side for flowname: %s",flow_spec[i].in ()),0);
+
+ result = spec_fep_map_a->bind (fep_key,fep);
+ if (result == -1)
+ ACE_DEBUG ((LM_DEBUG,"Bind faile for %s",flow_spec[i].in ()));
+
+ result = b_fep_map->find (fep_key,fep);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"Fep not found on B side for flowname: %s",flow_spec[i].in ()),0);
+
+ result = spec_fep_map_b->bind (fep_key,fep);
if (result == -1)
ACE_DEBUG ((LM_DEBUG,"Bind faile for %s",flow_spec[i].in ()));
}
- map_a = map_b = spec_fep_map;
+ map_a = spec_fep_map_a;
+ map_b = spec_fep_map_b;
}
TAO_AV_QoS qos (stream_qos);
@@ -746,85 +789,115 @@ TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
// uses the first match policy.
FlowEndPoint_Map_Iterator a_feps_iterator (*map_a), b_feps_iterator (*map_b);
FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
- for (;a_feps_iterator.next (a_feps_entry) != 0;a_feps_iterator.advance ())
+ ACE_TRY_EX (flow_connect)
{
- for (;b_feps_iterator.next (b_feps_entry) != 0; b_feps_iterator.advance ())
+
+ for (;a_feps_iterator.next (a_feps_entry) != 0;a_feps_iterator.advance ())
{
- AVStreams::FlowEndPoint_ptr fep_a = a_feps_entry->int_id_;
- AVStreams::FlowEndPoint_ptr fep_b = b_feps_entry->int_id_;
- AVStreams::FlowConnection_ptr flow_connection;
- if (fep_b->get_connected_fep () != 0)
+ for (;b_feps_iterator.next (b_feps_entry) != 0; b_feps_iterator.advance ())
{
- if (fep_a->is_fep_compatible (fep_b,
- ACE_TRY_ENV) == 1)
+ AVStreams::FlowEndPoint_ptr fep_a = a_feps_entry->int_id_;
+ AVStreams::FlowEndPoint_ptr fep_b = b_feps_entry->int_id_;
+ AVStreams::FlowConnection_var flow_connection;
+ if (CORBA::is_nil (fep_b->get_connected_fep ()))
{
- // assume that flow names are same so that we
- // can use either of them.
- CORBA::Object_ptr flow_connection_obj;
-
- if (!CORBA::is_nil ((flow_connection_obj = this->get_flow_connection ((*a_flows)[i],ACE_TRY_ENV))))
- {
- flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj,ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- else
- {
- TAO_FlowConnection *flowConnection;
- ACE_NEW_RETURN (flowConnection,
- TAO_FlowConnection,
- 0);
- flow_connection = flowConnection->_this (ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- // make sure that a_feps is flow_producer and b_feps is flow_consumer
- // There should be a way to find which flow endpoint is producer and which is consumer.
- AVStreams::FlowProducer_ptr producer = AVStreams::FlowProducer::_nil ();
- AVStreams::FlowConsumer_ptr consumer = AVStreams::FlowConsumer::_nil ();
- ACE_TRY_EX (producer_check)
- {
- producer = AVStreams::FlowProducer::_narrow (fep_a,ACE_TRY_ENV);
- ACE_TRY_CHECK_EX (producer_check);
- consumer =
- AVStreams::FlowConsumer::_narrow (fep_b,ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
+ if (fep_a->is_fep_compatible (fep_b,
+ ACE_TRY_ENV) == 1)
{
- producer = AVStreams::FlowProducer::_narrow (fep_b,ACE_TRY_ENV);
- ACE_TRY_CHECK;
- consumer = AVStreams::FlowConsumer::_narrow (fep_a,ACE_TRY_ENV);
- ACE_TRY_CHECK
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (0);
- CORBA::Any_ptr flowname_any;
- char *fep_a_name,*fep_b_name;
- flowname_any = fep_a->get_property_value ("FlowName",ACE_TRY_ENV);
- *flowname_any >>= fep_a_name;
- flowname_any = fep_b->get_property_value ("FlowName",ACE_TRY_ENV);
- *flowname_any >>= fep_b_name;
- AVStreams::QoS flow_qos;
- flow_qos.QoSType = fep_a_name;
- flow_qos.QoSParams.length (0);
- result = qos.get_flow_qos (fep_a_name,flow_qos);
- if (result == -1)
- {
- flow_qos.QoSType = fep_b_name;
- result = qos.get_flow_qos (fep_b_name,flow_qos);
+ ACE_TRY_CHECK_EX (flow_connect);
+ // assume that flow names are same so that we
+ // can use either of them.
+ CORBA::Object_var flow_connection_obj;
+ CORBA::Any_var flowname_any = fep_a->get_property_value ("Flow",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
+ char *flowname = 0;
+ flowname_any.in () >>= flowname;
+ ACE_TRY_EX (flow_connection)
+ {
+ flow_connection_obj = this->get_flow_connection (flowname,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connection);
+ }
+ ACE_CATCHANY
+ {
+ TAO_FlowConnection *flowConnection;
+ ACE_NEW_RETURN (flowConnection,
+ TAO_FlowConnection,
+ 0);
+ flow_connection = flowConnection->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
+ this->set_flow_connection (flowname,
+ flow_connection,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+
+ // make sure that a_feps is flow_producer and b_feps is flow_consumer
+ // There should be a way to find which flow endpoint is producer and which is consumer.
+ AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_nil ();
+ AVStreams::FlowConsumer_var consumer = AVStreams::FlowConsumer::_nil ();
+
+ ACE_TRY_EX (producer_check)
+ {
+ producer = AVStreams::FlowProducer::_narrow (fep_a,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (producer_check);
+ consumer =
+ AVStreams::FlowConsumer::_narrow (fep_b,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (producer_check);
+ }
+ ACE_CATCHANY
+ {
+ producer = AVStreams::FlowProducer::_narrow (fep_b,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
+ consumer = AVStreams::FlowConsumer::_narrow (fep_a,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+ CORBA::String_var fep_a_name,fep_b_name;
+ CORBA::String temp_name;
+ flowname_any = fep_a->get_property_value ("FlowName",ACE_TRY_ENV);
+ flowname_any.in () >>= temp_name;
+ fep_a_name = CORBA::string_dup (temp_name);
+ flowname_any = fep_b->get_property_value ("FlowName",ACE_TRY_ENV);
+ flowname_any.in () >>= temp_name;
+ fep_b_name = CORBA::string_dup (temp_name);
+ AVStreams::QoS flow_qos;
+ flow_qos.QoSType = fep_a_name;
+ flow_qos.QoSParams.length (0);
+ result = qos.get_flow_qos (fep_a_name.in (),flow_qos);
if (result == -1)
- ACE_DEBUG ((LM_DEBUG,"No QoS Specified for this flow"));
+ {
+ flow_qos.QoSType = fep_b_name;
+ result = qos.get_flow_qos (fep_b_name.in (),flow_qos);
+ if (result == -1)
+ ACE_DEBUG ((LM_DEBUG,"No QoS Specified for this flow"));
+ }
+ flow_connection->connect (producer,consumer,flow_qos,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow_connect);
}
- flow_connection->connect (producer,consumer,flow_qos,ACE_TRY_ENV);
}
}
}
}
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamCtrl::bind:flow_connect block");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
}
ACE_CATCHANY
{
// error was thrown because one of the streamendpoints is light profile.
// Now connect the streams together
- this->sep_a_->connect (this->sep_b_,
+ this->sep_a_->connect (this->sep_b_.in (),
stream_qos,
flow_spec,
ACE_TRY_ENV);
@@ -879,8 +952,8 @@ TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
if (result < 0)
return AVStreams::VDev::_nil ();
}
- sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_);
- return AVStreams::VDev::_duplicate (entry.vdev_);
+ sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
+ return AVStreams::VDev::_duplicate (entry.vdev_.in ());
}
CORBA::Boolean
@@ -964,7 +1037,7 @@ TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configurati
}
ACE_CATCHANY
{
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MCastConfigIf::set_format");
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MCastConfigIf::set_configure");
return;
}
ACE_ENDTRY;
@@ -1101,30 +1174,27 @@ TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &the_spec,
return 0;
}
-// // factory method to make a new TCP flow handler.
-// int
-// TAO_Base_StreamEndPoint::make_tcp_flow_handler (TAO_AV_TCP_Flow_Handler *&sh)
-// {
-// // ACE_NEW_RETURN (sh,
-// // TAO_AV_TCP_Flow_Handler,
-// // -1);
-// return 0;
-// }
-
-// // factory method to make a new TCP flow handler.
-// int
-// TAO_Base_StreamEndPoint::make_udp_flow_handler (TAO_AV_UDP_Flow_Handler *&sh)
-// {
-// return 0;
-// }
-
-// int
-// TAO_Base_StreamEndPoint::make_dgram_mcast_flow_handler (TAO_AV_UDP_MCast_Flow_Handler *&sh)
-// {
-// ACE_NEW_RETURN (sh,
-// TAO_AV_UDP_MCast_Flow_Handler,
-// -1);
-// }
+// The following function is for backward compatibility.
+CORBA::Boolean
+TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
+{
+ return 1;
+}
+
+// The following function is for backward compatibility.
+CORBA::Boolean
+TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
+{
+ return 1;
+}
+
+// The following function is for backward compatibility.
+CORBA::Boolean
+TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &,
+ CORBA::Environment &)
+{
+ return 1;
+}
int
TAO_Base_StreamEndPoint::set_protocol_object (const char *flowname,
@@ -1147,8 +1217,8 @@ TAO_Base_StreamEndPoint::get_callback (const char *flowname,
// constructor.
TAO_StreamEndPoint::TAO_StreamEndPoint (void)
- :flow_count_ (1),
- negotiator_ (AVStreams::Negotiator::_nil ()),
+ :flow_count_ (0),
+ flow_num_ (0),
mcast_port_ (ACE_DEFAULT_MULTICAST_PORT)
{
this->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
@@ -1156,27 +1226,6 @@ TAO_StreamEndPoint::TAO_StreamEndPoint (void)
}
-// The following function is for backward compatibility.
-CORBA::Boolean
-TAO_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
-{
- return 1;
-}
-
-// The following function is for backward compatibility.
-CORBA::Boolean
-TAO_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
-{
- return 1;
-}
-
-// The following function is for backward compatibility.
-CORBA::Boolean
-TAO_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &,
- CORBA::Environment &)
-{
- return 1;
-}
CORBA::Boolean
TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
@@ -1185,20 +1234,22 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
CORBA::Environment &ACE_TRY_ENV)
{
CORBA::Boolean retv = 0;
- this->peer_sep_ = responder;
+ this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
ACE_TRY_EX (negotiate)
{
- if (!CORBA::is_nil (this->negotiator_))
+ if (!CORBA::is_nil (this->negotiator_.in ()))
{
- CORBA::Any_ptr negotiator_any = responder->get_property_value ("Negotiator");
+ CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
if (negotiator_any != 0)
{
AVStreams::Negotiator_ptr peer_negotiator;
- *negotiator_any >>= peer_negotiator;
+ negotiator_any.in () >>= peer_negotiator;
if (!CORBA::is_nil (peer_negotiator))
{
CORBA::Boolean result =
- this->negotiator_->negotiate (peer_negotiator,qos,ACE_TRY_ENV);
+ this->negotiator_->negotiate (peer_negotiator,
+ qos,
+ ACE_TRY_ENV);
ACE_TRY_CHECK_EX (negotiate);
if (!result)
ACE_DEBUG ((LM_DEBUG,"TAO_StreamEndPoint::Connect (): negotiate failed\n"));
@@ -1216,25 +1267,25 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
ACE_TRY_EX (available_protocols)
{
// choose protocols based on what the remote endpoint can support.
- CORBA::Any_ptr protocols_any =
+ CORBA::Any_var protocols_any =
responder->get_property_value ("AvailableProtocols",ACE_TRY_ENV);
ACE_TRY_CHECK_EX (available_protocols);
- if (protocols_any != 0)
+ AVStreams::protocolSpec peer_protocols;
+ AVStreams::protocolSpec_ptr temp_protocols;
+ protocols_any.in () >>= temp_protocols;
+ peer_protocols = *temp_protocols;
+ for (u_int i=0;i<peer_protocols.length ();i++)
{
- AVStreams::protocolSpec *peer_protocols;
- *protocols_any >>= peer_protocols;
- for (u_int i=0;i<peer_protocols->length ();i++)
- {
- if (this->protocol_ != 0)
- break;
- for (u_int j=0;j<this->protocols_.length ();j++)
- if (ACE_OS::strcmp ((*peer_protocols)[i],this->protocols_[j]) == 0)
- {
+ if (this->protocol_ != 0)
+ break;
+ for (u_int j=0;j<this->protocols_.length ();j++)
+ if (ACE_OS::strcmp (peer_protocols [i],
+ this->protocols_[j]) == 0)
+ {
// we'll agree upon the first protocol that matches.
- this->protocol_ = CORBA::string_copy ((*peer_protocols) [i]);
- break;
- }
- }
+ this->protocol_ = CORBA::string_copy (peer_protocols [i]);
+ break;
+ }
}
}
ACE_CATCHANY
@@ -1277,7 +1328,9 @@ TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_forward_flows failed\n"),0);
- retv = responder->request_connection (this->_this (ACE_TRY_ENV),
+ AVStreams::StreamEndPoint_var streamendpoint = this->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ retv = responder->request_connection (streamendpoint,
0,
network_qos,
flow_spec,
@@ -1557,17 +1610,36 @@ TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj,
{
// exception implies the flow name is not defined and is system generated.
ACE_OS::sprintf (flow_name,"flow%d",flow_num_++);
+ ACE_TRY_EX (flow)
+ {
+ // exception implies the flow name is not defined and is system generated.
+ ACE_OS::sprintf (flow_name,"flow%d",flow_num_++);
+ CORBA::Any flowname_any;
+ flowname_any <<= flow_name;
+ fep->define_property ("Flow",flowname_any,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint::add_fep");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
}
ACE_ENDTRY;
- // Add it to the sequence of flowNames supported.
- // put the flowname and the flowendpoint in a hashtable.
- TAO_String_Hash_Key fep_name_key (flow_name);
- if (this->fep_map_.bind (fep_name_key,fep) != 0)
- {
- ACE_THROW_RETURN (AVStreams::streamOpFailed (),0);
- }
- ACE_TRY_EX (flows)
+ ACE_CHECK_RETURN (0);
+ ACE_TRY
{
+ fep->lock (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ // Add it to the sequence of flowNames supported.
+ // put the flowname and the flowendpoint in a hashtable.
+ TAO_String_Hash_Key fep_name_key (CORBA::string_dup (flow_name));
+ if (this->fep_map_.bind (fep_name_key,fep) != 0)
+ {
+ ACE_THROW_RETURN (AVStreams::streamOpFailed (),0);
+ }
// increment the flow count.
this->flow_count_++;
this->flows_.length (this->flow_count_);
@@ -1578,7 +1650,7 @@ TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj,
this->define_property ("Flows",
flows_any,
ACE_TRY_ENV);
- ACE_TRY_CHECK_EX (flows);
+ ACE_TRY_CHECK;
}
ACE_CATCHANY
{
@@ -1698,6 +1770,20 @@ TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &the_qos,
TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
{
//this->handle_close ();
+ TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
+ TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
+ for ( ; begin != end; ++begin)
+ {
+ TAO_FlowSpec_Entry *entry = *begin;
+ delete entry;
+ }
+ begin = this->reverse_flow_spec_set.begin ();
+ end = this->reverse_flow_spec_set.end ();
+ for (; begin != end; ++begin)
+ {
+ TAO_FlowSpec_Entry *entry = *begin;
+ delete entry;
+ }
}
@@ -1716,15 +1802,34 @@ TAO_StreamEndPoint_A::start (const AVStreams::flowSpec &flow_spec,
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::noSuchFlow))
{
- TAO_StreamEndPoint::start (flow_spec,ACE_TRY_ENV);
- ACE_CHECK;
- this->peer_sep_->start (flow_spec,ACE_TRY_ENV);
+ ACE_TRY
+ {
+ TAO_StreamEndPoint::start (flow_spec,ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (CORBA::is_nil (this->peer_sep_.in ()))
+ {
+ CORBA::Any_var sep_any;
+ sep_any = this->get_property_value ("PeerAdapter",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ AVStreams::StreamEndPoint_B_ptr sep;
+ sep_any.in () >>= sep;
+ this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (sep);
+ }
+ this->peer_sep_->start (flow_spec,ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_StreamEndPoint_A::start");
+ }
+ ACE_ENDTRY;
ACE_CHECK;
}
void
TAO_StreamEndPoint_A::stop (const AVStreams::flowSpec &flow_spec,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::noSuchFlow))
{
@@ -1753,16 +1858,15 @@ TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
{
ACE_TRY_EX (narrow)
{
-
AVStreams::QoS flow_qos;
result = qos.get_flow_qos (forward_entry.flowname (),flow_qos);
if (result < 0)
ACE_DEBUG ((LM_DEBUG,"QoS not found for %s",forward_entry.flowname ()));
// Narrow it to FlowProducer.
- AVStreams::FlowProducer_ptr producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV);
+ AVStreams::FlowProducer_var producer = AVStreams::FlowProducer::_narrow (flow_endpoint,ACE_TRY_ENV);
ACE_TRY_CHECK_EX (narrow);
// Else narrow succeeeded.
- if (!CORBA::is_nil (producer))
+ if (!CORBA::is_nil (producer.in ()))
{
CORBA::Boolean is_met (0);
CORBA::String_var address =
@@ -1967,21 +2071,18 @@ TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
ACE_TRY_CHECK;
- this->streamctrl_ = the_ctrl;
- this->peer_ = the_peer_dev;
+ this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
+ this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
- CORBA::Any_ptr anyptr;
+ CORBA::Any_var anyptr;
CORBA::String media_ctrl_ior;
anyptr = this->peer_->get_property_value ("Related_MediaCtrl",
ACE_TRY_ENV);
ACE_TRY_CHECK;
- if (anyptr != 0)
- {
- *anyptr >>= media_ctrl_ior;
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)The Media Control IOR is %s\n",
- media_ctrl_ior));
- }
+ anyptr.in () >>= media_ctrl_ior;
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t)The Media Control IOR is %s\n",
+ media_ctrl_ior));
CORBA::Object_ptr media_ctrl_obj =
TAO_ORB_Core_instance ()->orb ()->string_to_object
(media_ctrl_ior,ACE_TRY_ENV);
@@ -2022,7 +2123,7 @@ TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr /* the_ctrl */,
AVStreams::QoSRequestFailed,
AVStreams::streamOpFailed))
{
- this->mcast_peer_ = mcast_peer;
+ this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
return 1;
}
@@ -2125,7 +2226,10 @@ TAO_VDev::~TAO_VDev (void)
TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
- : endpoint_strategy_ (endpoint_strategy)
+ : endpoint_strategy_ (endpoint_strategy),
+ flow_count_ (0),
+ flow_num_ (0),
+ stream_ctrl_ (0)
{
}
@@ -2146,17 +2250,18 @@ TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
ACE_TRY
{
ACE_UNUSED_ARG (is_met);
- TAO_StreamCtrl *stream_ctrl;
- ACE_NEW_RETURN (stream_ctrl,
+ ACE_NEW_RETURN (this->stream_ctrl_,
TAO_StreamCtrl,
0);
- stream_ctrl->bind_devs (peer_device,
- AVStreams::MMDevice::_duplicate (this->_this (ACE_TRY_ENV)),
- the_qos,
- the_spec,
- ACE_TRY_ENV);
+ AVStreams::MMDevice_var mmdevice = this->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->stream_ctrl_->bind_devs (peer_device,
+ mmdevice.in (),
+ the_qos,
+ the_spec,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- streamctrl = stream_ctrl->_this (ACE_TRY_ENV);
+ streamctrl = this->stream_ctrl_->_this (ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
@@ -2250,14 +2355,14 @@ TAO_MMDevice::create_A_B (MMDevice_Type type,
forward_entry.parse (flow_spec[i]);
TAO_String_Hash_Key flow_key (forward_entry.flowname ());
AVStreams::FDev_ptr flow_dev;
- AVStreams::FlowConnection_ptr flowconnection = AVStreams::FlowConnection::_nil ();
+ AVStreams::FlowConnection_var flowconnection = AVStreams::FlowConnection::_nil ();
ACE_TRY_EX (flowconnection)
{
// Get the flowconnection for this flow.
CORBA::Object_var flowconnection_obj =
streamctrl->get_flow_connection (forward_entry.flowname (),ACE_TRY_ENV);
ACE_TRY_CHECK_EX (flowconnection);
- if (!CORBA::is_nil (flowconnection_obj))
+ if (!CORBA::is_nil (flowconnection_obj.in ()))
{
flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj,ACE_TRY_ENV);
ACE_TRY_CHECK_EX (flowconnection);
@@ -2274,7 +2379,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type,
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,"fdev_map::find failed\n"),0);
CORBA::String_var named_fdev;
- AVStreams::FlowEndPoint_ptr flow_endpoint = AVStreams::FlowEndPoint::_nil ();
+ AVStreams::FlowEndPoint_var flow_endpoint;
AVStreams::QoS flow_qos;
result = qos.get_flow_qos (forward_entry.flowname (),flow_qos);
if (result < 0)
@@ -2291,7 +2396,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type,
// hence A is the producer for this flow and B is the consumer for this flow.
// We have to create a producer from the FDev for this flow.
flow_endpoint =
- flow_dev->create_producer (flowconnection,
+ flow_dev->create_producer (flowconnection.in (),
flow_qos,
met_qos,
named_fdev.inout (),
@@ -2301,7 +2406,7 @@ TAO_MMDevice::create_A_B (MMDevice_Type type,
case MMDEVICE_B:
{
flow_endpoint =
- flow_dev->create_consumer (flowconnection,
+ flow_dev->create_consumer (flowconnection.in (),
flow_qos,
met_qos,
named_fdev.inout (),
@@ -2377,7 +2482,7 @@ TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
const AVStreams::flowSpec &flow_spec,
CORBA::Environment &ACE_TRY_ENV)
{
- AVStreams::StreamEndPoint_A_ptr sep_a = AVStreams::StreamEndPoint_A::_nil ();
+ AVStreams::StreamEndPoint_A_ptr sep_a;
AVStreams::StreamEndPoint_ptr sep;
ACE_TRY
{
@@ -2470,17 +2575,32 @@ TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj,
}
ACE_CATCHANY
{
- // exception implies the flow name is not defined and is system generated.
- ACE_OS::sprintf (flow_name,"flow%d",flow_num_++);
+ ACE_TRY_EX (flow)
+ {
+ // exception implies the flow name is not defined and is system generated.
+ ACE_OS::sprintf (flow_name,"flow%d",flow_num_++);
+ CORBA::Any flowname_any;
+ flowname_any <<= flow_name;
+ fdev->define_property ("Flow",flowname_any,ACE_TRY_ENV);
+ ACE_TRY_CHECK_EX (flow);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_MMDevice::add_fdev");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
}
ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
if (CORBA::is_nil (fdev))
return 0;
// Add it to the sequence of flowNames supported.
// put the flowname and the fdev in a hashtable.
- TAO_String_Hash_Key fdev_name_key (flow_name);
+ TAO_String_Hash_Key fdev_name_key (CORBA::string_dup (flow_name));
if (this->fdev_map_.bind (fdev_name_key,fdev) != 0)
ACE_THROW_RETURN (AVStreams::streamOpFailed (),0);
@@ -2521,7 +2641,7 @@ TAO_MMDevice::get_fdev (const char *flow_name,
TAO_String_Hash_Key fdev_name_key (flow_name);
FDev_Map::ENTRY *fdev_entry = 0;
if (this->fdev_map_.find (fdev_name_key,fdev_entry) == 0)
- return fdev_entry->int_id_;
+ return AVStreams::FDev::_duplicate (fdev_entry->int_id_);
return 0;
}
@@ -2568,6 +2688,11 @@ TAO_MMDevice::remove_fdev (const char *flow_name,
// destructor.
TAO_MMDevice::~TAO_MMDevice (void)
{
+ delete this->stream_ctrl_;
+ FDev_Map_Iterator iterator (fdev_map_);
+ FDev_Map_Entry *entry = 0;
+ for (;iterator.next (entry) != 0; iterator.advance ())
+ CORBA::release (entry->int_id_);
}
//------------------------------------------------------------------
@@ -2576,9 +2701,8 @@ TAO_MMDevice::~TAO_MMDevice (void)
// default constructor.
TAO_FlowConnection::TAO_FlowConnection (void)
- :producer_ (0),
- consumer_ (0),
- fp_name_ (0)
+ :fp_name_ (CORBA::string_dup ("")),
+ ip_multicast_ (1)
{
}
@@ -2587,7 +2711,20 @@ void
TAO_FlowConnection::stop (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- ACE_UNUSED_ARG (ACE_TRY_ENV);
+ ACE_TRY
+ {
+ this->flow_producer_->stop (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->flow_consumer_->stop (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::stop");
+ return;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
}
// start this flow.
@@ -2595,13 +2732,30 @@ void
TAO_FlowConnection::start (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- ACE_UNUSED_ARG (ACE_TRY_ENV);
+ ACE_TRY
+ {
+ this->flow_consumer_->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->flow_producer_->start (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::start");
+ return;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
}
// destroy this flow.
void
TAO_FlowConnection::destroy (CORBA::Environment &ACE_TRY_ENV)
{
+ this->flow_producer_->destroy (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->flow_consumer_->destroy (ACE_TRY_ENV);
+ ACE_CHECK;
int result = deactivate_servant (this);
if (result < 0)
ACE_DEBUG ((LM_DEBUG,"TAO_FlowConnection::destroy failed\n"));
@@ -2628,9 +2782,12 @@ TAO_FlowConnection::use_flow_protocol (const char * fp_name,
AVStreams::FPError,
AVStreams::notSupported))
{
- ACE_UNUSED_ARG (fp_settings);
- ACE_UNUSED_ARG (ACE_TRY_ENV);
- this->fp_name_ = (char *)fp_name;
+ this->fp_name_ = fp_name;
+ this->fp_settings_ = fp_settings;
+ this->flow_producer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ this->flow_consumer_->use_flow_protocol (fp_name,fp_settings,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
return 1;
}
@@ -2646,18 +2803,48 @@ TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event,
CORBA::Boolean
TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
AVStreams::FDev_ptr b_party,
- AVStreams::QoS & the_qos,
+ AVStreams::QoS & flow_qos,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::streamOpFailed,
AVStreams::streamOpDenied,
AVStreams::QoSRequestFailed))
{
- ACE_UNUSED_ARG (a_party);
- ACE_UNUSED_ARG (b_party);
- ACE_UNUSED_ARG (the_qos);
- ACE_UNUSED_ARG (ACE_TRY_ENV);
- return 0;
+ CORBA::Boolean result = 0;
+ ACE_TRY
+ {
+ AVStreams::FlowConnection_var flowconnection = this->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ CORBA::Boolean met_qos;
+ CORBA::String_var named_fdev ((const char *)"");
+ AVStreams::FlowProducer_var producer =
+ a_party->create_producer (flowconnection,
+ flow_qos,
+ met_qos,
+ named_fdev.inout (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ AVStreams::FlowConsumer_var consumer =
+ b_party->create_consumer (flowconnection,
+ flow_qos,
+ met_qos,
+ named_fdev.inout (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ result = this->connect (producer.in (),
+ consumer.in (),
+ flow_qos,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::connect_devs");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
+ return result;
}
// connect the producer and the consumer
@@ -2673,33 +2860,35 @@ TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr flow_producer,
{
ACE_TRY
{
- this->producer_ = flow_producer;
- this->consumer_ = flow_consumer;
-
- this->producer_->set_peer (this->_this (ACE_TRY_ENV),
- this->consumer_,
- the_qos,
- ACE_TRY_ENV);
+ this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer);
+ this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer);
+
+ AVStreams::FlowConnection_var flowconnection =
+ this->_this (ACE_TRY_ENV);
+ this->flow_producer_->set_peer (flowconnection.in (),
+ this->flow_consumer_.in (),
+ the_qos,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- this->consumer_->set_peer (this->_this (ACE_TRY_ENV),
- this->producer_,
- the_qos,
- ACE_TRY_ENV);
+ this->flow_consumer_->set_peer (flowconnection.in (),
+ this->flow_producer_.in (),
+ the_qos,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
char *consumer_address =
- this->consumer_->go_to_listen (the_qos,
- 0,// false for is_mcast
- this->producer_,
- this->fp_name_,
- ACE_TRY_ENV);
+ this->flow_consumer_->go_to_listen (the_qos,
+ 0,// false for is_mcast
+ this->flow_producer_.in (),
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
- this->producer_->connect_to_peer (the_qos,
- consumer_address,
- this->fp_name_,
- ACE_TRY_ENV);
+ this->flow_producer_->connect_to_peer (the_qos,
+ consumer_address,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
@@ -2728,9 +2917,36 @@ TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr flow_producer,
AVStreams::alreadyConnected,
AVStreams::notSupported))
{
- ACE_UNUSED_ARG (the_qos);
- ACE_UNUSED_ARG (ACE_TRY_ENV);
- this->producer_ = flow_producer;
+ ACE_TRY
+ {
+ this->flow_producer_ = AVStreams::FlowProducer::_duplicate (flow_producer);
+ CORBA::Boolean met_qos;
+ char *mcast_address = "";
+ char *address = this->flow_producer_->connect_mcast (the_qos,
+ met_qos,
+ mcast_address,
+ this->fp_name_.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ TAO_Forward_FlowSpec_Entry entry ("","","","",address);
+ if (entry.address () != 0)
+ {
+ // Internet multicasting is in use.
+ this->producer_address_ = address;
+ }
+ else
+ {
+ // ATM Multicasting is in use.
+ this->ip_multicast_ = 0;
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::add_producer");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
return 1;
}
@@ -2741,9 +2957,50 @@ TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr flow_consumer,
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::alreadyConnected))
{
- ACE_UNUSED_ARG (the_qos);
- ACE_UNUSED_ARG (ACE_TRY_ENV);
- this->consumer_ = flow_consumer;
+ ACE_TRY
+ {
+ this->flow_consumer_ = AVStreams::FlowConsumer::_duplicate (flow_consumer);
+ AVStreams::protocolSpec protocols (1);
+ protocols.length (1);
+ protocols [0] = CORBA::string_dup (this->producer_address_);
+ if (!this->ip_multicast_)
+ {
+ this->flow_consumer_->set_protocol_restriction (protocols,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ char * address =
+ this->flow_consumer_->go_to_listen (the_qos,
+ 1,
+ this->flow_producer_.in (),
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ CORBA::Boolean is_met;
+ this->flow_producer_->connect_mcast (the_qos,
+ is_met,
+ address,
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ else
+ {
+ // IP Multicasting.
+ this->flow_consumer_->go_to_listen (the_qos,
+ 1,
+ this->flow_producer_.in (),
+ this->fp_name_.inout (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowConnection::add_consumer");
+ return 0;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (0);
return 1;
}
@@ -2764,9 +3021,69 @@ TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target,
//default constructor.
TAO_FlowEndPoint::TAO_FlowEndPoint (void)
- :related_sep_ (0),
- related_flow_connection_ (0)
+ :lock_ (0)
+{
+}
+
+TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
+ AVStreams::protocolSpec &protocols,
+ const char *format)
+{
+ this->open (flowname,protocols,format);
+}
+
+int
+TAO_FlowEndPoint::open (const char *flowname,
+ AVStreams::protocolSpec &protocols,
+ const char *format)
+{
+ this->flowname_ = flowname;
+ this->format_ = format;
+
+ ACE_DEBUG ((LM_DEBUG,"TAO_FlowEndPoint::open\n"));
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ CORBA::Any flowname_any;
+ flowname_any <<= flowname;
+ this->define_property ("Flow",
+ flowname_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->set_format (format,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->protocol_addresses_ = protocols;
+ AVStreams::protocolSpec protocol_spec (protocols.length ());
+ protocol_spec.length (protocols.length ());
+ ACE_DEBUG ((LM_DEBUG,"%N:%l\n"));
+ for (u_int i=0;i<protocols.length ();i++)
+ {
+ CORBA::String_var address = CORBA::string_dup (protocols [i].in ());
+ TAO_Forward_FlowSpec_Entry entry ("","","","",address.in ());
+ protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
+ ACE_DEBUG ((LM_DEBUG,"%s\n",protocol_spec[i].in ()));
+ }
+ this->set_protocol_restriction (protocol_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FlowEndPoint::open");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+ return 0;
+}
+
+
+int
+TAO_FlowEndPoint::set_flowname (const char *flowname)
{
+ this->flowname_ = flowname;
+ return 0;
}
// used by one flowconnection so that multiple connections cant use
@@ -2777,7 +3094,10 @@ TAO_FlowEndPoint::lock (CORBA::Environment &ACE_TRY_ENV)
{
// lock the current flowendpoint
ACE_UNUSED_ARG (ACE_TRY_ENV);
- return 0;
+ if (this->lock_)
+ return 0;
+ this->lock_ = 1;
+ return 1;
}
// unlocks the flowendpoint ,becomes free to be used in another flow.
@@ -2786,6 +3106,7 @@ TAO_FlowEndPoint::unlock (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
ACE_UNUSED_ARG (ACE_TRY_ENV);
+ this->lock_ = 0;
}
// The start,stop and destroy are to be handled by the application.
@@ -2793,14 +3114,14 @@ void
TAO_FlowEndPoint::stop (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- ACE_UNUSED_ARG (ACE_TRY_ENV);
+ this->protocol_object_->stop ();
}
void
TAO_FlowEndPoint::start (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- ACE_UNUSED_ARG (ACE_TRY_ENV);
+ this->protocol_object_->start ();
}
void
@@ -2817,7 +3138,7 @@ TAO_FlowEndPoint::related_sep (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
ACE_UNUSED_ARG (ACE_TRY_ENV);
- return this->related_sep_;
+ return AVStreams::StreamEndPoint::_duplicate (this->related_sep_);
}
void
@@ -2826,7 +3147,7 @@ TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep,
ACE_THROW_SPEC ((CORBA::SystemException))
{
ACE_UNUSED_ARG (ACE_TRY_ENV);
- this->related_sep_ = related_sep;
+ this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
}
AVStreams::FlowConnection_ptr
@@ -2834,7 +3155,7 @@ TAO_FlowEndPoint::related_flow_connection (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
ACE_UNUSED_ARG (ACE_TRY_ENV);
- return this->related_flow_connection_;
+ return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_);
}
void
@@ -2843,7 +3164,7 @@ TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related
ACE_THROW_SPEC ((CORBA::SystemException))
{
ACE_UNUSED_ARG (ACE_TRY_ENV);
- this->related_flow_connection_ = related_flow_connection;
+ this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
}
// returns the connected peer for this flow
@@ -2892,6 +3213,7 @@ TAO_FlowEndPoint::set_format (const char * format,
ACE_THROW_SPEC ((CORBA::SystemException,
AVStreams::notSupported))
{
+ this->format_ = format;
ACE_TRY
{
// make this a property so that is_fep_compatible can query this and
@@ -2918,6 +3240,7 @@ TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_set
AVStreams::PropertyException,
AVStreams::streamOpFailed))
{
+ this->dev_params_ = new_settings;
ACE_TRY
{
CORBA::Any DevParams_property;
@@ -2943,12 +3266,29 @@ TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & prot
{
ACE_TRY
{
+ ACE_DEBUG ((LM_DEBUG,"%N:%l\n"));
+ for (u_int i=0;i<protocols.length ();i++)
+ {
+ const char *protocol = (protocols)[i].in ();
+ ACE_DEBUG ((LM_DEBUG,"%s\n",protocol));
+ }
CORBA::Any AvailableProtocols_property;
AvailableProtocols_property <<= protocols;
this->define_property ("AvailableProtocols",
AvailableProtocols_property,
ACE_TRY_ENV);
ACE_TRY_CHECK;
+ AVStreams::protocolSpec *temp_spec;
+ CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ temp_any.in () >>= temp_spec;
+ ACE_DEBUG ((LM_DEBUG,"%N:%l\n"));
+ for (i=0;i<temp_spec->length ();i++)
+ {
+ const char *protocol = (*temp_spec)[i].in ();
+ ACE_DEBUG ((LM_DEBUG,"%s\n",protocol));
+ }
this->protocols_ = protocols;
}
ACE_CATCHANY
@@ -2971,58 +3311,48 @@ TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep,
// check whether the passed flowendpoint is compatible with this flowendpoint.
// should we check for the availableFormats and choose one format.
// get my format value
- CORBA::Any_ptr format_ptr;
- CORBA::String my_format,peer_format;
+ CORBA::Any_var format_ptr;
+ CORBA::String_var my_format,peer_format;
+ CORBA::String temp_format;
format_ptr = this->get_property_value ("Format",
ACE_TRY_ENV);
ACE_TRY_CHECK;
- if (format_ptr != 0)
- *format_ptr >>= my_format;
- else
- // property is not defined
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0);
-
+ format_ptr.in () >>= temp_format;
+ my_format = CORBA::string_dup (temp_format);
// get my peer's format value
-
format_ptr = peer_fep->get_property_value ("Format",
ACE_TRY_ENV);
ACE_TRY_CHECK;
- if (format_ptr != 0)
- *format_ptr >>= peer_format;
- else
- // property is not defined
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0);
-
- if (ACE_OS::strcmp (my_format,peer_format) != 0)
+ format_ptr.in () >>= temp_format;
+ peer_format = CORBA::string_dup (temp_format);
+ if (ACE_OS::strcmp (my_format.in (),
+ peer_format.in ()) != 0)
return 0;
// since formats are same, check for a common protocol
- CORBA::Any* AvailableProtocols_ptr;
- AVStreams::protocolSpec *my_protocol_spec,*peer_protocol_spec;
-
+ CORBA::Any_var AvailableProtocols_ptr;
+ AVStreams::protocolSpec my_protocol_spec,peer_protocol_spec;
+ AVStreams::protocolSpec_ptr temp_protocols;;
AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols",
ACE_TRY_ENV);
ACE_TRY_CHECK;
- if (AvailableProtocols_ptr != 0)
- *AvailableProtocols_ptr >>= my_protocol_spec;
- else
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) TAO_FlowEndPoint::is_fep_compatible"),0);
-
+ AvailableProtocols_ptr.in () >>= temp_protocols;
+ my_protocol_spec = *temp_protocols;
AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols",
ACE_TRY_ENV);
ACE_TRY_CHECK;
- if (AvailableProtocols_ptr != 0)
- *AvailableProtocols_ptr >>= peer_protocol_spec;
+ AvailableProtocols_ptr.in () >>= temp_protocols;
+ peer_protocol_spec = *temp_protocols;
int protocol_match = 0;
- for (u_int i=0;i<(*my_protocol_spec).length ();i++)
+ for (u_int i=0;i<my_protocol_spec.length ();i++)
{
CORBA::String_var my_protocol_string;
- for (u_int j=0;j<(*peer_protocol_spec).length ();j++)
+ for (u_int j=0;j<peer_protocol_spec.length ();j++)
{
CORBA::String_var peer_protocol_string;
- my_protocol_string = CORBA::string_dup ((*my_protocol_spec)[i]);
- peer_protocol_string = CORBA::string_dup ((*peer_protocol_spec)[j]);
+ my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
+ peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
if (ACE_OS::strcmp (my_protocol_string,peer_protocol_string) == 0)
{
protocol_match = 1;
@@ -3073,7 +3403,7 @@ TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr /* the_fc */,
char *
TAO_FlowEndPoint::go_to_listen (AVStreams::QoS & the_qos,
CORBA::Boolean is_mcast,
- AVStreams::FlowProducer_ptr peer,
+ AVStreams::FlowProducer_ptr peer_fep,
char *& flowProtocol,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -3081,19 +3411,76 @@ TAO_FlowEndPoint::go_to_listen (AVStreams::QoS & the_qos,
AVStreams::FPError,
AVStreams::QoSRequestFailed))
{
- return this->handle_go_to_listen (the_qos,is_mcast,peer,flowProtocol,ACE_TRY_ENV);
-}
+ AVStreams::protocolSpec my_protocol_spec,peer_protocol_spec;
+ AVStreams::protocolSpec_ptr temp_protocols;
+ CORBA::Any_var AvailableProtocols_ptr =
+ peer_fep->get_property_value ("AvailableProtocols",
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ AvailableProtocols_ptr.in () >>= temp_protocols;
+ peer_protocol_spec = *temp_protocols;
+ AvailableProtocols_ptr =
+ this->get_property_value ("AvailableProtocols",
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ AvailableProtocols_ptr.in () >>= temp_protocols;
+ my_protocol_spec = *temp_protocols;
+ int protocol_match = 0;
+ CORBA::String_var listen_protocol;
+ for (u_int i=0;i<my_protocol_spec.length ();i++)
+ {
+ CORBA::String_var my_protocol_string;
+ for (u_int j=0;j<peer_protocol_spec.length ();j++)
+ {
+ CORBA::String_var peer_protocol_string;
+ my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
+ peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
+ if (ACE_OS::strcmp (my_protocol_string,peer_protocol_string) == 0)
+ {
+ listen_protocol = my_protocol_string;
+ protocol_match = 1;
+ break;
+ }
+ }
+ if (protocol_match)
+ break;
+ }
+ if (!protocol_match)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"),0);
-char *
-TAO_FlowEndPoint::handle_go_to_listen (AVStreams::QoS & /* the_qos */,
- CORBA::Boolean /* is_mcast */,
- AVStreams::FlowProducer_ptr /* peer */,
- char *& /* flowProtocol */,
- CORBA::Environment &/* ACE_TRY_ENV */)
-{
+ for (u_int j=0;j<this->protocol_addresses_.length ();j++)
+ if (ACE_OS::strncmp (this->protocol_addresses_ [i],listen_protocol,ACE_OS::strlen (listen_protocol)) == 0)
+ {
+ // Now listen on that protocol.
+ TAO_Forward_FlowSpec_Entry *entry;
+ ACE_NEW_RETURN (entry,
+ TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
+ CORBA::string_dup (""),
+ this->format_.in (),
+ flowProtocol,
+ this->protocol_addresses_ [i]),
+ 0);
+
+ TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
+ TAO_AV_FlowSpecSet flow_spec_set;
+ flow_spec_set.insert (entry);
+ int result = acceptor_registry->open (this,
+ TAO_AV_CORE::instance (),
+ flow_spec_set);
+ if (result < 0)
+ return 0;
+ char *listen_address = entry->get_local_addr_str ();
+ char *address;
+ ACE_NEW_RETURN (address,
+ char [BUFSIZ],
+ 0);
+ ACE_OS::sprintf (address,"%s=%s",listen_protocol.in (),listen_address);
+ return address;
+ }
return 0;
}
+
CORBA::Boolean
TAO_FlowEndPoint::connect_to_peer (AVStreams::QoS & the_qos,
const char * address,
@@ -3104,25 +3491,34 @@ TAO_FlowEndPoint::connect_to_peer (AVStreams::QoS & the_qos,
AVStreams::FPError,
AVStreams::QoSRequestFailed))
{
- // Right now since the A/V framework doesnt bother about the
- // protocols we leave it to the application to handle the connection
- // to its peer. When A/V Streams implements common protocol
- // interaction like UDP and TCP this will be handled by the
- // framework.
-
- return this->handle_connect_to_peer (the_qos,address,use_flow_protocol,ACE_TRY_ENV);
+ TAO_Forward_FlowSpec_Entry *entry;
+ ACE_NEW_RETURN (entry,
+ TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
+ CORBA::string_dup (""),
+ this->format_.in (),
+ CORBA::string_dup (""),
+ address),
+ 0);
+ TAO_AV_FlowSpecSet flow_spec_set;
+ flow_spec_set.insert (entry);
+ TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
+ int result = connector_registry->open (this,
+ TAO_AV_CORE::instance (),
+ flow_spec_set);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_FlowEndPoint::connector_registry::open failed\n"),0);
+ return 1;
}
-CORBA::Boolean
-TAO_FlowEndPoint::handle_connect_to_peer (AVStreams::QoS & /* the_qos */,
- const char * /* address */,
- const char * /* use_flow_protocol */,
- CORBA::Environment &/* ACE_TRY_ENV */)
+int
+TAO_FlowEndPoint::set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object)
{
+ ACE_DEBUG ((LM_DEBUG,"TAO_FlowEndPoint::set_protocol_object\n"));
+ this->protocol_object_ = object;
return 0;
}
-
// ------------------------------------------------------------
// TAO_FlowProducer class
// ------------------------------------------------------------
@@ -3132,6 +3528,13 @@ TAO_FlowProducer::TAO_FlowProducer (void)
{
}
+TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
+ AVStreams::protocolSpec protocols,
+ const char *format)
+{
+ this->open (flowname,protocols,format);
+}
+
// multicast is currently not supported
char *
TAO_FlowProducer::connect_mcast (AVStreams::QoS & /* the_qos */,
@@ -3199,6 +3602,13 @@ TAO_FlowConsumer::TAO_FlowConsumer (void)
{
}
+TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
+ AVStreams::protocolSpec protocols,
+ const char *format)
+{
+ this->open (flowname,protocols,format);
+}
+
// ------------------------------------------------------------
// TAO_FDev
// ------------------------------------------------------------
@@ -3208,6 +3618,27 @@ TAO_FDev::TAO_FDev (void)
{
}
+TAO_FDev::TAO_FDev (const char *flowname)
+ :flowname_ (flowname)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ CORBA::Any flowname_any;
+ flowname_any <<= flowname;
+ this->define_property ("Flow",
+ flowname_any,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_FDev::TAO_FDev");
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+}
+
AVStreams::FlowProducer_ptr
TAO_FDev::create_producer (AVStreams::FlowConnection_ptr the_requester,
AVStreams::QoS & the_qos,
@@ -3384,9 +3815,16 @@ TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
TAO_Tokenizer::~TAO_Tokenizer ()
{
- // CORBA::string_free (this->string_);
+// ACE_Array_Iterator<char*> iterator (this->token_array_);
+// char **entry = 0;
+// for (; iterator.next (entry) != 0; iterator.advance ())
+// {
+// ACE_DEBUG ((LM_DEBUG,"%s\n",*entry));
+// CORBA::string_free (*entry);
+// }
}
+
int
TAO_Tokenizer::parse (const char *string,char delimiter)
{
@@ -3447,7 +3885,7 @@ char*
TAO_Tokenizer::token (void)
{
if (count_ < num_tokens_)
- return this->token_array_[this->count_++];
+ return CORBA::string_dup (this->token_array_[this->count_++]);
else
return 0;
}
@@ -3463,7 +3901,7 @@ TAO_Tokenizer::operator [] (size_t index) const
{
if (index >= this->num_tokens_)
return 0;
- return this->token_array_[index];
+ return CORBA::string_dup (this->token_array_[index]);
}
//------------------------------------------------------------
@@ -3491,10 +3929,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void)
// constructor.
TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
- char *direction,
- char *format_name,
- char *flow_protocol,
- char *carrier_protocol,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *carrier_protocol,
ACE_Addr *address)
:address_ (address),
address_str_ (0),
@@ -3512,7 +3950,35 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
protocol_object_ (0)
{
this->set_protocol ();
- this->set_direction (direction);
+ this->set_direction (this->direction_str_);
+}
+
+TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *address)
+ :address_ (0),
+ address_str_ (CORBA::string_dup (address)),
+ format_ (CORBA::string_dup (format_name)),
+ direction_str_ (CORBA::string_dup (direction)),
+ flowname_ (CORBA::string_dup (flowname)),
+ carrier_protocol_ (0),
+ flow_protocol_ (CORBA::string_dup (flow_protocol)),
+ use_flow_protocol_ (0),
+ entry_ (0),
+ peer_addr_ (0),
+ local_addr_ (0),
+ transport_ (0),
+ handler_ (0),
+ protocol_object_ (0)
+{
+ ACE_CString cstring(this->address_str_,0,0);
+ int colon_pos = cstring.find (':');
+ if (colon_pos != cstring.npos)
+ cstring[colon_pos] = ';';
+ this->address_str_ = cstring.rep ();
+ this->parse_address (this->address_str_);
}
// Destructor.
@@ -3522,7 +3988,6 @@ TAO_FlowSpec_Entry::~TAO_FlowSpec_Entry (void)
CORBA::string_free (this->direction_str_);
CORBA::string_free (this->flowname_);
CORBA::string_free (this->carrier_protocol_);
- CORBA::string_free (this->flowname_);
}
int
@@ -3726,6 +4191,31 @@ TAO_FlowSpec_Entry::get_local_addr (void)
return this->local_addr_;
}
+char *
+TAO_FlowSpec_Entry::get_local_addr_str (void)
+{
+ switch (this->local_addr_->get_type ())
+ {
+ case AF_INET:
+ {
+ char *buf;
+ ACE_NEW_RETURN (buf,
+ char [BUFSIZ],
+ 0);
+ ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr *,this->local_addr_);
+ inet_addr->addr_to_string (buf,BUFSIZ);
+ ACE_CString cstring (buf,0,0);
+ int colon_pos = cstring.find (':');
+ if (colon_pos != cstring.npos)
+ cstring[colon_pos] = ';';
+ return cstring.rep ();
+ }
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,"Address family not supported"),0);
+ }
+}
+
TAO_AV_Transport*
TAO_FlowSpec_Entry::transport (void)
{
@@ -3773,10 +4263,10 @@ TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (void)
// constructor to construct the entry from the arguments.
TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname,
- char *direction,
- char *format_name ,
- char *flow_protocol ,
- char *carrier_protocol ,
+ const char *direction,
+ const char *format_name ,
+ const char *flow_protocol ,
+ const char *carrier_protocol ,
ACE_Addr *address )
:TAO_FlowSpec_Entry (flowname,
direction,
@@ -3787,6 +4277,20 @@ TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname,
{
}
+// constructor to construct the entry from the arguments.
+TAO_Forward_FlowSpec_Entry::TAO_Forward_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name ,
+ const char *flow_protocol ,
+ const char *address )
+ :TAO_FlowSpec_Entry (flowname,
+ direction,
+ format_name,
+ flow_protocol,
+ address)
+{
+}
+
int
TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry)
{
@@ -3876,11 +4380,11 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (void)
// constructor to construct an entry from the arguments.
TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname,
- char *direction,
- char *format_name ,
- char *flow_protocol ,
- char *carrier_protocol ,
- ACE_Addr *address )
+ const char *direction,
+ const char *format_name ,
+ const char *flow_protocol ,
+ const char *carrier_protocol,
+ ACE_Addr *address)
:TAO_FlowSpec_Entry (flowname,
direction,
format_name,
@@ -3890,6 +4394,20 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname,
{
}
+// constructor to construct an entry from the arguments.
+TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name ,
+ const char *flow_protocol ,
+ const char *address)
+ :TAO_FlowSpec_Entry (flowname,
+ direction,
+ format_name,
+ flow_protocol,
+ address)
+{
+}
+
int
TAO_Reverse_FlowSpec_Entry::parse (const char *flowSpec_entry)
{
@@ -3988,7 +4506,7 @@ TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos)
for (u_int j=0;j<this->stream_qos_.length ();j++)
{
- TAO_String_Hash_Key qos_key (this->stream_qos_[j].QoSType);
+ TAO_String_Hash_Key qos_key (CORBA::string_dup (this->stream_qos_[j].QoSType));
int result = this->qos_map_.bind (qos_key,this->stream_qos_[j]);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,"qos_map::bind failed\n"),-1);
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
index 5777820f257..f4a6263afc0 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
@@ -195,6 +195,9 @@ public:
TAO_Basic_StreamCtrl (void);
// Default Constructor
+ virtual ~TAO_Basic_StreamCtrl (void);
+ // Destructor.
+
virtual void stop (const AVStreams::flowSpec &the_spec,
CORBA::Environment &env = CORBA::Environment::default_environment ())
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -258,20 +261,19 @@ public:
// Not implemented in the light profile, will raise the notsupported
// exception
- virtual ~TAO_Basic_StreamCtrl (void);
- // Destructor
-
protected:
- AVStreams::VDev_ptr vdev_a_;
- AVStreams::VDev_ptr vdev_b_;
+ AVStreams::VDev_var vdev_a_;
+ AVStreams::VDev_var vdev_b_;
// The Virtual Devices for this stream
- AVStreams::StreamEndPoint_A_ptr sep_a_;
- AVStreams::StreamEndPoint_B_ptr sep_b_;
+ AVStreams::StreamEndPoint_A_var sep_a_;
+ AVStreams::StreamEndPoint_B_var sep_b_;
// The Endpoints for this stream
typedef ACE_Hash_Map_Manager <TAO_String_Hash_Key,AVStreams::FlowConnection_ptr,ACE_Null_Mutex> FlowConnection_Map;
+ typedef ACE_Hash_Map_Iterator <TAO_String_Hash_Key,AVStreams::FlowConnection_ptr,ACE_Null_Mutex> FlowConnection_Map_Iterator;
+ typedef ACE_Hash_Map_Entry <TAO_String_Hash_Key,AVStreams::FlowConnection_ptr> FlowConnection_Map_Entry;
FlowConnection_Map flow_connection_map_;
// Hash table for the flow names and its corresponding flowconnection object reference.
AVStreams::FlowConnection_seq flowConnections_;
@@ -339,6 +341,9 @@ public:
TAO_StreamCtrl (void);
// Default Constructor
+ virtual ~TAO_StreamCtrl (void);
+ // virtual destructor.
+
virtual CORBA::Boolean bind_devs (AVStreams::MMDevice_ptr a_party,
AVStreams::MMDevice_ptr b_party,
AVStreams::streamQoS &the_qos,
@@ -402,23 +407,24 @@ public:
// Changes the QoS associated with the stream
// Empty the_spec means apply operation to all flows
- virtual ~TAO_StreamCtrl (void);
- // Destructor.
-
protected:
struct MMDevice_Map_Entry
{
- AVStreams::StreamEndPoint_ptr sep_;
- AVStreams::VDev_ptr vdev_;
+ AVStreams::StreamEndPoint_var sep_;
+ AVStreams::VDev_var vdev_;
AVStreams::flowSpec flowspec_;
AVStreams::streamQoS qos_;
};
- ACE_Hash_Map_Manager <MMDevice_Map_Hash_Key,MMDevice_Map_Entry,ACE_Null_Mutex> mmdevice_a_map_;
- ACE_Hash_Map_Manager <MMDevice_Map_Hash_Key,MMDevice_Map_Entry,ACE_Null_Mutex> mmdevice_b_map_;
+ typedef ACE_Hash_Map_Manager <MMDevice_Map_Hash_Key,MMDevice_Map_Entry,ACE_Null_Mutex> MMDevice_Map;
+ typedef ACE_Hash_Map_Iterator <MMDevice_Map_Hash_Key,MMDevice_Map_Entry,ACE_Null_Mutex> MMDevice_Map_Iterator;
+
+ MMDevice_Map mmdevice_a_map_;
+ MMDevice_Map mmdevice_b_map_;
TAO_MCastConfigIf *mcastconfigif_;
- AVStreams::MCastConfigIf_ptr mcastconfigif_ptr_;
+ AVStreams::MCastConfigIf_var mcastconfigif_ptr_;
+ AVStreams::StreamCtrl_var streamctrl_;
};
class TAO_ORBSVCS_Export TAO_MCastConfigIf
@@ -430,7 +436,7 @@ public:
struct Peer_Info
{
- AVStreams::VDev_ptr peer_;
+ AVStreams::VDev_var peer_;
AVStreams::streamQoS qos_;
AVStreams::flowSpec flow_spec_;
};
@@ -518,14 +524,14 @@ public:
CORBA::Environment &env = CORBA::Environment::default_environment ());
// Application needs to define this
- virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec) = 0;
+ virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec);
// Application needs to define this
- virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec) = 0;
+ virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec);
// Application needs to define this
virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec,
- CORBA::Environment &env = CORBA::Environment::default_environment ()) = 0;
+ CORBA::Environment &env = CORBA::Environment::default_environment ());
// Application needs to define this
// virtual int make_tcp_flow_handler (TAO_AV_TCP_Flow_Handler *&handler);
@@ -673,15 +679,15 @@ public:
virtual ~TAO_StreamEndPoint (void);
// Destructor
- virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec);
- // Defined for backward compatibility.
+// virtual CORBA::Boolean handle_preconnect (AVStreams::flowSpec &the_spec);
+// // Defined for backward compatibility.
- virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec);
- // Defined for backward compatibility.
+// virtual CORBA::Boolean handle_postconnect (AVStreams::flowSpec &the_spec);
+// // Defined for backward compatibility.
- virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec,
- CORBA::Environment &env = CORBA::Environment::default_environment ());
- // Defined for backward compatibility.
+// virtual CORBA::Boolean handle_connection_requested (AVStreams::flowSpec &the_spec,
+// CORBA::Environment &env = CORBA::Environment::default_environment ());
+// // Defined for backward compatibility.
CORBA::Boolean multiconnect (AVStreams::streamQoS &the_qos,
AVStreams::flowSpec &the_spec,
@@ -707,7 +713,7 @@ protected:
CORBA::Long source_id_;
// source id used for multicast.
- AVStreams::Negotiator_ptr negotiator_;
+ AVStreams::Negotiator_var negotiator_;
// our local negotiator for QoS.
AVStreams::protocolSpec protocols_;
@@ -724,7 +730,7 @@ protected:
ACE_Hash_Map_Manager <TAO_String_Hash_Key, TAO_AV_UDP_MCast_Flow_Handler *,ACE_Null_Mutex> dgram_mcast_handler_map_;
TAO_AV_FlowSpecSet forward_flow_spec_set;
TAO_AV_FlowSpecSet reverse_flow_spec_set;
- AVStreams::StreamEndPoint_ptr peer_sep_;
+ AVStreams::StreamEndPoint_var peer_sep_;
AVStreams::SFPStatus *sfp_status_;
};
@@ -1017,6 +1023,8 @@ protected:
// current flow number used for system generation of flow names.
typedef ACE_Hash_Map_Manager <TAO_String_Hash_Key,AVStreams::FDev_ptr,ACE_Null_Mutex> FDev_Map;
+ typedef ACE_Hash_Map_Iterator <TAO_String_Hash_Key,AVStreams::FDev_ptr,ACE_Null_Mutex> FDev_Map_Iterator;
+ typedef ACE_Hash_Map_Entry <TAO_String_Hash_Key,AVStreams::FDev_ptr> FDev_Map_Entry;
FDev_Map fdev_map_;
// hash table for the flownames and its corresponding flowEndpoint
@@ -1024,6 +1032,8 @@ protected:
AVStreams::flowSpec flows_;
// sequence of supported flow names.
+
+ TAO_StreamCtrl *stream_ctrl_;
};
class TAO_FlowConsumer;
@@ -1122,18 +1132,24 @@ public:
// drops a flow endpoint from the flow.
protected:
- AVStreams::FlowProducer *producer_;
+ AVStreams::FlowProducer_var flow_producer_;
// The producer of this flow.
- AVStreams::FlowConsumer *consumer_;
+ AVStreams::FlowConsumer_var flow_consumer_;
// The consumer of this flow
- char * fp_name_;
- // name of the flow protocol to be used.
+ CORBA::String_var fp_name_;
+ CORBA::Any fp_settings_;
+ CORBA::String_var producer_address_;
+ // The multicast address returned by the producer.
+
+ int ip_multicast_;
+ // IP Multicasting is used.
};
class TAO_ORBSVCS_Export TAO_FlowEndPoint :
public virtual POA_AVStreams::FlowEndPoint,
public virtual TAO_PropertySet,
- public virtual PortableServer::RefCountServantBase
+ public virtual PortableServer::RefCountServantBase,
+ public virtual TAO_Base_StreamEndPoint
{
// = DESCRIPTION
// This class is used per flow e.g video flow and an audio flow
@@ -1144,6 +1160,19 @@ public:
TAO_FlowEndPoint (void);
//default constructor.
+ TAO_FlowEndPoint (const char *flowname,
+ AVStreams::protocolSpec &protocols,
+ const char *format);
+
+ int open (const char *flowname,
+ AVStreams::protocolSpec &protocols,
+ const char *format);
+
+ int set_flowname (const char *flowname);
+
+ virtual int set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object);
+
virtual CORBA::Boolean lock (CORBA::Environment &env =
CORBA::Environment::default_environment ())
ACE_THROW_SPEC ((CORBA::SystemException));
@@ -1264,13 +1293,6 @@ public:
AVStreams::QoSRequestFailed));
// connect to the peer endpoint.
- virtual CORBA::Boolean handle_connect_to_peer (AVStreams::QoS & the_qos,
- const char * address,
- const char * use_flow_protocol,
- CORBA::Environment &env =
- CORBA::Environment::default_environment ());
- // hook method to be overridden by the application to handle the connection request.
-
virtual char * go_to_listen (AVStreams::QoS & the_qos,
CORBA::Boolean is_mcast,
AVStreams::FlowProducer_ptr peer,
@@ -1284,28 +1306,32 @@ public:
// listen request from the peer.
- virtual char * handle_go_to_listen (AVStreams::QoS & the_qos,
- CORBA::Boolean is_mcast,
- AVStreams::FlowProducer_ptr peer,
- char *& flowProtocol,
- CORBA::Environment &env = CORBA::Environment::default_environment ());
- // applications should override this method.
-
protected:
- AVStreams::StreamEndPoint_ptr related_sep_;
+ AVStreams::StreamEndPoint_var related_sep_;
// The related streamendpoint.
- AVStreams::FlowConnection_ptr related_flow_connection_;
+ AVStreams::FlowConnection_var related_flow_connection_;
// The related flow connection reference
- AVStreams::FlowEndPoint_ptr peer_fep_;
+ AVStreams::FlowEndPoint_var peer_fep_;
// The peer flowendpoint reference.
AVStreams::protocolSpec protocols_;
// Available protocols for this flowendpoint.
- AVStreams::MCastConfigIf_ptr mcast_peer_;
+ AVStreams::protocolSpec protocol_addresses_;
+ // Address information for the protocols.
+
+ AVStreams::MCastConfigIf_var mcast_peer_;
// The multicast peer endpoint.
+
+ CORBA::Boolean lock_;
+ // Lock.
+
+ CORBA::String_var format_;
+ CORBA::String_var flowname_;
+ CosPropertyService::Properties dev_params_;
+ TAO_AV_Protocol_Object *protocol_object_;
};
class TAO_ORBSVCS_Export TAO_FlowProducer:
@@ -1317,6 +1343,10 @@ public:
TAO_FlowProducer (void);
// default constructor
+ TAO_FlowProducer (const char *flowname,
+ AVStreams::protocolSpec protocols,
+ const char *format);
+
virtual char * connect_mcast (AVStreams::QoS & the_qos,
CORBA::Boolean_out is_met,
const char * address,
@@ -1360,7 +1390,9 @@ public:
TAO_FlowConsumer (void);
// default constructor.
-
+ TAO_FlowConsumer (const char *flowname,
+ AVStreams::protocolSpec protocols,
+ const char *format);
};
class TAO_ORBSVCS_Export TAO_FDev :
@@ -1372,6 +1404,9 @@ public:
TAO_FDev (void);
// default constructor
+ TAO_FDev (const char *flowname);
+ // constructor taking a flowname.
+
AVStreams::FlowProducer_ptr create_producer (AVStreams::FlowConnection_ptr the_requester,
AVStreams::QoS & the_qos,
CORBA::Boolean_out met_qos,
@@ -1443,6 +1478,7 @@ protected:
typedef ACE_DLList_Iterator <TAO_FlowProducer> PRODUCER_LIST_ITERATOR;
ACE_DLList <TAO_FlowConsumer> consumer_list_;
typedef ACE_DLList_Iterator <TAO_FlowConsumer> CONSUMER_LIST_ITERATOR;
+ CORBA::String_var flowname_;
};
class TAO_ORBSVCS_Export TAO_MediaControl
@@ -1542,59 +1578,69 @@ public:
// default constructor.
TAO_FlowSpec_Entry (const char *flowname,
- char *direction,
- char *format_name,
- char *flow_protocol,
- char *carrier_protocol,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *carrier_protocol,
ACE_Addr *address);
// constructor to construct an entry from the arguments.
+ TAO_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *address);
+
virtual int parse (const char* flowSpec_entry) = 0;
// construct the entry from a string specified by the flowSpec grammar.
virtual ~TAO_FlowSpec_Entry (void);
// virtual destructor.
- virtual int direction (void);
+ int direction (void);
// accessor to the direction.
- virtual char * direction_str (void);
+ char * direction_str (void);
// string version of direction .
- virtual char *flow_protocol_str (void);
+ char *flow_protocol_str (void);
// accesor to the flow protocol string.
- virtual ACE_Addr *address (void);
+ ACE_Addr *address (void);
// Address of the carrier protocol.
- virtual char * address_str (void);
+ char * address_str (void);
// Address in string format i. hostname:port.
- virtual TAO_AV_Core::Protocol carrier_protocol (void);
+ TAO_AV_Core::Protocol carrier_protocol (void);
// carrier protocol i.e TCP,UDP,RTP/UDP.
- virtual char * carrier_protocol_str (void);
+ char * carrier_protocol_str (void);
// string version of carrier protocol.
- virtual char *format (void);
+ char *format (void);
// format to be used for this flow.
- virtual const char *flowname (void);
+ const char *flowname (void);
// name of this flow.
virtual char *entry_to_string (void) = 0;
// converts the entry to a string.
- virtual int set_peer_addr (ACE_Addr *peer_addr);
- virtual ACE_Addr *get_peer_addr (void);
- virtual int set_local_addr (ACE_Addr *peer_addr);
- virtual ACE_Addr *get_local_addr (void);
- virtual TAO_AV_Transport *transport (void);
- virtual void transport (TAO_AV_Transport *transport);
- virtual TAO_AV_Flow_Handler* handler (void);
- virtual void handler (TAO_AV_Flow_Handler *handler);
- virtual TAO_AV_Protocol_Object* protocol_object (void);
- virtual void protocol_object (TAO_AV_Protocol_Object *object);
+ int set_peer_addr (ACE_Addr *peer_addr);
+ ACE_Addr *get_peer_addr (void);
+ int set_local_addr (ACE_Addr *peer_addr);
+ ACE_Addr *get_local_addr (void);
+ char *get_local_addr_str (void);
+ TAO_AV_Transport *transport (void);
+ void transport (TAO_AV_Transport *transport);
+ TAO_AV_Flow_Handler* handler (void);
+ void handler (TAO_AV_Flow_Handler *handler);
+ TAO_AV_Protocol_Object* protocol_object (void);
+ void protocol_object (TAO_AV_Protocol_Object *object);
+
+ int parse_address (char *format_string);
+ // sets the address for this flow.
protected:
int parse_flow_protocol_string (char *flow_options_string);
// parses the flow protocol string with tokens separated by :
@@ -1602,8 +1648,6 @@ protected:
int set_direction (char *direction_string);
// sets the direction flag.
- int parse_address (char *format_string);
- // sets the address for this flow.
int set_protocol (void);
// sets the protocol_ enum from the carrier_protocol_ string.
@@ -1660,13 +1704,19 @@ public:
// default constructor.
TAO_Forward_FlowSpec_Entry (const char *flowname,
- char *direction = "",
- char *format_name = "",
- char *flow_protocol = "",
- char *carrier_protocol = "",
- ACE_Addr *address = 0);
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *carrier_protocol,
+ ACE_Addr *address);
// constructor to construct an entry from the arguments.
+ TAO_Forward_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *address);
+
virtual char *entry_to_string (void);
// converts the entry to a string.
@@ -1688,13 +1738,20 @@ public:
// default constructor.
TAO_Reverse_FlowSpec_Entry (const char *flowname,
- char *direction = "",
- char *format_name = "",
- char *flow_protocol = "",
- char *carrier_protocol = "",
- ACE_Addr *address = 0);
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *carrier_protocol,
+ ACE_Addr *address);
// constructor to construct an entry from the arguments.
+ TAO_Reverse_FlowSpec_Entry (const char *flowname,
+ const char *direction,
+ const char *format_name,
+ const char *flow_protocol,
+ const char *address);
+ // Takes the address in protocol=endpoint form.
+
virtual char *entry_to_string (void);
// converts the entry to a string.