diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp | 1186 |
1 files changed, 1186 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp new file mode 100644 index 00000000000..830f8fe5e98 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp @@ -0,0 +1,1186 @@ +// $Id$ + +#include "orbsvcs/AV/AV_Core.h" +#include "orbsvcs/AV/FlowSpec_Entry.h" +#include "orbsvcs/AV/Transport.h" +#include "orbsvcs/AV/Protocol_Factory.h" +#include "orbsvcs/AV/UDP.h" +#include "orbsvcs/AV/TCP.h" +#include "orbsvcs/AV/RTP.h" +#include "orbsvcs/AV/RTCP.h" +#include "orbsvcs/AV/sfp.h" +#include "orbsvcs/AV/default_resource.h" + +#if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) +#include "orbsvcs/AV/QoS_UDP.h" +#endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */ + +#if defined (ACE_HAS_SCTP) +#include "orbsvcs/AV/SCTP_SEQ.h" +#endif // ACE_HAS_SCTP + +#include "tao/debug.h" +#include "tao/ORB_Core.h" + +#include "ace/Dynamic_Service.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +//------------------------------------------------------------ +// TAO_AV_Core +//------------------------------------------------------------ + +TAO_AV_Core::TAO_AV_Core (void) + :connector_registry_ (0), + acceptor_registry_ (0) +{ + ACE_NEW (this->connector_registry_, + TAO_AV_Connector_Registry + ); + ACE_NEW (this->acceptor_registry_, + TAO_AV_Acceptor_Registry + ); +} + +TAO_AV_Core::~TAO_AV_Core (void) +{ + delete this->connector_registry_; + delete this->acceptor_registry_; + + TAO_AV_TransportFactorySetItor transport_iter = + this->transport_factories_.begin(); + + while (transport_iter != this->transport_factories_.end()) + { + if ((*transport_iter)->factory()->ref_count != 1) + { + delete (*transport_iter)->factory(); + } + delete (*transport_iter); + transport_iter++; + } + + TAO_AV_Flow_ProtocolFactorySetItor flow_iter = + this->flow_protocol_factories_.begin(); + + while (flow_iter != this->flow_protocol_factories_.end()) + { + if ((*flow_iter)->factory()->ref_count != 1) + { + delete (*flow_iter)->factory(); + } + delete (*flow_iter); + + flow_iter++; + } +} + +CORBA::ORB_ptr +TAO_AV_Core::orb (void) +{ + return this->orb_.in (); +} + +void +TAO_AV_Core::orb (CORBA::ORB_ptr orb) +{ + this->orb_ = orb; +} + +PortableServer::POA_ptr +TAO_AV_Core::poa (void) +{ + return this->poa_.in (); +} + +void +TAO_AV_Core::poa (PortableServer::POA_ptr poa) +{ + this->poa_ = poa; +} + +TAO_AV_Connector_Registry* +TAO_AV_Core::connector_registry (void) +{ + return this->connector_registry_; +} + +TAO_AV_Acceptor_Registry* +TAO_AV_Core::acceptor_registry (void) +{ + return this->acceptor_registry_; +} + +TAO_AV_TransportFactorySet * +TAO_AV_Core::transport_factories (void) +{ + return &this->transport_factories_; +} + +TAO_AV_Flow_ProtocolFactorySet* +TAO_AV_Core::flow_protocol_factories (void) +{ + return &this->flow_protocol_factories_; +} + +int +TAO_AV_Core::stop_run (void) +{ + this->stop_run_ = 1; + return 0; +} + +int +TAO_AV_Core::run (void) +{ + this->stop_run_ = 0; + while (!this->stop_run_ && this->orb_->work_pending ()) + this->orb_->perform_work (); + return 0; +} + +void +TAO_AV_Core::reactor (ACE_Reactor *r) +{ + this->reactor_ = r; +} + +ACE_Reactor * +TAO_AV_Core::reactor (void) +{ + return this->reactor_; +} + + +int +TAO_AV_Core::init (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa + ACE_ENV_ARG_DECL_NOT_USED) +{ + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init ")); + this->orb_ = CORBA::ORB::_duplicate (orb); + this->poa_ = PortableServer::POA::_duplicate (poa); + this->reactor (this->orb_->orb_core ()->reactor ()); + this->init_transport_factories (); + this->init_flow_protocol_factories (); + return 0; +} + +int +TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_FlowSpecSet &flow_spec_set, + TAO_AV_Core::EndPoint direction, + AVStreams::flowSpec &flow_spec) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_Core::init_forward_flows\n")); + + TAO_AV_FlowSpecSet address_flow_set; + TAO_AV_FlowSpecSet flow_set; + TAO_AV_FlowSpecSetItor end = flow_spec_set.end (); + for (TAO_AV_FlowSpecSetItor start = flow_spec_set.begin (); + start != end; ++start) + { + TAO_FlowSpec_Entry *entry = (*start); + switch (direction) + { + case TAO_AV_Core::TAO_AV_ENDPOINT_B: + { + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + { + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + } + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + { + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + } + } + break; + } + case TAO_AV_Core::TAO_AV_ENDPOINT_A: + { + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + } + break; + } + default: + break; + } + ACE_Addr *address = entry->address (); + if (address != 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "address given for flow %s\n", + entry->flowname ())); + + address_flow_set.insert (entry); + } + else + flow_set.insert (entry); + } //End of For Loop + + + int result = -1; + switch (direction) + { + case TAO_AV_Core::TAO_AV_ENDPOINT_A: + if (address_flow_set.size () > 0) + { + result = this->acceptor_registry_->open (endpoint, + this, + address_flow_set); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"), + -1); + TAO_AV_FlowSpecSetItor end = address_flow_set.end (); + for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin (); + start != end; ++start) + { + TAO_FlowSpec_Entry *entry = (*start); + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + { + if (entry->handler () != 0) + { + //Yamuna:PLEASE CHECK THIS LATER +#if defined ACE_HAS_RAPI || defined (ACE_HAS_WINSOCK2_GQOS) + // For IN flows on the A side we should remove the handlers from the reactor. + ACE_Event_Handler *event_handler = entry->handler ()->event_handler (); + + if (event_handler->reactor () != 0) + { + result = event_handler->reactor ()->remove_handler (event_handler, + ACE_Event_Handler::READ_MASK); + + if (result < 0) + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_Core::init_forward_flows: remove_handler failed\n")); + } +#endif //ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS + } + } + default: + break; + } + // Now if the address_set has been changed due to the addition of a control entry we should + // add that to the flow_spec_set also. + if (flow_spec_set.find (entry) < 0) + { + // entry doesn't exist so add it. + flow_spec_set.insert (entry); + // size_t len = flow_spec.length (); + // flow_spec.length (len+1); + // flow_spec [len] = entry->entry_to_string (); + } + } + } + break; + case TAO_AV_Core::TAO_AV_ENDPOINT_B: + { + if (address_flow_set.size () > 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%N,%l) This connector registry is called\n")); + + result = this->connector_registry_->open (endpoint, + this, + address_flow_set); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_Forward_flows: connector_registry open failed\n"),-1); + TAO_AV_FlowSpecSetItor end = address_flow_set.end (); + for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin (); + start != end; ++start) + { + TAO_FlowSpec_Entry *entry = (*start); + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + { + if (entry->handler () != 0) + { + // @@Naga: This wont be called in the case of Full Profile. + // For IN flows on the A side we should remove the handlers from the reactor. + ACE_Event_Handler *event_handler = entry->handler ()->event_handler (); + result = event_handler->reactor ()->remove_handler (event_handler, + ACE_Event_Handler::READ_MASK); + if (result < 0) + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n")); + } + } + default: + break; + } + // Now if the address_set has been changed due to the addition of a control entry we should + // add that to the flow_spec_set also. + if (flow_spec_set.find (entry) < 0) + { + // entry doesn't exist so add it. + flow_spec_set.insert (entry); + } + } + } + if (flow_set.size () > 0) + { + TAO_AV_FlowSpecSet tmp_flow_set (flow_set); + flow_set.reset (); + TAO_AV_FlowSpecSetItor end = tmp_flow_set.end (); + TAO_AV_FlowSpecSetItor start = tmp_flow_set.begin (); + for (; start != end; ++start) + { + TAO_FlowSpec_Entry *entry = *start; + TAO_FlowSpec_Entry *new_entry; + ACE_CString dir; + if (entry->direction () == 0) + dir += "IN"; + else if (entry->direction () == 1) + dir += "OUT"; + if (entry->get_peer_addr () != 0) + { + ACE_NEW_RETURN (new_entry, + TAO_Forward_FlowSpec_Entry (entry->flowname (), + dir.c_str (), + entry->format (), + entry->flow_protocol_str (), + entry->carrier_protocol_str (), + entry->get_peer_addr (), + entry->control_address ()), + -1); + } + else + { + ACE_NEW_RETURN (new_entry, + TAO_Forward_FlowSpec_Entry (entry->flowname (), + dir.c_str (), + entry->format (), + entry->flow_protocol_str (), + entry->carrier_protocol_str (), + entry->address (), + entry->control_address ()), + -1); + } + flow_set.insert (new_entry); + } + result = this->acceptor_registry_->open (endpoint, + this, + flow_set); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_Forward_flows: Acceptor_registry open failed\n"),-1); + end = address_flow_set.end (); + start = address_flow_set.begin (); + for (; start != end; ++start) + { + TAO_FlowSpec_Entry *entry = (*start); + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + { + if (entry->handler () != 0) + { + // For IN flows on the A side we should remove the handlers from the reactor. + ACE_Event_Handler *event_handler = entry->handler ()->event_handler (); + result = event_handler->reactor ()->remove_handler (event_handler, + ACE_Event_Handler::READ_MASK); + if (result < 0) + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n")); + } + } + default: + break; + } + // Now if the address_set has been changed due to the addition of a control entry we should + // add that to the flow_spec_set also. + if (flow_spec_set.find (entry) < 0) + { + // entry doesn't exist so add it. + flow_spec_set.insert (entry); + } + } + } + + AVStreams::flowSpec new_flowspec (static_cast<CORBA::ULong> (flow_spec_set.size ())); + int i=0; + TAO_AV_FlowSpecSetItor connect_end = address_flow_set.end (); + TAO_AV_FlowSpecSetItor connect = address_flow_set.begin (); + for (;connect != connect_end; ++connect) + { + ACE_Addr *local_addr; + ACE_Addr *local_control_addr; + local_addr = (*connect)->get_local_addr (); + local_control_addr = (*connect)->get_local_control_addr (); + if (local_addr != 0) + { + TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (), + (*connect)->direction_str (), + (*connect)->format (), + (*connect)->flow_protocol_str (), + (*connect)->carrier_protocol_str (), + local_addr, + local_control_addr); + /* + ACE_Addr *addr; + if ((addr = (*connect)->get_peer_addr ()) != 0) + { + entry.set_peer_addr (addr); + }; + */ + int len = new_flowspec.length (); + if (i == len) + new_flowspec.length (len+1); + new_flowspec [i++] = entry.entry_to_string (); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "reverse Flow Spec Is %s\n", entry.entry_to_string ())); + } + } + connect_end = flow_set.end (); + for (connect = flow_set.begin (); + connect != connect_end; ++connect) + { + ACE_Addr *local_addr; + ACE_Addr *local_control_addr; + local_addr = (*connect)->get_local_addr (); + local_control_addr = (*connect)->get_local_control_addr (); + if (local_addr != 0) + { + TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (), + (*connect)->direction_str (), + (*connect)->format (), + (*connect)->flow_protocol_str (), + (*connect)->carrier_protocol_str (), + local_addr, + local_control_addr); + + int len = new_flowspec.length (); + if (i == len) + new_flowspec.length (len+1); + new_flowspec [i++] = entry.entry_to_string (); + } + } + // Change the reverse flow spec to be sent. + // int index = flow_spec.length () + 1; + int index = new_flowspec.length (); + flow_spec.length (index); + for (i = 0; i < index; i++) + { + flow_spec [i] = new_flowspec [i]; + } + } + break; + default: + break; + } + return 0; +} + +int +TAO_AV_Core::init_reverse_flows (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_FlowSpecSet &forward_flow_spec_set, + TAO_AV_FlowSpecSet &reverse_flow_spec_set, + TAO_AV_Core::EndPoint direction) +{ + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)TAO_AV_Core::init_reverse_flows\n")); + TAO_AV_FlowSpecSet acceptor_flow_set; + TAO_AV_FlowSpecSet connector_flow_set; + TAO_AV_FlowSpecSetItor end = reverse_flow_spec_set.end (); + TAO_AV_FlowSpecSetItor start = reverse_flow_spec_set.begin (); + for (;start != end; ++start) + { + TAO_FlowSpec_Entry *entry = (*start); + ACE_Addr *address = entry->address (); + switch (direction) + { + case TAO_AV_Core::TAO_AV_ENDPOINT_B: + { + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + } + break; + } + case TAO_AV_Core::TAO_AV_ENDPOINT_A: + { + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + } + break; + } + default: break; + } + + if (address != 0) + { + if (this->get_acceptor (entry->flowname ())!= 0) + { + ACE_Addr *address = entry->address (); + TAO_FlowSpec_Entry *forward_entry = + this->get_flow_spec_entry (forward_flow_spec_set, + entry->flowname ()); + if (forward_entry != 0) + forward_entry->set_peer_addr (address); + } + else + connector_flow_set.insert (entry); + } + } + int result = -1; + switch (direction) + { + + case TAO_AV_Core::TAO_AV_ENDPOINT_A: + { + result = this->connector_registry_->open (endpoint, + this, + connector_flow_set); + } + break; + default: + break; + } + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR,"acceptor_registry::open"),-1); + return 0; +} + +TAO_FlowSpec_Entry * +TAO_AV_Core::get_flow_spec_entry (TAO_AV_FlowSpecSet &flow_spec_set, + const char *flowname) +{ + TAO_AV_FlowSpecSetItor end = flow_spec_set.end (); + TAO_AV_FlowSpecSetItor begin = flow_spec_set.begin (); + for (; + begin != end; + ++begin) + { + if (ACE_OS::strcmp ((*begin)->flowname (),flowname) == 0) + return (*begin); + } + return 0; +} + +TAO_AV_Acceptor* +TAO_AV_Core::get_acceptor (const char *flowname) +{ + + ACE_TRY_NEW_ENV + { + + TAO_AV_AcceptorSetItor acceptor = this->acceptor_registry_->begin (); + ACE_TRY_CHECK; + + TAO_AV_AcceptorSetItor end = + this->acceptor_registry_->end (); + + for (;acceptor != end; ++acceptor) + { + if (ACE_OS::strcmp ((*acceptor)->flowname (),flowname) == 0) + return *acceptor; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_Core::get_acceptor"); + } + ACE_ENDTRY; + return 0; +} + +int +TAO_AV_Core::remove_acceptor (const char *flowname) +{ + + ACE_TRY_NEW_ENV + { + + TAO_AV_AcceptorSetItor acceptor = this->acceptor_registry_->begin (); + ACE_TRY_CHECK; + + TAO_AV_AcceptorSetItor end = + this->acceptor_registry_->end (); + + for (;acceptor != end; ++acceptor) + { + if (ACE_OS::strcmp ((*acceptor)->flowname (),flowname) == 0) + { + this->acceptor_registry_->close (*acceptor); + return 0; + } + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_Core::get_acceptor"); + } + ACE_ENDTRY; + return -1; +} + +TAO_AV_Connector* +TAO_AV_Core::get_connector (const char *flowname) +{ + TAO_AV_ConnectorSetItor connector = + this->connector_registry_->begin (); + TAO_AV_ConnectorSetItor end = + this->connector_registry_->end (); + + for (;connector != end; ++connector) + { + if (ACE_OS::strcmp ((*connector)->flowname (),flowname) == 0) + return *connector; + } + return 0; +} + +int +TAO_AV_Core::remove_connector (const char *flowname) +{ + TAO_AV_ConnectorSetItor connector = + this->connector_registry_->begin (); + TAO_AV_ConnectorSetItor end = + this->connector_registry_->end (); + + for (;connector != end; ++connector) + { + if (ACE_OS::strcmp ((*connector)->flowname (),flowname) == 0) + { + this->connector_registry_->close (*connector); + return 0; + } + } + return -1; +} + +TAO_AV_Flow_Protocol_Factory * +TAO_AV_Core::get_flow_protocol_factory(const char *flow_protocol) +{ + if (flow_protocol == 0) + return 0; + + for (TAO_AV_Flow_ProtocolFactorySetItor control_flow_factory = + this->flow_protocol_factories_.begin (); + control_flow_factory != + this->flow_protocol_factories_.end (); + ++control_flow_factory) + { + if ((*control_flow_factory)->factory ()->match_protocol (flow_protocol)) + { + return (*control_flow_factory)->factory (); + } + } + + // Not found. + return 0; +} + +TAO_AV_Transport_Factory * +TAO_AV_Core::get_transport_factory(const char *transport_protocol) +{ + if (transport_protocol == 0) + return 0; + + for (TAO_AV_TransportFactorySetItor transport_factory = + this->transport_factories_.begin (); + transport_factory != this->transport_factories_.end (); + ++transport_factory) + { + if ((*transport_factory)->factory ()->match_protocol (transport_protocol)) + { + return (*transport_factory)->factory (); + } + } + + // Not found. + return 0; +} + +int +TAO_AV_Core::load_default_transport_factories (void) +{ + const char *udp_factory_str = "UDP_Factory"; + const char *tcp_factory_str = "TCP_Factory"; + + TAO_AV_Transport_Factory *udp_factory = 0; + TAO_AV_Transport_Item *udp_item = 0; + + udp_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_factory_str); + if (udp_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP Factory")); + + ACE_NEW_RETURN (udp_factory, + TAO_AV_UDP_Factory, + -1); + } + else udp_factory->ref_count = 1; + + ACE_NEW_RETURN (udp_item, TAO_AV_Transport_Item ("UDP_Factory"), -1); + udp_item->factory (udp_factory); + + this->transport_factories_.insert (udp_item); + + TAO_AV_Transport_Factory *tcp_factory = 0; + TAO_AV_Transport_Item *tcp_item = 0; + + tcp_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (tcp_factory_str); + if (tcp_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "TCP Factory")); + + ACE_NEW_RETURN (tcp_factory, + TAO_AV_TCP_Factory, + -1); + } + else tcp_factory->ref_count = 1; + + ACE_NEW_RETURN (tcp_item, TAO_AV_Transport_Item ("TCP_Factory"), -1); + tcp_item->factory (tcp_factory); + + this->transport_factories_.insert (tcp_item); + +#if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) + const char *udp_qos_factory_str = "UDP_QoS_Factory"; + + TAO_AV_Transport_Factory *udp_qos_factory = 0; + TAO_AV_Transport_Item *udp_qos_item = 0; + + udp_qos_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_qos_factory_str); + if (udp_qos_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP QoS Factory")); + + ACE_NEW_RETURN (udp_qos_factory, + TAO_AV_UDP_QoS_Factory, + -1); + } + else udp_qos_factory->ref_count = 1; + + ACE_NEW_RETURN (udp_qos_item, + TAO_AV_Transport_Item ("UDP_QoS_Factory"), + -1); + + udp_qos_item->factory (udp_qos_factory); + + this->transport_factories_.insert (udp_qos_item); +#endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */ + +#if defined ACE_HAS_SCTP + const char *sctp_seq_factory_str = "SCTP_SEQ_Factory"; + + TAO_AV_Transport_Factory *sctp_seq_factory = 0; + TAO_AV_Transport_Item *sctp_seq_item = 0; + + sctp_seq_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (sctp_seq_factory_str); + if (sctp_seq_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "SCTP SEQ Factory")); + + ACE_NEW_RETURN (sctp_seq_factory, + TAO_AV_SCTP_SEQ_Factory, + -1); + } + else sctp_seq_factory->ref_count = 1; + + ACE_NEW_RETURN (sctp_seq_item, + TAO_AV_Transport_Item ("SCTP_SEQ_Factory"), + -1); + + sctp_seq_item->factory (sctp_seq_factory); + + this->transport_factories_.insert (sctp_seq_item); +#endif /* ACE_HAS_SCTP */ + + return 0; +} + +int +TAO_AV_Core::init_transport_factories (void) +{ + TAO_AV_TransportFactorySetItor end = this->transport_factories_.end (); + TAO_AV_TransportFactorySetItor factory = this->transport_factories_.begin (); + + + if (factory == end) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Loading default transport protocols\n")); + this->load_default_transport_factories (); + } + else + { + for (; factory != end; factory++) + { + const ACE_CString &name = (*factory)->name (); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%s \n", + name.c_str ())); + + (*factory)->factory ( + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (name.c_str ())); + if ((*factory)->factory () == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) Unable to load ") + ACE_TEXT ("protocol <%s>, %p\n"), + name.c_str (), ""), + -1); + } + (*factory)->factory ()->ref_count = 1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Loaded protocol <%s>\n"), + name.c_str ())); + } + } + } + + return 0; +} + +int +TAO_AV_Core::load_default_flow_protocol_factories (void) +{ + const char *udp_flow = "UDP_Flow_Factory"; + const char *tcp_flow = "TCP_Flow_Factory"; + const char *rtp_flow = "RTP_Flow_Factory"; + const char *rtcp_flow = "RTCP_Flow_Factory"; + const char *sfp_flow = "SFP_Flow_Factory"; + + TAO_AV_Flow_Protocol_Factory *udp_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *udp_item = 0; + + udp_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (udp_flow); + if (udp_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP Flow Factory")); + + ACE_NEW_RETURN (udp_flow_factory, + TAO_AV_UDP_Flow_Factory, + -1); + } + else udp_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (udp_item, TAO_AV_Flow_Protocol_Item ("UDP_Flow_Factory"), -1); + udp_item->factory (udp_flow_factory); + + this->flow_protocol_factories_.insert (udp_item); + +#if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) + + const char *udp_qos_flow = "UDP_QoS_Flow_Factory"; + TAO_AV_Flow_Protocol_Factory *udp_qos_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *udp_qos_flow_item = 0; + + udp_qos_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (udp_qos_flow); + if (udp_qos_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP QoS Flow Factory")); + + ACE_NEW_RETURN (udp_qos_flow_factory, + TAO_AV_UDP_QoS_Flow_Factory, + -1); + } + else udp_qos_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (udp_qos_flow_item, TAO_AV_Flow_Protocol_Item ("UDP_QoS_Flow_Factory"), -1); + udp_qos_flow_item->factory (udp_qos_flow_factory); + + this->flow_protocol_factories_.insert (udp_qos_flow_item); + +#endif /* defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) */ + +#if defined ACE_HAS_SCTP + + const char *sctp_seq_flow = "SCTP_SEQ_Flow_Factory"; + TAO_AV_Flow_Protocol_Factory *sctp_seq_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *sctp_seq_flow_item = 0; + + sctp_seq_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (sctp_seq_flow); + if (sctp_seq_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "SCTP SEQ Flow Factory")); + + ACE_NEW_RETURN (sctp_seq_flow_factory, + TAO_AV_SCTP_SEQ_Flow_Factory, + -1); + } + else sctp_seq_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (sctp_seq_flow_item, TAO_AV_Flow_Protocol_Item ("SCTP_SEQ_Flow_Factory"), -1); + sctp_seq_flow_item->factory (sctp_seq_flow_factory); + + this->flow_protocol_factories_.insert (sctp_seq_flow_item); + +#endif /* ACE_HAS_SCTP */ + + TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *tcp_item = 0; + + tcp_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (tcp_flow); + if (tcp_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "TCP Flow Factory")); + + ACE_NEW_RETURN (tcp_flow_factory, + TAO_AV_TCP_Flow_Factory, + -1); + } + else tcp_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (tcp_item, TAO_AV_Flow_Protocol_Item ("TCP_Flow_Factory"), -1); + tcp_item->factory (tcp_flow_factory); + + this->flow_protocol_factories_.insert (tcp_item); + + TAO_AV_Flow_Protocol_Factory *rtp_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *rtp_item = 0; + + rtp_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (rtp_flow); + if (rtp_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "RTP Flow Factory")); + + ACE_NEW_RETURN (rtp_flow_factory, + TAO_AV_RTP_Flow_Factory, + -1); + } + else rtp_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (rtp_item, TAO_AV_Flow_Protocol_Item ("RTP_Flow_Factory"), -1); + rtp_item->factory (rtp_flow_factory); + + this->flow_protocol_factories_.insert (rtp_item); + + TAO_AV_Flow_Protocol_Factory *rtcp_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *rtcp_item = 0; + + rtcp_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (rtcp_flow); + if (rtcp_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "RTCP Flow Factory")); + + ACE_NEW_RETURN (rtcp_flow_factory, + TAO_AV_RTCP_Flow_Factory, + -1); + } + else rtcp_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (rtcp_item, TAO_AV_Flow_Protocol_Item ("RTCP_Flow_Factory"), -1); + rtcp_item->factory (rtcp_flow_factory); + + this->flow_protocol_factories_.insert (rtcp_item); + + TAO_AV_Flow_Protocol_Factory *sfp_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *sfp_item = 0; + + sfp_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (sfp_flow); + if (sfp_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "SFP Flow Factory")); + + ACE_NEW_RETURN (sfp_flow_factory, + TAO_AV_SFP_Factory, + -1); + } + else sfp_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (sfp_item, TAO_AV_Flow_Protocol_Item ("SFP_Flow_Factory"), -1); + sfp_item->factory (sfp_flow_factory); + + this->flow_protocol_factories_.insert (sfp_item); + + return 0; +} + +int +TAO_AV_Core::init_flow_protocol_factories (void) +{ + TAO_AV_Flow_ProtocolFactorySetItor end = this->flow_protocol_factories_.end (); + TAO_AV_Flow_ProtocolFactorySetItor factory = this->flow_protocol_factories_.begin (); + + if (factory == end) + { + ACE_DEBUG ((LM_DEBUG, + "Loading default flow protocol factories\n")); + + this->load_default_flow_protocol_factories (); + } + else + { + for (; factory != end; factory++) + { + const ACE_CString &name = (*factory)->name (); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%s \n", + name.c_str ())); + + (*factory)->factory ( + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (name.c_str ())); + if ((*factory)->factory () == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) Unable to load ") + ACE_TEXT ("protocol <%s>, %p\n"), + name.c_str (), ""), + -1); + } + + (*factory)->factory ()->ref_count = 1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Loaded protocol <%s>\n"), + name.c_str ())); + } + } + } + + return 0; +} + +/* static */ +int +TAO_AV_Core::deactivate_servant (PortableServer::Servant servant) +{ + // Because of reference counting, the POA will automatically delete + // the servant when all pending requests on this servant are + // complete. + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + PortableServer::POA_var poa = servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::ObjectId_var id = poa->servant_to_id (servant + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + poa->deactivate_object (id.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "deactivate_servant"); + return -1; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); + return 0; +} + +/* static */ +char * +TAO_AV_Core::get_flowname (const char *flow_spec_entry_str) +{ + ACE_CString flow_spec_entry (flow_spec_entry_str); + ACE_CString::size_type slash_pos = flow_spec_entry.find ('\\'); + ACE_CString flow_name; + if (slash_pos != flow_spec_entry.npos) + flow_name = flow_spec_entry.substring (0, slash_pos); + else + flow_name = flow_spec_entry_str; + return CORBA::string_dup (flow_name.c_str ()); +} + +ACE_CString +TAO_AV_Core::get_control_flowname(const char *flowname) +{ + ACE_CString control_flowname; + control_flowname = "c_"; + control_flowname = control_flowname + flowname; + + return flowname; +} + +#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) +template ACE_Singleton<TAO_AV_Core, ACE_Null_Mutex> *ACE_Singleton<TAO_AV_Core, ACE_Null_Mutex>::singleton_; +#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ + +TAO_END_VERSIONED_NAMESPACE_DECL |