summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
diff options
context:
space:
mode:
authornaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-31 23:17:58 +0000
committernaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-31 23:17:58 +0000
commitbf7c6eed6bfff6ffbf4f28809a0b9031e492e123 (patch)
treed65e282fbf3232635b7f6e8e4f1628dab8b39a94 /TAO/orbsvcs/orbsvcs/AV/Transport.cpp
parentb6f7377616988bc10dd543cd970120843ae2c65b (diff)
downloadATCD-bf7c6eed6bfff6ffbf4f28809a0b9031e492e123.tar.gz
Tue Aug 31 18:07:35 1999 Nagarajan Surendran <naga@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/Transport.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Transport.cpp1249
1 files changed, 1249 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
new file mode 100644
index 00000000000..eea7446020b
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
@@ -0,0 +1,1249 @@
+// $Id$
+#include "tao/TAO.h"
+#include "AVStreams_i.h"
+#include "sfp.h"
+#include "MCast.h"
+#include "Nil.h"
+#include "RTP.h"
+#include "RTCP.h"
+#include "UDP.h"
+#include "TCP.h"
+#include "FlowSpec_Entry.h"
+
+#if !defined (__ACE_INLINE__)
+#include "Transport.i"
+#endif /* __ACE_INLINE__ */
+
+//------------------------------------------------------------
+// TAO_AV_Core
+//------------------------------------------------------------
+
+TAO_AV_Core::TAO_AV_Core (void)
+ :connector_registry_ (0),
+ acceptor_registry_ (0)
+{
+}
+
+TAO_AV_Core::~TAO_AV_Core (void)
+{
+ delete this->connector_registry_;
+ delete this->acceptor_registry_;
+}
+
+int
+TAO_AV_Core::init (int &argc,
+ char *argv [],
+ CORBA::Environment &env)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init "));
+ // Init the orb manager.
+ int result = this->orb_manager_.init (argc,argv,env);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"orb_manager::init"),result);
+ ACE_NEW_RETURN (this->connector_registry_,
+ TAO_AV_Connector_Registry,
+ -1);
+ ACE_NEW_RETURN (this->acceptor_registry_,
+ TAO_AV_Acceptor_Registry,
+ -1);
+ this->orb_ = this->orb_manager_.orb ();
+ this->reactor (this->orb_->orb_core ()->reactor ());
+ this->init_transport_factories ();
+ this->init_flow_protocol_factories ();
+ return 0;
+}
+
+int
+TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_FlowSpecSet &flow_spec_set,
+ TAO_AV_Core::EndPoint direction,
+ AVStreams::flowSpec &flow_spec)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows\n"));
+ TAO_AV_FlowSpecSet address_flow_set;
+ TAO_AV_FlowSpecSet flow_set;
+ TAO_AV_FlowSpecSetItor end = flow_spec_set.end ();
+ for (TAO_AV_FlowSpecSetItor start = flow_spec_set.begin ();
+ start != end; ++start)
+ {
+ TAO_FlowSpec_Entry *entry = (*start);
+ switch (direction)
+ {
+ case TAO_AV_Core::TAO_AV_ENDPOINT_B:
+ {
+ switch (entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
+ break;
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
+ break;
+ }
+ }
+ default:
+ break;
+ }
+ ACE_Addr *address = entry->address ();
+ if (address != 0)
+ {
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"address given for flow %s",entry->flowname ()));
+ address_flow_set.insert (entry);
+ }
+ else
+ flow_set.insert (entry);
+ }
+ int result = -1;
+ switch (direction)
+ {
+ case TAO_AV_Core::TAO_AV_ENDPOINT_A:
+ if (address_flow_set.size () > 0)
+ {
+ result = this->acceptor_registry_->open (endpoint,
+ this,
+ address_flow_set);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"),-1);
+ TAO_AV_FlowSpecSetItor end = address_flow_set.end ();
+ for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin ();
+ start != end; ++start)
+ {
+ TAO_FlowSpec_Entry *entry = (*start);
+ switch (entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
+ {
+ if (entry->handler () != 0)
+ {
+ // For IN flows on the A side we should remove the handlers from the reactor.
+ ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
+ result = event_handler->reactor ()->remove_handler (event_handler,
+ ACE_Event_Handler::READ_MASK);
+ if (result < 0)
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
+ }
+ }
+ default:
+ break;
+ }
+ // Now if the address_set has been changed due to the addition of a control entry we should
+ // add that to the flow_spec_set also.
+ if (flow_spec_set.find (entry) < 0)
+ {
+ // entry doesn't exist so add it.
+ flow_spec_set.insert (entry);
+// size_t len = flow_spec.length ();
+// flow_spec.length (len+1);
+// flow_spec [len] = entry->entry_to_string ();
+ }
+ }
+ }
+ break;
+ case TAO_AV_Core::TAO_AV_ENDPOINT_B:
+ {
+ if (address_flow_set.size () > 0)
+ {
+ result = this->connector_registry_->open (endpoint,
+ this,
+ address_flow_set);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_Forward_flows: connector_registry open failed\n"),-1);
+ TAO_AV_FlowSpecSetItor end = address_flow_set.end ();
+ for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin ();
+ start != end; ++start)
+ {
+ TAO_FlowSpec_Entry *entry = (*start);
+ switch (entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ {
+ if (entry->handler () != 0)
+ {
+ // @@Naga: This wont be called in the case of Full Profile.
+ // For IN flows on the A side we should remove the handlers from the reactor.
+ ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
+ result = event_handler->reactor ()->remove_handler (event_handler,
+ ACE_Event_Handler::READ_MASK);
+ if (result < 0)
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
+ }
+ }
+ default:
+ break;
+ }
+ // Now if the address_set has been changed due to the addition of a control entry we should
+ // add that to the flow_spec_set also.
+ if (flow_spec_set.find (entry) < 0)
+ {
+ // entry doesn't exist so add it.
+ flow_spec_set.insert (entry);
+ }
+ }
+ }
+ if (flow_set.size () > 0)
+ {
+ result = this->acceptor_registry_->open (endpoint,
+ this,
+ flow_set);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_Forward_flows: Acceptor_registry open failed\n"),-1);
+ TAO_AV_FlowSpecSetItor end = address_flow_set.end ();
+ for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin ();
+ start != end; ++start)
+ {
+ TAO_FlowSpec_Entry *entry = (*start);
+ switch (entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ {
+ if (entry->handler () != 0)
+ {
+ // For IN flows on the A side we should remove the handlers from the reactor.
+ ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
+ result = event_handler->reactor ()->remove_handler (event_handler,
+ ACE_Event_Handler::READ_MASK);
+ if (result < 0)
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
+ }
+ }
+ default:
+ break;
+ }
+ // Now if the address_set has been changed due to the addition of a control entry we should
+ // add that to the flow_spec_set also.
+ if (flow_spec_set.find (entry) < 0)
+ {
+ // entry doesn't exist so add it.
+ flow_spec_set.insert (entry);
+ }
+ }
+ }
+
+ AVStreams::flowSpec new_flowspec (flow_spec_set.size ());
+ int i=0;
+ TAO_AV_FlowSpecSetItor connect_end = address_flow_set.end ();
+ TAO_AV_FlowSpecSetItor connect = address_flow_set.begin ();
+ for (;connect != connect_end; ++connect)
+ {
+ ACE_Addr *local_addr;
+ local_addr = (*connect)->get_local_addr ();
+ if (result == 0)
+ {
+ TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (),
+ (*connect)->direction_str (),
+ (*connect)->format (),
+ (*connect)->flow_protocol_str (),
+ (*connect)->carrier_protocol_str (),
+ local_addr);
+
+ int len = new_flowspec.length ();
+ if (i == len)
+ new_flowspec.length (len+1);
+ new_flowspec [i++] = entry.entry_to_string ();
+ }
+ }
+ connect_end = flow_set.end ();
+ for (connect = flow_set.begin ();
+ connect != connect_end; ++connect)
+ {
+ ACE_Addr *local_addr;
+ local_addr = (*connect)->get_local_addr ();
+ if (result == 0)
+ {
+ TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (),
+ (*connect)->direction_str (),
+ (*connect)->format (),
+ (*connect)->flow_protocol_str (),
+ (*connect)->carrier_protocol_str (),
+ local_addr);
+
+ int len = new_flowspec.length ();
+ if (i == len)
+ new_flowspec.length (len+1);
+ new_flowspec [i++] = entry.entry_to_string ();
+ }
+ }
+ // Change the reverse flow spec to be sent.
+ flow_spec = new_flowspec;
+ }
+ break;
+ default:
+ break;
+ }
+ return 0;
+}
+
+int
+TAO_AV_Core::init_reverse_flows (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_FlowSpecSet &forward_flow_spec_set,
+ TAO_AV_FlowSpecSet &reverse_flow_spec_set,
+ TAO_AV_Core::EndPoint direction)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)TAO_AV_Core::init_reverse_flows\n"));
+ TAO_AV_FlowSpecSet acceptor_flow_set;
+ TAO_AV_FlowSpecSet connector_flow_set;
+ TAO_AV_FlowSpecSetItor end = reverse_flow_spec_set.end ();
+ TAO_AV_FlowSpecSetItor start = reverse_flow_spec_set.begin ();
+ for (;start != end; ++start)
+ {
+ TAO_FlowSpec_Entry *entry = (*start);
+ ACE_Addr *address = entry->address ();
+ if (address != 0)
+ {
+ if (this->get_acceptor (entry->flowname ())!= 0)
+ {
+ ACE_Addr *address = entry->address ();
+ TAO_FlowSpec_Entry *forward_entry =
+ this->get_flow_spec_entry (forward_flow_spec_set,
+ entry->flowname ());
+ if (forward_entry != 0)
+ forward_entry->set_peer_addr (address);
+ // Now we have to match the control and data flow objects.
+ // Check if there's a control object.
+ char control_flowname [BUFSIZ];
+ ACE_OS::sprintf (control_flowname,"%s_control",entry->flowname ());
+ TAO_FlowSpec_Entry *control_entry = this->get_flow_spec_entry (forward_flow_spec_set,
+ control_flowname);
+ if (control_entry != 0)
+ forward_entry->protocol_object ()->control_object (control_entry->protocol_object ());
+ }
+ else
+ connector_flow_set.insert (entry);
+ }
+ }
+ int result = -1;
+ switch (direction)
+ {
+ case TAO_AV_Core::TAO_AV_ENDPOINT_A:
+ result = this->connector_registry_->open (endpoint,
+ this,
+ connector_flow_set);
+ break;
+ default:
+ break;
+ }
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"acceptor_registry::open"),-1);
+ return 0;
+}
+
+TAO_FlowSpec_Entry *
+TAO_AV_Core::get_flow_spec_entry (TAO_AV_FlowSpecSet &flow_spec_set,
+ const char *flowname)
+{
+ TAO_AV_FlowSpecSetItor end = flow_spec_set.end ();
+ TAO_AV_FlowSpecSetItor begin = flow_spec_set.begin ();
+ for (;
+ begin != end;
+ ++begin)
+ {
+ if (ACE_OS::strcmp ((*begin)->flowname (),flowname) == 0)
+ return (*begin);
+ }
+ return 0;
+}
+
+TAO_AV_Acceptor*
+TAO_AV_Core::get_acceptor (const char *flowname)
+{
+ TAO_AV_AcceptorSetItor acceptor =
+ this->acceptor_registry_->begin ();
+ TAO_AV_AcceptorSetItor end =
+ this->acceptor_registry_->end ();
+
+ for (;acceptor != end; ++acceptor)
+ {
+ if (ACE_OS::strcmp ((*acceptor)->flowname (),flowname) == 0)
+ return *acceptor;
+ }
+ return 0;
+}
+
+TAO_AV_Connector*
+TAO_AV_Core::get_connector (const char *flowname)
+{
+ TAO_AV_ConnectorSetItor connector =
+ this->connector_registry_->begin ();
+ TAO_AV_ConnectorSetItor end =
+ this->connector_registry_->end ();
+
+ for (;connector != end; ++connector)
+ {
+ if (ACE_OS::strcmp ((*connector)->flowname (),flowname) == 0)
+ return *connector;
+ }
+ return 0;
+}
+
+int
+TAO_AV_Core::init_transport_factories (void)
+{
+ TAO_AV_TransportFactorySetItor end = this->transport_factories_.end ();
+ TAO_AV_TransportFactorySetItor factory = this->transport_factories_.begin ();
+
+ if (factory == end)
+ {
+ TAO_AV_Transport_Factory *udp_factory = 0;
+ TAO_AV_Transport_Item *udp_item = 0;
+
+ udp_factory =
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance ("UDP_Factory");
+ if (udp_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "UDP Factory"));
+
+ ACE_NEW_RETURN (udp_factory,
+ TAO_AV_UDP_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (udp_item, TAO_AV_Transport_Item ("UDP_Factory"), -1);
+ udp_item->factory (udp_factory);
+
+ this->transport_factories_.insert (udp_item);
+
+ TAO_AV_Transport_Factory *tcp_factory = 0;
+ TAO_AV_Transport_Item *tcp_item = 0;
+
+ tcp_factory =
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance ("TCP_Factory");
+ if (tcp_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "TCP Factory"));
+
+ ACE_NEW_RETURN (tcp_factory,
+ TAO_AV_TCP_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (tcp_item, TAO_AV_Transport_Item ("TCP_Factory"), -1);
+ tcp_item->factory (tcp_factory);
+
+ this->transport_factories_.insert (tcp_item);
+
+
+ }
+ return 0;
+}
+
+int
+TAO_AV_Core::init_flow_protocol_factories (void)
+{
+ TAO_AV_Flow_ProtocolFactorySetItor end = this->flow_protocol_factories_.end ();
+ TAO_AV_Flow_ProtocolFactorySetItor factory = this->flow_protocol_factories_.begin ();
+
+ if (factory == end)
+ {
+ TAO_AV_Flow_Protocol_Factory *udp_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *udp_item = 0;
+
+ udp_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance ("UDP_Flow_Factory");
+ if (udp_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "UDP Flow Factory"));
+
+ ACE_NEW_RETURN (udp_flow_factory,
+ TAO_AV_UDP_Flow_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (udp_item, TAO_AV_Flow_Protocol_Item ("UDP_Flow_Factory"), -1);
+ udp_item->factory (udp_flow_factory);
+
+ this->flow_protocol_factories_.insert (udp_item);
+
+ TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *tcp_item = 0;
+
+ tcp_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance ("TCP_Flow_Factory");
+ if (tcp_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "TCP Flow Factory"));
+
+ ACE_NEW_RETURN (tcp_flow_factory,
+ TAO_AV_TCP_Flow_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (tcp_item, TAO_AV_Flow_Protocol_Item ("TCP_Flow_Factory"), -1);
+ tcp_item->factory (tcp_flow_factory);
+
+ this->flow_protocol_factories_.insert (tcp_item);
+
+ TAO_AV_Flow_Protocol_Factory *rtp_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *rtp_item = 0;
+
+ rtp_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance ("RTP_Flow_Factory");
+ if (rtp_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "RTP Flow Factory"));
+
+ ACE_NEW_RETURN (rtp_flow_factory,
+ TAO_AV_RTP_Flow_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (rtp_item, TAO_AV_Flow_Protocol_Item ("RTP_Flow_Factory"), -1);
+ rtp_item->factory (rtp_flow_factory);
+
+ this->flow_protocol_factories_.insert (rtp_item);
+
+ TAO_AV_Flow_Protocol_Factory *rtcp_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *rtcp_item = 0;
+
+ rtcp_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance ("RTCP_Flow_Factory");
+ if (rtcp_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "RTCP Flow Factory"));
+
+ ACE_NEW_RETURN (rtcp_flow_factory,
+ TAO_AV_RTCP_Flow_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (rtcp_item, TAO_AV_Flow_Protocol_Item ("RTCP_Flow_Factory"), -1);
+ rtcp_item->factory (rtcp_flow_factory);
+
+ this->flow_protocol_factories_.insert (rtcp_item);
+
+ TAO_AV_Flow_Protocol_Factory *sfp_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *sfp_item = 0;
+
+ sfp_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance ("SFP_Flow_Factory");
+ if (sfp_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "SFP Flow Factory"));
+
+ ACE_NEW_RETURN (sfp_flow_factory,
+ TAO_AV_SFP_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (sfp_item, TAO_AV_Flow_Protocol_Item ("SFP_Flow_Factory"), -1);
+ sfp_item->factory (sfp_flow_factory);
+
+ this->flow_protocol_factories_.insert (sfp_item);
+ }
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_Transport_Item
+//------------------------------------------------------------
+TAO_AV_Transport_Item::TAO_AV_Transport_Item (const ACE_CString &name)
+ : name_ (name),
+ factory_ (0)
+{
+}
+
+//------------------------------------------------------------
+// TAO_AV_Transport_Item
+//------------------------------------------------------------
+TAO_AV_Flow_Protocol_Item::TAO_AV_Flow_Protocol_Item (const ACE_CString &name)
+ : name_ (name),
+ factory_ (0)
+{
+}
+
+//------------------------------------------------------------
+// TAO_AV_Connector_Registry
+//------------------------------------------------------------
+
+TAO_AV_Connector_Registry::TAO_AV_Connector_Registry (void)
+{
+}
+
+int
+TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_FlowSpecSet &flow_spec_set)
+{
+ TAO_AV_FlowSpecSetItor last_flowspec =
+ flow_spec_set.end ();
+ for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
+ flow_spec != last_flowspec;
+ ++flow_spec)
+ {
+ TAO_FlowSpec_Entry *entry = (*flow_spec);
+ ACE_Addr *address = entry->address ();
+ const char *flow_protocol = entry->flow_protocol_str ();
+ const char *transport_protocol = entry->carrier_protocol_str ();
+ if (ACE_OS::strcmp (flow_protocol,"") == 0)
+ flow_protocol = transport_protocol;
+ if (address == 0)
+ {
+ // Protocol was specified without an endpoint. According to
+ // the "iioploc" spec, this is valid. As such, we extend
+ // this feature to all pluggable protocols. All TAO
+ // pluggable protocols are expected to have the ability to
+ // create a default endpoint.
+
+ ACE_ERROR_RETURN ((LM_ERROR,"Protocol was specified without an endpoint\n"),-1);
+ }
+ else
+ {
+ TAO_AV_Flow_ProtocolFactorySetItor flow_factory_end =
+ av_core->flow_protocol_factories ()->end ();
+
+ for (TAO_AV_Flow_ProtocolFactorySetItor flow_factory =
+ av_core->flow_protocol_factories ()->begin ();
+ flow_factory != flow_factory_end;
+ ++flow_factory)
+ {
+ if ((*flow_factory)->factory ()->match_protocol (flow_protocol))
+ {
+ // @@Naga:Instead of making a new connector every time we should try and see if a connector exists
+ // for this transport already and hence we can reuse it.
+ TAO_AV_TransportFactorySetItor transport_factory_end =
+ av_core->transport_factories ()->end ();
+ for (TAO_AV_TransportFactorySetItor transport_factory =
+ av_core->transport_factories ()->begin ();
+ transport_factory != transport_factory_end;
+ ++transport_factory)
+ {
+ if ((*transport_factory)->factory ()->match_protocol (transport_protocol))
+ {
+ TAO_AV_Connector *connector =
+ (*transport_factory)->factory ()->make_connector ();
+ if (connector != 0)
+ {
+ // add connector to list.
+ this->connectors_.insert (connector);
+
+ if (connector->open (endpoint,
+ av_core,
+ (*flow_factory)->factory ()) == -1)
+ return -1;
+ TAO_AV_Transport *transport = 0;
+ if (connector->connect (entry,
+ transport) == -1)
+ return -1;
+ entry->transport (transport);
+ break;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Unable to create an "
+ "connector for <%s>\n",
+ entry->flowname ()),
+ -1);
+ }
+ else
+ continue;
+ }
+ // Now check if the flow factory has a control flow factory.
+ const char *control_factory_name
+ = (*flow_factory)->factory ()->control_flow_factory ();
+
+ if (control_factory_name != 0)
+ {
+ TAO_AV_Flow_ProtocolFactorySetItor control_factory_end =
+ av_core->flow_protocol_factories ()->end ();
+
+ for (TAO_AV_Flow_ProtocolFactorySetItor control_flow_factory =
+ av_core->flow_protocol_factories ()->begin ();
+ control_flow_factory != control_factory_end;
+ ++control_flow_factory)
+ {
+ if ((*control_flow_factory)->factory ()->match_protocol (control_factory_name))
+ {
+ char control_flowname [BUFSIZ];
+ ACE_OS::sprintf (control_flowname,"%s_control",entry->flowname ());
+ // Address will be one port number above the data port.
+ // @@ This requires a more generic solution. This is a hack.
+ TAO_Tokenizer address_str (CORBA::string_dup (entry->address_str ()),':');
+ int port = ACE_OS::atoi (address_str [1]);
+ // Increment the port.
+ port++;
+ char control_addr [BUFSIZ];
+ ACE_OS::sprintf (control_addr,"%s=%s:%d",
+ entry->carrier_protocol_str (),
+ address_str[0],port);
+ TAO_Forward_FlowSpec_Entry *control_entry = 0;
+ // We want to have the control entry as producer
+ // so timeout events will happen.
+ ACE_NEW_RETURN (control_entry,
+ TAO_Forward_FlowSpec_Entry (control_flowname,
+ "IN",
+ entry->format (),
+ entry->flow_protocol_str (),
+ control_addr),
+ -1);
+ // Add the control entry to the flow_spec_set that's passed so that the control entry
+ // will also be called during flow starts and stops. except that if the user specifies
+ // a flowspec in start then the control entry may not be in that but it has to be started
+ // if the flowspec has the associated data flow entry. @@ We'll leave this matter for now.
+ flow_spec_set.insert (control_entry);
+ for (TAO_AV_TransportFactorySetItor transport_factory =
+ av_core->transport_factories ()->begin ();
+ transport_factory != transport_factory_end;
+ ++transport_factory)
+ {
+ if ((*transport_factory)->factory ()->match_protocol (transport_protocol))
+ {
+ TAO_AV_Connector *connector =
+ (*transport_factory)->factory ()->make_connector ();
+ if (connector != 0)
+ {
+ // add connector to list.
+ this->connectors_.insert (connector);
+
+ if (connector->open (endpoint,
+ av_core,
+ (*control_flow_factory)->factory ()) == -1)
+ return -1;
+ TAO_AV_Transport *transport = 0;
+ if (connector->connect (control_entry,
+ transport) == -1)
+ return -1;
+ control_entry->transport (transport);
+ break;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Unable to create an "
+ "connector for <%s>\n",
+ control_entry->flowname ()),
+ -1);
+ }
+ else
+ continue;
+ }
+ // Now set the control object on the data flow object.
+ entry->protocol_object ()->control_object (control_entry->protocol_object ());
+ }
+ }
+ }
+ }
+ else
+ continue;
+ }
+ }
+ }
+ return 0;
+}
+
+int
+TAO_AV_Connector_Registry::close_all (void)
+{
+ TAO_AV_ConnectorSetItor end =
+ this->connectors_.end ();
+
+ for (TAO_AV_ConnectorSetItor i = this->connectors_.begin ();
+ i != end;
+ ++i)
+ {
+ if (*i == 0)
+ continue;
+
+ (*i)->close ();
+
+ delete *i;
+ }
+
+ this->connectors_.reset ();
+ return 0;
+}
+
+TAO_AV_Connector_Registry::~TAO_AV_Connector_Registry (void)
+{
+ this->close_all ();
+}
+
+//------------------------------------------------------------
+// TAO_AV_Acceptor_Registry
+//------------------------------------------------------------
+
+TAO_AV_Acceptor_Registry::TAO_AV_Acceptor_Registry (void)
+{
+}
+
+TAO_AV_Acceptor_Registry::~TAO_AV_Acceptor_Registry (void)
+{
+}
+
+int
+TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_FlowSpecSet &flow_spec_set)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Acceptor_Registry::open"));
+ TAO_AV_FlowSpecSetItor last_flowspec =
+ flow_spec_set.end ();
+ for (TAO_AV_FlowSpecSetItor flow_spec = flow_spec_set.begin ();
+ flow_spec != last_flowspec;
+ ++flow_spec)
+ {
+ TAO_FlowSpec_Entry *entry = (*flow_spec);
+ ACE_Addr *address = entry->address ();
+ const char *flow_protocol = entry->flow_protocol_str ();
+ const char *transport_protocol = entry->carrier_protocol_str ();
+ if (ACE_OS::strcmp (flow_protocol,"") == 0)
+ flow_protocol = transport_protocol;
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Acceptor_Registry::protocol for flow %s is %d",
+ entry->flowname (),transport_protocol));
+ if (address == 0)
+ {
+ this->open_default (endpoint,av_core, entry);
+ continue;
+ }
+ else
+ {
+ // Now get the list of avaliable protocol factories.
+ TAO_AV_Flow_ProtocolFactorySetItor flow_factory_end =
+ av_core->flow_protocol_factories ()->end ();
+
+ for (TAO_AV_Flow_ProtocolFactorySetItor flow_factory =
+ av_core->flow_protocol_factories ()->begin ();
+ flow_factory != flow_factory_end;
+ ++flow_factory)
+ {
+ if ((*flow_factory)->factory ()->match_protocol (flow_protocol))
+ {
+ TAO_AV_TransportFactorySetItor transport_factory_end =
+ av_core->transport_factories ()->end ();
+ for (TAO_AV_TransportFactorySetItor transport_factory =
+ av_core->transport_factories ()->begin ();
+ transport_factory != transport_factory_end;
+ ++transport_factory)
+ {
+ if ((*transport_factory)->factory ()->match_protocol (transport_protocol))
+ {
+ TAO_AV_Acceptor *acceptor =
+ (*transport_factory)->factory ()->make_acceptor ();
+ if (acceptor != 0)
+ {
+ // add acceptor to list.
+ this->acceptors_.insert (acceptor);
+
+ if (acceptor->open (endpoint,
+ av_core,
+ entry,
+ (*flow_factory)->factory ()) == -1)
+ return -1;
+ break;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Unable to create an "
+ "acceptor for <%s>\n",
+ entry->flowname ()),
+ -1);
+ }
+ else
+ continue;
+ }
+ // Now check if the flow factory has a control flow factory.
+ const char *control_factory_name
+ = (*flow_factory)->factory ()->control_flow_factory ();
+
+ if (control_factory_name != 0)
+ {
+ TAO_AV_Flow_ProtocolFactorySetItor control_factory_end =
+ av_core->flow_protocol_factories ()->end ();
+
+ for (TAO_AV_Flow_ProtocolFactorySetItor control_flow_factory =
+ av_core->flow_protocol_factories ()->begin ();
+ control_flow_factory != control_factory_end;
+ ++control_flow_factory)
+ {
+ if ((*control_flow_factory)->factory ()->match_protocol (control_factory_name))
+ {
+ char control_flowname [BUFSIZ];
+ ACE_OS::sprintf (control_flowname,"%s_control",entry->flowname ());
+ // Address will be one port number above the data port.
+ // @@ This requires a more generic solution. This is a hack.
+ TAO_Tokenizer address_str (CORBA::string_dup (entry->address_str ()),':');
+ int port = ACE_OS::atoi (address_str [1]);
+ // Increment the port.
+ port++;
+ char control_addr [BUFSIZ];
+ ACE_OS::sprintf (control_addr,"%s=%s:%d",
+ entry->carrier_protocol_str (),
+ address_str[0],port);
+ TAO_Forward_FlowSpec_Entry *control_entry = 0;
+ // We want to have the control entry as producer
+ // so timeout events will happen.
+ ACE_NEW_RETURN (control_entry,
+ TAO_Forward_FlowSpec_Entry (control_flowname,
+ "IN",
+ entry->format (),
+ entry->flow_protocol_str (),
+ control_addr),
+ -1);
+ // Add the control entry to the flow_spec_set that's passed so that the control entry
+ // will also be called during flow starts and stops. except that if the user specifies
+ // a flowspec in start then the control entry may not be in that but it has to be started
+ // if the flowspec has the associated data flow entry. @@ We'll leave this matter for now.
+ flow_spec_set.insert (control_entry);
+ TAO_AV_TransportFactorySetItor transport_factory_end =
+ av_core->transport_factories ()->end ();
+ for (TAO_AV_TransportFactorySetItor transport_factory =
+ av_core->transport_factories ()->begin ();
+ transport_factory != transport_factory_end;
+ ++transport_factory)
+ {
+ if ((*transport_factory)->factory ()->match_protocol (transport_protocol))
+ {
+ TAO_AV_Acceptor *acceptor =
+ (*transport_factory)->factory ()->make_acceptor ();
+ if (acceptor != 0)
+ {
+ // add acceptor to list.
+ this->acceptors_.insert (acceptor);
+
+ if (acceptor->open (endpoint,
+ av_core,
+ control_entry,
+ (*control_flow_factory)->factory ()) == -1)
+ return -1;
+ break;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Unable to create an "
+ "acceptor for <%s>\n",
+ entry->flowname ()),
+ -1);
+ }
+ else
+ continue;
+ }
+ // Now set the control object on the data flow object.
+ entry->protocol_object ()->control_object (control_entry->protocol_object ());
+ }
+ }
+ }
+ }
+ else
+ continue;
+ }
+ }
+ }
+ return 0;
+}
+
+int
+TAO_AV_Acceptor_Registry::open_default (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Acceptor_Registry::open_default "));
+ // No endpoints were specified, we let each protocol pick its own
+ // default...
+
+ TAO_AV_Flow_ProtocolFactorySetItor flow_factory_end =
+ av_core->flow_protocol_factories ()->end ();
+
+ const char *flow_protocol = entry->flow_protocol_str ();
+ const char *transport_protocol = entry->carrier_protocol_str ();
+
+ if (ACE_OS::strcmp (flow_protocol,"") == 0)
+ flow_protocol = transport_protocol;
+
+ // loop through loaded protocols looking for protocol_prefix
+ TAO_AV_Flow_ProtocolFactorySetItor flow_factory = av_core->flow_protocol_factories ()->begin ();
+ TAO_AV_TransportFactorySetItor transport_factory = av_core->transport_factories ()->begin ();
+ for (;
+ flow_factory != flow_factory_end;
+ ++flow_factory)
+ {
+ if (!(*flow_factory)->factory ()->match_protocol (flow_protocol))
+ {
+ // If we have no matching protocol then keep searching
+ // for one until the entire list of protocols has been
+ // searched.
+
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) Unable to match protocol prefix "
+ "for <%s>\n",
+ flow_protocol));
+ continue;
+ }
+ else
+ {
+ TAO_AV_TransportFactorySetItor transport_factory_end =
+ av_core->transport_factories ()->end ();
+ for (;transport_factory != transport_factory_end;
+ ++transport_factory)
+ {
+ if (!(*transport_factory)->factory ()->match_protocol (transport_protocol))
+ {
+ // If we have no matching protocol then keep searching
+ // for one until the entire list of protocols has been
+ // searched.
+
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) Unable to match protocol prefix "
+ "for <%s>\n",
+ flow_protocol));
+ continue;
+ }
+
+
+
+ // got it, make an acceptor
+ TAO_AV_Acceptor *acceptor =
+ (*transport_factory)->factory ()->make_acceptor ();
+
+ if (acceptor == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) unable to create "
+ "an acceptor for <%d>\n",
+ transport_protocol));
+ continue;
+ }
+
+ if (acceptor->open_default (endpoint,
+ av_core,
+ entry,
+ (*flow_factory)->factory ()) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) unable to open "
+ "default acceptor for <%s>%p\n",
+ (*transport_factory)->name ().c_str (), ""));
+ continue;
+ }
+
+ this->acceptors_.insert (acceptor);
+ }
+ }
+ }
+ if (this->acceptors_.size () == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P%t) cannot create any default acceptor\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+TAO_AV_Acceptor_Registry::close_all (void)
+{
+ TAO_AV_AcceptorSetItor end =
+ this->acceptors_.end ();
+
+ for (TAO_AV_AcceptorSetItor i = this->acceptors_.begin ();
+ i != end;
+ ++i)
+ {
+ if (*i == 0)
+ continue;
+
+ (*i)->close ();
+
+ delete *i;
+ }
+
+ this->acceptors_.reset ();
+ return 0;
+}
+
+//----------------------------------------------------------------------
+// TAO_AV_Transport
+//----------------------------------------------------------------------
+
+TAO_AV_Transport::TAO_AV_Transport (void)
+{
+}
+
+// Virtual destructor.
+TAO_AV_Transport::~TAO_AV_Transport (void)
+{
+}
+
+ACE_Addr*
+TAO_AV_Transport::get_local_addr (void)
+{
+ return 0;
+}
+
+//----------------------------------------------------------------------
+// TAO_AV_Flow_Handler
+//----------------------------------------------------------------------
+
+//TAO_AV_Flow_Handler::TAO_AV_Flow_Handler (TAO_AV_Callback *callback)
+TAO_AV_Flow_Handler::TAO_AV_Flow_Handler (void)
+ :transport_ (0),
+ callback_ (0),
+ protocol_object_ (0)
+{
+}
+
+int
+TAO_AV_Flow_Handler::set_remote_address (ACE_Addr */* address */)
+{
+ return 0;
+}
+
+int
+TAO_AV_Flow_Handler::start (TAO_FlowSpec_Entry::Role role)
+{
+ this->callback_->handle_start ();
+ switch (role)
+ {
+ // only for producer we register for the timeout.
+ case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
+ {
+ this->schedule_timer ();
+ }
+ break;
+ default:
+ break;
+ }
+ return 0;
+}
+
+int
+TAO_AV_Flow_Handler::schedule_timer (void)
+{
+ ACE_Event_Handler *event_handler = this->event_handler ();
+ ACE_Time_Value *tv = 0;
+ this->callback_->get_timeout (tv,
+ this->timeout_arg_);
+ if (tv == 0)
+ return 0;
+ this->timer_id_ = event_handler->reactor ()->schedule_timer (event_handler,
+ 0,
+ *tv);
+ if (this->timer_id_ < 0)
+ return -1;
+}
+
+int
+TAO_AV_Flow_Handler::stop (TAO_FlowSpec_Entry::Role role)
+{
+ this->callback_->handle_stop ();
+ switch (role)
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
+ {
+ int result = this->event_handler ()->reactor ()->cancel_timer (this->timer_id_);
+ if (result < 0)
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Flow_Handler::stop:cancel_timer failed\n"));
+ }
+ break;
+ default:
+ break;
+ }
+ return 0;
+}
+
+int
+TAO_AV_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ this->callback_->handle_timeout (this->timeout_arg_);
+ ACE_Event_Handler *event_handler = this->event_handler ();
+ ACE_Time_Value *timeout = 0;
+ this->callback_->get_timeout (timeout,
+ this->timeout_arg_);
+ if (timeout == 0)
+ return 0;
+ this->timer_id_ = event_handler->reactor ()->schedule_timer (event_handler,
+ 0,
+ *timeout);
+ return 0;
+}
+
+TAO_AV_Transport*
+TAO_AV_Flow_Handler::transport (void)
+{
+ return this->transport_;
+}
+
+void
+TAO_AV_Flow_Handler::protocol_object (TAO_AV_Protocol_Object *protocol_object)
+{
+ this->protocol_object_ = protocol_object;
+}
+
+TAO_AV_Protocol_Object*
+TAO_AV_Flow_Handler::protocol_object (void)
+{
+ return this->protocol_object_;
+}
+
+void
+TAO_AV_Flow_Handler::callback (TAO_AV_Callback *callback)
+{
+ this->callback_ = callback;
+}
+
+// TAO_AV_Connector
+TAO_AV_Connector::TAO_AV_Connector (void)
+{
+}
+
+TAO_AV_Connector::~TAO_AV_Connector (void)
+{
+}
+
+// TAO_AV_Acceptor
+TAO_AV_Acceptor::TAO_AV_Acceptor (void)
+{
+}
+
+TAO_AV_Acceptor::~TAO_AV_Acceptor (void)
+{
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Node <TAO_AV_Connector*>;
+template class ACE_Node <TAO_AV_Acceptor*>;
+template class ACE_Unbounded_Set<TAO_AV_Acceptor*>;
+template class ACE_Unbounded_Set<TAO_AV_Connector*>;
+template class ACE_Unbounded_Set_Iterator<TAO_AV_Acceptor*>;
+template class ACE_Unbounded_Set_Iterator<TAO_AV_Connector*>;
+template class ACE_Singleton<TAO_AV_Core,ACE_Null_Mutex>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Node <TAO_AV_Connector*>
+#pragma instantiate ACE_Node <TAO_AV_Acceptor*>
+#pragma instantiate ACE_Unbounded_Set<TAO_AV_Connector*>
+#pragma instantiate ACE_Unbounded_Set<TAO_AV_Acceptor*>
+#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_AV_Connector*>
+#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_AV_Acceptor*>
+#pragma instantiate ACE_Singleton<TAO_AV_Core,ACE_Null_Mutex>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */