diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-20 21:22:17 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-20 21:22:17 +0000 |
commit | 3948829007575024346e699431999ba225a36066 (patch) | |
tree | aee62f7e7d6d86d6dc9cc0d16549a9fd21edcfaf | |
parent | 0a8ad5fdfe7bb383487aaffeb2a233d2f7839584 (diff) | |
download | ATCD-3948829007575024346e699431999ba225a36066.tar.gz |
ChangeLogTag: Fri Apr 20 16:13:40 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i | 16 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp | 134 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp | 5 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp | 1125 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h | 247 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i | 33 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Transport.cpp | 18 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/UDP.cpp | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Makefile.av | 5 |
9 files changed, 1555 insertions, 30 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i index 6476b9736ee..43e6b8db61f 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i @@ -4,6 +4,13 @@ // AVStreams_i.i ACE_INLINE +TAO_AV_QoS & +TAO_Base_StreamEndPoint::qos (void) +{ + return this->qos_; +} + +ACE_INLINE int TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos) { @@ -22,10 +29,15 @@ TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos) ACE_INLINE int -TAO_AV_QoS::get_flow_qos (const char *flowname,AVStreams::QoS &flow_qos) +TAO_AV_QoS::get_flow_qos (const char *flowname, + AVStreams::QoS &flow_qos) { - int result = this->qos_map_.find (flowname, flow_qos); + int result = this->qos_map_.find (flowname, + flow_qos); + if (result < 0) ACE_ERROR_RETURN ((LM_DEBUG,"qos_map::find failed\n"),-1); return 0; } + + diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp index cf3a2395a6a..cbe489743fe 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp @@ -10,6 +10,10 @@ #include "orbsvcs/AV/RTCP.h" #include "orbsvcs/AV/sfp.h" +#ifdef ACE_HAS_RAPI +#include "orbsvcs/AV/QoS_UDP.h" +#endif /*ACE_HAS_RAPI*/ + #include "tao/debug.h" #include "tao/ORB_Core.h" @@ -134,7 +138,10 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, TAO_AV_Core::EndPoint direction, AVStreams::flowSpec &flow_spec) { - if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows\n")); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_Core::init_forward_flows\n")); + TAO_AV_FlowSpecSet address_flow_set; TAO_AV_FlowSpecSet flow_set; TAO_AV_FlowSpecSetItor end = flow_spec_set.end (); @@ -149,12 +156,17 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, switch (entry->direction ()) { case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: - entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); - break; + { + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + } case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: - entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); - break; + { + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + } } + break; } case TAO_AV_Core::TAO_AV_ENDPOINT_A: { @@ -175,7 +187,11 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, ACE_Addr *address = entry->address (); if (address != 0) { - if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"address given for flow %s",entry->flowname ())); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "address given for flow %s", + entry->flowname ())); + address_flow_set.insert (entry); } else @@ -193,7 +209,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, this, address_flow_set); if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"),-1); + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"), + -1); TAO_AV_FlowSpecSetItor end = address_flow_set.end (); for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin (); start != end; ++start) @@ -205,12 +223,18 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, { if (entry->handler () != 0) { + //Yamuna:PLEASE CHECK THIS LATER +#ifndef ACE_HAS_RAPI // For IN flows on the A side we should remove the handlers from the reactor. - ACE_Event_Handler *event_handler = entry->handler ()->event_handler (); - result = event_handler->reactor ()->remove_handler (event_handler, - ACE_Event_Handler::READ_MASK); - if (result < 0) - if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n")); + ACE_Event_Handler *event_handler = entry->handler ()->event_handler (); + result = event_handler->reactor ()->remove_handler (event_handler, + ACE_Event_Handler::READ_MASK); + if (result < 0) + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_Core::init_forward_flows: remove_handler failed\n")); +#endif /*ACE_HAS_RAPI*/ + } } default: @@ -222,9 +246,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, { // entry doesn't exist so add it. flow_spec_set.insert (entry); -// size_t len = flow_spec.length (); -// flow_spec.length (len+1); -// flow_spec [len] = entry->entry_to_string (); + // size_t len = flow_spec.length (); + // flow_spec.length (len+1); + // flow_spec [len] = entry->entry_to_string (); } } } @@ -233,6 +257,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, { if (address_flow_set.size () > 0) { + ACE_DEBUG ((LM_DEBUG, + "This connector registry is called ONE\n")); + result = this->connector_registry_->open (endpoint, this, address_flow_set); @@ -444,11 +471,14 @@ TAO_AV_Core::init_reverse_flows (TAO_Base_StreamEndPoint *endpoint, int result = -1; switch (direction) { + case TAO_AV_Core::TAO_AV_ENDPOINT_A: - result = this->connector_registry_->open (endpoint, - this, - connector_flow_set); - break; + { + result = this->connector_registry_->open (endpoint, + this, + connector_flow_set); + } + break; default: break; } @@ -522,8 +552,8 @@ TAO_AV_Core::init_transport_factories (void) TAO_AV_TransportFactorySetItor end = this->transport_factories_.end (); TAO_AV_TransportFactorySetItor factory = this->transport_factories_.begin (); - const char* foo = "UDP_Factory"; - const char * bar = "TCP_Factory"; + const char *udp_factory_str = "UDP_Factory"; + const char *tcp_factory_str = "TCP_Factory"; if (factory == end) { @@ -531,7 +561,7 @@ TAO_AV_Core::init_transport_factories (void) TAO_AV_Transport_Item *udp_item = 0; udp_factory = - ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (foo); + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_factory_str); if (udp_factory == 0) { if (TAO_debug_level) @@ -554,7 +584,7 @@ TAO_AV_Core::init_transport_factories (void) TAO_AV_Transport_Item *tcp_item = 0; tcp_factory = - ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (bar); + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (tcp_factory_str); if (tcp_factory == 0) { if (TAO_debug_level) @@ -573,6 +603,35 @@ TAO_AV_Core::init_transport_factories (void) this->transport_factories_.insert (tcp_item); +#ifdef ACE_HAS_RAPI + const char *udp_qos_factory_str = "UDP_QoS_Factory"; + + TAO_AV_Transport_Factory *udp_qos_factory = 0; + TAO_AV_Transport_Item *udp_qos_item = 0; + + udp_qos_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_qos_factory_str); + if (udp_qos_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP QoS Factory")); + + ACE_NEW_RETURN (udp_qos_factory, + TAO_AV_UDP_QoS_Factory, + -1); + } + + ACE_NEW_RETURN (udp_qos_item, + TAO_AV_Transport_Item ("UDP_QoS_Factory"), + -1); + + udp_qos_item->factory (udp_qos_factory); + + this->transport_factories_.insert (udp_qos_item); +#endif /*ACE_HAS_RAPI*/ } return 0; @@ -589,6 +648,7 @@ TAO_AV_Core::init_flow_protocol_factories (void) const char *rtp_flow = "RTP_Flow_Factory"; const char *rtcp_flow = "RTCP_Flow_Factory"; const char *sfp_flow = "SFP_Flow_Factory"; + if (factory == end) { TAO_AV_Flow_Protocol_Factory *udp_flow_factory = 0; @@ -614,6 +674,34 @@ TAO_AV_Core::init_flow_protocol_factories (void) this->flow_protocol_factories_.insert (udp_item); +#ifdef ACE_HAS_RAPI + + const char *udp_qos_flow = "UDP_QoS_Flow_Factory"; + TAO_AV_Flow_Protocol_Factory *udp_qos_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *udp_qos_flow_item = 0; + + udp_qos_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (udp_qos_flow); + if (udp_qos_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "UDP QoS Flow Factory")); + + ACE_NEW_RETURN (udp_qos_flow_factory, + TAO_AV_UDP_QoS_Flow_Factory, + -1); + } + + ACE_NEW_RETURN (udp_qos_flow_item, TAO_AV_Flow_Protocol_Item ("UDP_QoS_Flow_Factory"), -1); + udp_qos_flow_item->factory (udp_qos_flow_factory); + + this->flow_protocol_factories_.insert (udp_qos_flow_item); + +#endif /*ACE_HAS_RAPI*/ + TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0; TAO_AV_Flow_Protocol_Item *tcp_item = 0; diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp index ad8f3577f43..09374995947 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp @@ -110,6 +110,8 @@ TAO_FlowSpec_Entry::set_protocol (void) this->protocol_ = TAO_AV_Core::TAO_AV_TCP; else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"UDP") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_UDP; + else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"QoS_UDP") == 0) + this->protocol_ = TAO_AV_Core::TAO_AV_QOS_UDP; else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"AAL5") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_AAL5; else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"AAL3_4") == 0) @@ -204,6 +206,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address) case TAO_AV_Core::TAO_AV_RTP_UDP: case TAO_AV_Core::TAO_AV_TCP: case TAO_AV_Core::TAO_AV_UDP: + case TAO_AV_Core::TAO_AV_QOS_UDP: { this->address_str_ = addr; ACE_INET_Addr *inet_addr; @@ -371,6 +374,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_RTP_UDP: case TAO_AV_Core::TAO_AV_RTP_UDP_MCAST: case TAO_AV_Core::TAO_AV_UDP: + case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: { @@ -506,6 +510,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void) { case TAO_AV_Core::TAO_AV_RTP_UDP: case TAO_AV_Core::TAO_AV_UDP: + case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: case TAO_AV_Core::TAO_AV_SFP_UDP: diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp new file mode 100644 index 00000000000..73eccfcd240 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp @@ -0,0 +1,1125 @@ +// $Id$ + +#include "QoS_UDP.h" +#include "UDP.h" +#include "orbsvcs/AV/AVStreams_i.h" +#include "orbsvcs/AV/MCast.h" +//#include "orbsvcs/AV/QoS_MCast.h" +#include "orbsvcs/AV/Fill_ACE_QoS.h" + +#if !defined (__ACE_INLINE__) +#include "orbsvcs/AV/QoS_UDP.i" +#endif /* __ACE_INLINE__ */ + +//------------------------------------------------------------ +// TAO_AV_UDP_Flow_Handler +//------------------------------------------------------------ + + +int +FillQoSParams (ACE_QoS_Params &qos_params, + iovec* iov, + ACE_QoS* qos) +{ + qos_params.callee_data (iov); + qos_params.caller_data (0); + qos_params.socket_qos (qos); + qos_params.group_socket_qos (0); + qos_params.flags (ACE_JL_BOTH); + + return 0; +} + + +TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void) +{ + ACE_NEW (this->transport_, + TAO_AV_UDP_QoS_Transport (this)); +} + +TAO_AV_UDP_QoS_Flow_Handler::~TAO_AV_UDP_QoS_Flow_Handler (void) +{ + delete this->transport_; +} + +TAO_AV_Transport * +TAO_AV_UDP_QoS_Flow_Handler::transport (void) +{ + return this->transport_; +} + +int +TAO_AV_UDP_QoS_Flow_Handler::handle_input (ACE_HANDLE /*fd*/) +{ + this->protocol_object_->handle_input (); + return 0; +} + +int +TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n")); + + if (this->qos_session_->update_qos () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in updating QoS\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + " Updating QOS succeeds.\n")); + + return 0; +} + +int +TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + return TAO_AV_Flow_Handler::handle_timeout (tv,arg); +} + +int +TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n")); + + ACE_INET_Addr *inet_addr = + ACE_dynamic_cast (ACE_INET_Addr*,address); + + this->peer_addr_ = *inet_addr; + + TAO_AV_UDP_QoS_Transport *transport = + ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_); + + return transport->set_remote_address (*inet_addr); +} + + +ACE_HANDLE +TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n", + this->qos_sock_dgram_.get_handle ())); + + return this->qos_sock_dgram_.get_handle () ; +} + +//------------------------------------------------------------ +// TAO_AV_UDP_Transport +//------------------------------------------------------------ + +TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void) + :handler_ (0) +{ +} + +TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler) + :handler_ (handler), + addr_ (0) +{ +} + +TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void) +{ +} + +int +TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address) +{ + this->peer_addr_ = address; + return 0; +} + +int +TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/) +{ + return 0; +} + +int +TAO_AV_UDP_QoS_Transport::close (void) +{ + return 0; +} + +int +TAO_AV_UDP_QoS_Transport::mtu (void) +{ + return ACE_MAX_DGRAM_SIZE; +} + +ACE_Addr* +TAO_AV_UDP_QoS_Transport::get_peer_addr (void) +{ + return &this->peer_addr_; +} + +ssize_t +TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk, + ACE_Time_Value *) +{ + // For the most part this was copied from GIOP::send_request and + // friends. + + iovec iov[IOV_MAX]; + int iovcnt = 0; + ssize_t n = 0; + ssize_t nbytes = 0; + + for (const ACE_Message_Block *i = mblk; + i != 0; + i = i->cont ()) + { + // Make sure there is something to send! + if (i->length () > 0) + { + iov[iovcnt].iov_base = i->rd_ptr (); + iov[iovcnt].iov_len = i->length (); + iovcnt++; + + // The buffer is full make a OS call. @@ TODO this should + // be optimized on a per-platform basis, for instance, some + // platforms do not implement writev() there we should copy + // the data into a buffer and call send_n(). In other cases + // there may be some limits on the size of the iovec, there + // we should set IOV_MAX to that limit. + + size_t bytes_sent = 0; + + if (iovcnt == IOV_MAX) + { + if (this->handler_->get_socket ()->send (iov, + 1, + bytes_sent, + 0, + this->handler_->qos_session ()->dest_addr (), + 0, + 0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in dgram_mcast.send ()\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); + + if (n < 1) + return n; + + nbytes += bytes_sent; + iovcnt = 0; + } + } + } + + size_t bytes_sent = 0; + + // Check for remaining buffers to be sent! + if (iovcnt != 0) + { + if (this->handler_->get_socket ()->send (iov, + 1, + bytes_sent, + 0, + this->handler_->qos_session ()->dest_addr (), + 0, + 0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in dgram_mcast.send ()\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); + + if (n < 1) + return n; + + nbytes += bytes_sent; + } + + return nbytes; +} + +ssize_t +TAO_AV_UDP_QoS_Transport::send (const char *buf, + size_t len, + ACE_Time_Value *) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Transport::send ")); + + char addr [BUFSIZ]; + this->peer_addr_.addr_to_string (addr,BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "to %s\n", + addr)); + + return this->handler_->get_socket ()->send (buf, + len, + this->handler_->qos_session ()->dest_addr (), + 0, + 0, + 0); +} + +ssize_t +TAO_AV_UDP_QoS_Transport::send (const iovec *iov, + int /*iovcnt*/, + ACE_Time_Value *) +{ + size_t bytes_sent = 0; + if (this->handler_->get_socket ()->send (iov, + 1, + bytes_sent, + 0, + this->handler_->qos_session ()->dest_addr (), + 0, + 0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in dgram_mcast.send ()\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); + + return bytes_sent; +} + +ssize_t +TAO_AV_UDP_QoS_Transport::recv (char *buf, + size_t len, + ACE_Time_Value *) +{ + return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_); +} + +ssize_t +TAO_AV_UDP_QoS_Transport::recv (char *buf, + size_t len, + int flags, + ACE_Time_Value *timeout) +{ + return this->handler_->get_socket ()->recv (buf, + len, + this->peer_addr_, + flags, + timeout); +} + +ssize_t +TAO_AV_UDP_QoS_Transport::recv (iovec *iov, + int /*iovcnt*/, + ACE_Time_Value *timeout) +{ + return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout); +} + + +//------------------------------------------------------------ +// TAO_AV_UDP_Acceptor +//------------------------------------------------------------ + +TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor (void) +{ +} + +TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor (void) +{ +} + +int +TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler) +{ + int result = 0; + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Acceptor Svc Handler QOS ENABLED \n")); + + ACE_QoS_Decorator* qos_decorator; + + // Decorate the above handler with QoS functionality. + ACE_NEW_RETURN (qos_decorator, + ACE_QoS_Decorator (handler, + handler->qos_session (), + this->av_core_->reactor ()), + -1); + + // Initialize the Decorator. + if (qos_decorator->init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Decorator init () failed.\n"), + -1); + + // Register the decorated Event Handler with the Reactor. + result = this->av_core_->reactor ()->register_handler (qos_decorator, + ACE_Event_Handler::QOS_MASK | + ACE_Event_Handler::READ_MASK); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in registering the Decorator with the Reactor\n"), + -1); + + return result; +} + +int +TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Acceptor::open ")); + + this->av_core_ = av_core; + this->endpoint_ = endpoint; + this->entry_ = entry; + + + this->flow_protocol_factory_ = factory; + this->flowname_ = entry->flowname (); + ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address (); +// inet_addr->set (inet_addr->get_port_number (), +// inet_addr->get_host_name ()); + char buf[BUFSIZ]; + inet_addr->addr_to_string (buf, + BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Acceptor::open: %s", + buf)); + + int result = this->open_i (inet_addr); + + if (result < 0) + return result; + + return 0; +} + +int +TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory) +{ + this->av_core_ = av_core; + this->endpoint_ = endpoint; + this->entry_ = entry; + this->flow_protocol_factory_ = factory; + this->flowname_ = entry->flowname (); + ACE_INET_Addr *address; + ACE_NEW_RETURN (address, + ACE_INET_Addr ("0"), + -1); + int result = this->open_i (address); + if (result < 0) + return result; + return 0; +} + +int +TAO_AV_UDP_QoS_Acceptor::translate (CosPropertyService::Properties &qos_params, + ACE_Flow_Spec *ace_flow_spec) +{ + for (unsigned int i = 0; + i < qos_params.length (); + i++) + { + if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0) + { + CORBA::Short type; + qos_params [i].property_value >>= type; + ace_flow_spec->service_type (type); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0) + { + CORBA::ULong tok_rate; + qos_params [i].property_value >>= tok_rate; + ace_flow_spec->token_rate (tok_rate); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Size") == 0) + { + CORBA::ULong tok_buck_size; + qos_params [i].property_value >>= tok_buck_size; + ace_flow_spec->token_bucket_size (tok_buck_size); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0) + { + CORBA::ULong peak_bw; + qos_params [i].property_value >>= peak_bw; + ace_flow_spec->peak_bandwidth (peak_bw); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0) + { + CORBA::ULong lat; + qos_params [i].property_value >>= lat; + ace_flow_spec->latency (lat); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0) + { + CORBA::ULong delay_var; + qos_params [i].property_value >>= delay_var; + ace_flow_spec->delay_variation (delay_var); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Max_SDU_Size") == 0) + { + CORBA::ULong max_sdu; + qos_params [i].property_value >>= max_sdu; + ace_flow_spec->delay_variation (max_sdu); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Minimum_Policed_Size") == 0) + { + CORBA::ULong min_pol_size; + qos_params [i].property_value >>= min_pol_size; + ace_flow_spec->delay_variation (min_pol_size); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "TTL") == 0) + { + CORBA::ULong ttl; + qos_params [i].property_value >>= ttl; + ace_flow_spec->delay_variation (ttl); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Priority") == 0) + { + CORBA::ULong priority; + qos_params [i].property_value >>= priority; + ace_flow_spec->delay_variation (priority); + } + } + + return 0; +} + +int +TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) +{ + int result = -1; + // TAO_AV_Callback *callback = 0; + // this->endpoint_->get_callback (this->flowname_.c_str (), + // callback); + ACE_INET_Addr *local_addr; + + AVStreams::QoS qos; + this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); + + ACE_Flow_Spec *ace_flow_spec; + ACE_NEW_RETURN (ace_flow_spec, + ACE_Flow_Spec, + -1); + + this->translate (qos.QoSParams, + ace_flow_spec); + + ACE_QoS* ace_qos; + + ACE_NEW_RETURN (ace_qos, + ACE_QoS, + -1); + + Fill_ACE_QoS fill_ace_qos; + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex sender qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex receiver qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); + + } + + ACE_QoS_Params qos_params; + FillQoSParams (qos_params, + 0, + ace_qos); + + // Create a QoS Session Factory. + ACE_QoS_Session_Factory session_factory; + + // Ask the factory to create a QoS session. This could be RAPI or + // GQoS based on the parameter passed. + + //@@YAmuna : Later make this generic for GQoS + this->qos_session_ = + session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + + // Create a destination address for the QoS session. The same + // address should be used for the subscribe call later. A copy + // is made below only to distinguish the two usages of the dest + // address. + ACE_INET_Addr dest_addr (*inet_addr); + + // A QoS session is defined by the 3-tuple [DestAddr, DestPort, + // Protocol]. Initialize the QoS session. + if (this->qos_session_->open (*inet_addr, + IPPROTO_UDP) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in opening the QoS session\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "QoS session opened successfully\n")); + + TAO_AV_UDP_QoS_Flow_Handler* handler; + ACE_NEW_RETURN (handler, + TAO_AV_UDP_QoS_Flow_Handler, + -1); + + TAO_AV_Flow_Handler *flow_handler = 0; + flow_handler = handler; + + result = handler->get_socket ()->subscribe (*inet_addr, + qos_params, + 1, + 0, + AF_INET, + // ACE_FROM_PROTOCOL_INFO, + 0, + 0, // ACE_Protocol_Info, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + this->qos_session_); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); + else ACE_DEBUG ((LM_DEBUG, + "Subscribe succeeded\n")); + + handler->qos_session (this->qos_session_); + + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*inet_addr), + -1); + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + // This is a sender + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + // This is a receiver + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); + } + + this->qos_manager_ + = handler->get_socket ()->qos_manager (); + + // Set the QoS for the session. Replaces the ioctl () call that + // was being made previously. + if (this->qos_session_->qos (handler->get_socket (), + &this->qos_manager_, + *ace_qos) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to set QoS\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Setting QOS succeeds.\n")); + + // ACE_Time_Value era (10); + //this->adaptor ().flowspec_entry (this->entry_); + //this->adaptor ().endpoint (this->endpoint_); +// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_, + // adaptor, + // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()), + // era, + // QoS_Monitor::PEAK_BANDWIDTH); + + //ll_ace_qos.map ().unbind (g_711); + + TAO_AV_Protocol_Object *object = + this->flow_protocol_factory_->make_protocol_object (this->entry_, + this->endpoint_, + flow_handler, + flow_handler->transport ()); + flow_handler->protocol_object (object); + // callback->protocol_object (object); +// this->endpoint_->set_protocol_object (this->flowname_.c_str (), +// object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); + this->entry_->protocol_object (object); + + char buf[BUFSIZ]; + local_addr->addr_to_string (buf,BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n", + buf)); + + this->entry_->set_local_addr (local_addr); + this->entry_->handler (flow_handler); + + // call activate svc handler. + return this->activate_svc_handler (handler); +} + +int +TAO_AV_UDP_QoS_Acceptor::close (void) +{ + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_UDP_Connector +//------------------------------------------------------------ +TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector (void) +{ +} + +TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector (void) +{ +} + +int +TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_AV_Flow_Protocol_Factory *factory) + +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Connector::open ")); + + this->endpoint_ = endpoint; + this->av_core_ = av_core; + this->flow_protocol_factory_ = factory; + return 0; +} + +int +TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params, + ACE_Flow_Spec *ace_flow_spec) +{ + for (unsigned int i = 0; + i < qos_params.length (); + i++) + { + if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0) + { + CORBA::Short type; + qos_params [i].property_value >>= type; + ace_flow_spec->service_type (type); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0) + { + CORBA::ULong tok_rate; + qos_params [i].property_value >>= tok_rate; + ace_flow_spec->token_rate (tok_rate); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0) + { + CORBA::ULong tok_buck_size; + qos_params [i].property_value >>= tok_buck_size; + ace_flow_spec->token_bucket_size (tok_buck_size); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0) + { + CORBA::ULong peak_bw; + qos_params [i].property_value >>= peak_bw; + ace_flow_spec->peak_bandwidth (peak_bw); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0) + { + CORBA::ULong lat; + qos_params [i].property_value >>= lat; + ace_flow_spec->latency (lat); + } + else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0) + { + CORBA::ULong delay_var; + qos_params [i].property_value >>= delay_var; + ace_flow_spec->delay_variation (delay_var); + } + + } + + return 0; +} + + +int +TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, + TAO_AV_Transport *&transport) +{ + int result = -1; + this->entry_ = entry; + this->flowname_ = entry->flowname (); + +// ACE_INET_Addr *local_addr; +// ACE_NEW_RETURN (local_addr, +// ACE_INET_Addr ("0"), +// -1); + + Fill_ACE_QoS fill_ace_qos; + + AVStreams::QoS qos; + this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); + ACE_Flow_Spec *ace_flow_spec; + ACE_NEW_RETURN (ace_flow_spec, + ACE_Flow_Spec, + -1); + + this->translate (qos.QoSParams, + ace_flow_spec); + + ACE_QoS* ace_qos; + + ACE_NEW_RETURN (ace_qos, + ACE_QoS, + -1); + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex receiver qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex sender qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); + } + + ACE_QoS_Params qos_params; + FillQoSParams (qos_params, + 0, + ace_qos); + + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*, + entry->address ()); + TAO_AV_Flow_Handler *flow_handler = 0; + + // Create a QoS Session Factory. + ACE_QoS_Session_Factory session_factory; + + // Ask the factory to create a QoS session. This could be RAPI or + // GQoS based on the parameter passed. + + //@@YAmuna : Later make this generic for GQoS + this->qos_session_ = + session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + + + // Create a destination address for the QoS session. The same + // address should be used for the subscribe call later. A copy + // is made below only to distinguish the two usages of the dest + // address. + ACE_INET_Addr dest_addr (*inet_addr); + + // A QoS session is defined by the 3-tuple [DestAddr, DestPort, + // Protocol]. Initialize the QoS session. + if (this->qos_session_->open (*inet_addr, + IPPROTO_UDP) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in opening the QoS session\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "QoS session opened successfully\n")); + + TAO_AV_UDP_QoS_Flow_Handler *handler; + ACE_NEW_RETURN (handler, + TAO_AV_UDP_QoS_Flow_Handler, + -1); + + handler->qos_session (this->qos_session_); + + flow_handler = handler; + + + + + result = handler->get_socket ()->subscribe (*inet_addr, + qos_params, + 1, + 0, + AF_INET, + // ACE_FROM_PROTOCOL_INFO, + 0, + 0, // ACE_Protocol_Info, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + this->qos_session_); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); + // Now disable Multicast loopback. + // @@Should we make this a policy? + + ACE_INET_Addr *local_addr; + + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*inet_addr), + -1); + + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + // This is a sender + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + // This is a receiver + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); + } + + this->qos_manager_ = + handler->get_socket ()->qos_manager (); + + // Set the QoS for the session. Replaces the ioctl () call that + // was being made previously. + if (this->qos_session_->qos (handler->get_socket (), + &this->qos_manager_, + *ace_qos) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to set QoS\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Setting QOS succeeds.\n")); + + + TAO_AV_Protocol_Object *object = + this->flow_protocol_factory_->make_protocol_object (this->entry_, + this->endpoint_, + flow_handler, + flow_handler->transport ()); + flow_handler->protocol_object (object); + // callback->protocol_object (object); + // this->endpoint_->set_protocol_object (this->flowname_.c_str (), + // object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); + this->entry_->protocol_object (object); + + char buf[BUFSIZ]; + local_addr->addr_to_string (buf,BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_CONNECTOR::connect:%s \n", + buf)); + + entry->set_local_addr (local_addr); + entry->handler (flow_handler); + transport = flow_handler->transport (); + // call activate svc handler. + return this->activate_svc_handler (handler); +} + +int +TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler) +{ + int result = 0; + ACE_QoS_Decorator* qos_decorator; + + // Decorate the above handler with QoS functionality. + ACE_NEW_RETURN (qos_decorator, + ACE_QoS_Decorator (handler, + handler->qos_session (), + this->av_core_->reactor ()), + -1); + + // Initialize the Decorator. + if (qos_decorator->init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Decorator init () failed.\n"), + -1); + + // Register the decorated Event Handler with the Reactor. + result = this->av_core_->reactor ()->register_handler (qos_decorator, + ACE_Event_Handler::QOS_MASK | + ACE_Event_Handler::READ_MASK); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in registering the Decorator with the Reactor\n"), + -1); + + return result; +} + +int +TAO_AV_UDP_QoS_Connector::close (void) +{ + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_UDP_Protocol_Factory +//------------------------------------------------------------ + +TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory (void) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory\n")); +} + +TAO_AV_UDP_QoS_Factory::~TAO_AV_UDP_QoS_Factory (void) +{ +} + +int +TAO_AV_UDP_QoS_Factory::match_protocol (const char *protocol_string) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Factory::match_protocol\n")); + + if (ACE_OS::strcasecmp (protocol_string,"QoS_UDP") == 0) + return 1; + return 0; +} + +TAO_AV_Acceptor* +TAO_AV_UDP_QoS_Factory::make_acceptor (void) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Factory::make_acceptor ")); + + TAO_AV_Acceptor *acceptor = 0; + ACE_NEW_RETURN (acceptor, + TAO_AV_UDP_QoS_Acceptor, + 0); + return acceptor; +} + +TAO_AV_Connector* +TAO_AV_UDP_QoS_Factory::make_connector (void) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Factory::make_connector ")); + + TAO_AV_Connector *connector = 0; + ACE_NEW_RETURN (connector, + TAO_AV_UDP_QoS_Connector, + 0); + return connector; +} + +int +TAO_AV_UDP_QoS_Factory::init (int /* argc */, + char * /* argv */ []) +{ + return 0; +} + + +//------------------------------------------------------------ +// TAO_AV_UDP_Flow_Factory +//------------------------------------------------------------ +TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory (void) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory\n")); +} + +TAO_AV_UDP_QoS_Flow_Factory::~TAO_AV_UDP_QoS_Flow_Factory (void) +{ +} + +int +TAO_AV_UDP_QoS_Flow_Factory::init (int /* argc */, + char * /* argv */ []) +{ + return 0; +} + +int +TAO_AV_UDP_QoS_Flow_Factory::match_protocol (const char *flow_string) +{ + if (ACE_OS::strcasecmp (flow_string,"QoS_UDP") == 0) + return 1; + return 0; +} + +TAO_AV_Protocol_Object* +TAO_AV_UDP_QoS_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, + TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Flow_Handler *handler, + TAO_AV_Transport *transport) +{ + TAO_AV_Callback *callback = 0; + endpoint->get_callback (entry->flowname (), + callback); + + + TAO_AV_UDP_Object *object = 0; + ACE_NEW_RETURN (object, + TAO_AV_UDP_Object (callback, + transport), + 0); + callback->open (object, + handler); + endpoint->set_protocol_object (entry->flowname (), + object); + return object; +} + +ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Flow_Factory) +ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Flow_Factory, + ACE_TEXT ("UDP_QoS_Flow_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_AV_UDP_QoS_Flow_Factory), + ACE_Service_Type::DELETE_THIS | + ACE_Service_Type::DELETE_OBJ, + 0) + +ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Factory) + +ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Factory, + ACE_TEXT ("UDP_QoS_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_AV_UDP_QoS_Factory), + ACE_Service_Type::DELETE_THIS | + ACE_Service_Type::DELETE_OBJ, + 0) + + diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h new file mode 100644 index 00000000000..eb5ae43c455 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h @@ -0,0 +1,247 @@ +/* -*- C++ -*- */ + +// $Id$ +// ============================================================================ +// +// = LIBRARY +// ORBSVCS AVStreams +// +// = FILENAME +// UDP.h +// +// = AUTHOR +// Yamuna Krishnamurthy <yamuna@cs.wustl.edu> +// +// +// ============================================================================ + +#ifndef TAO_AV_QOS_UDP_H +#define TAO_AV_QOS_UDP_H +#include "ace/pre.h" + + +#include "ace/OS.h" +#include "ace/QoS/QoS_Session_Factory.h" +#include "ace/QoS/QoS_Decorator.h" +#include "ace/QoS/SOCK_Dgram_Mcast_QoS.h" + +#include "orbsvcs/AV/Protocol_Factory.h" + + +class TAO_AV_Export TAO_AV_UDP_QoS_Factory : public TAO_AV_Transport_Factory +{ +public: + TAO_AV_UDP_QoS_Factory (void); + virtual ~TAO_AV_UDP_QoS_Factory (void); + virtual int init (int argc, char *argv[]); + // Initialization hook. + virtual int match_protocol (const char *protocol_string); + virtual TAO_AV_Acceptor *make_acceptor (void); + virtual TAO_AV_Connector *make_connector (void); +}; + +class TAO_AV_UDP_QoS_Flow_Handler; + +class TAO_AV_UDP_QoS_Transport + :public TAO_AV_Transport +{ + // = TITLE + // A transport abstraction for udp sockets. + // + // = DESCRIPTION + // Uses the ACE_SOCK_Dgram to send the data. +public: + TAO_AV_UDP_QoS_Transport (void); + + TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler); + + virtual ~TAO_AV_UDP_QoS_Transport (void); + + virtual int open (ACE_Addr *addr); + + virtual int close (void); + + virtual int mtu (void); + + virtual ACE_Addr *get_peer_addr (void); + + virtual int set_remote_address (const ACE_INET_Addr &address); + + virtual ssize_t send (const ACE_Message_Block *mblk, + ACE_Time_Value *s = 0); + // Write the complete Message_Block chain to the connection. + + virtual ssize_t send (const char *buf, + size_t len, + ACE_Time_Value *s = 0); + // Write the contents of the buffer of length len to the connection. + + virtual ssize_t send (const iovec *iov, + int iovcnt, + ACE_Time_Value *s = 0); + // Write the contents of iovcnt iovec's to the connection. + + virtual ssize_t recv (char *buf, + size_t len, + ACE_Time_Value *s = 0); + // Read len bytes from into buf. + + virtual ssize_t recv (char *buf, + size_t len, + int flags, + ACE_Time_Value *s = 0); + // Read len bytes from into buf using flags. + + virtual ssize_t recv (iovec *iov, + int iovcnt, + ACE_Time_Value *s = 0); + // Read received data into the iovec buffers. +protected: + TAO_AV_UDP_QoS_Flow_Handler *handler_; + ACE_Addr *addr_; + ACE_INET_Addr peer_addr_; +}; + +class TAO_AV_UDP_QoS_Flow_Handler + :public virtual TAO_AV_Flow_Handler, + public virtual ACE_Event_Handler +{ +public: + TAO_AV_UDP_QoS_Flow_Handler (void); + //Ctor + ~TAO_AV_UDP_QoS_Flow_Handler (void); + // Dtor + int open (ACE_Addr &address); + virtual TAO_AV_Transport *transport (void); + virtual int set_remote_address (ACE_Addr *address); + virtual ACE_HANDLE get_handle (void) const; + virtual int handle_input (ACE_HANDLE fd); + virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg = 0); + virtual int handle_qos (ACE_HANDLE fd); + // Handles a QoS event. Right now, just + // prints a message. + ACE_SOCK_Dgram_Mcast_QoS *get_socket (void); + virtual ACE_Event_Handler* event_handler (void){ return this; } + virtual ACE_QoS_Session* qos_session (void); + virtual void qos_session (ACE_QoS_Session *qos_session); +protected: + TAO_AV_Core *av_core_; + ACE_INET_Addr peer_addr_; + ACE_SOCK_Dgram_Mcast_QoS qos_sock_dgram_; + ACE_QoS_Session *qos_session_; +}; + +class TAO_AV_UDP_QoS_Acceptor + :public TAO_AV_Acceptor +{ +public: + TAO_AV_UDP_QoS_Acceptor (void); + virtual ~TAO_AV_UDP_QoS_Acceptor (void); + virtual int open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory); + + virtual int open_default (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory); + + virtual int open_i (ACE_INET_Addr *address); + + virtual int close (void); + // virtual int activate_svc_handler (TAO_AV_Flow_Handler *handler); + virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler); + + int translate (CosPropertyService::Properties &qos_params, + ACE_Flow_Spec *ace_flow_spec); + +protected: + TAO_Base_StreamEndPoint *endpoint_; + TAO_FlowSpec_Entry *entry_; + TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_; + ACE_QoS_Session *qos_session_; + ACE_QoS_Manager qos_manager_; +}; + +class TAO_AV_UDP_QoS_Connector + :public TAO_AV_Connector +{ +public: + TAO_AV_UDP_QoS_Connector (void); + ~TAO_AV_UDP_QoS_Connector (void); + virtual int open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_AV_Flow_Protocol_Factory *factory); + + virtual int connect (TAO_FlowSpec_Entry *entry, + TAO_AV_Transport *&transport); + virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler); + virtual int close (void); + + int translate (CosPropertyService::Properties &qos_params, + ACE_Flow_Spec *ace_flow_spec); + +protected: + TAO_Base_StreamEndPoint *endpoint_; + TAO_AV_Core *av_core_; + TAO_FlowSpec_Entry *entry_; + TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_; + ACE_QoS_Session *qos_session_; + ACE_QoS_Manager qos_manager_; +}; + +// class TAO_AV_Export TAO_AV_UDP_Object : public TAO_AV_Protocol_Object +// { +// public: +// TAO_AV_UDP_Object (TAO_AV_Callback *callback, +// TAO_AV_Transport *transport = 0); + +// virtual ~TAO_AV_UDP_Object (void); +// // Dtor + +// virtual int handle_input (void); + +// virtual int send_frame (ACE_Message_Block *frame, +// TAO_AV_frame_info *frame_info = 0); +// // send a data frame. + +// virtual int send_frame (const iovec *iov, +// int iovcnt, +// TAO_AV_frame_info *frame_info = 0); + +// virtual int destroy (void); +// // end the stream. + +// private: +// ACE_Message_Block frame_; +// // Pre-allocated memory to receive the data... +// }; + +class TAO_AV_UDP_QoS_Flow_Factory : public TAO_AV_Flow_Protocol_Factory +{ +public: + TAO_AV_UDP_QoS_Flow_Factory (void); + virtual ~TAO_AV_UDP_QoS_Flow_Factory (void); + virtual int init (int argc, char *argv[]); + // Initialization hook. + virtual int match_protocol (const char *flow_string); + TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry, + TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Flow_Handler *handler, + TAO_AV_Transport *transport); +}; + +ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Flow_Factory) +ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory) + +ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Factory) +ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Factory) + + +#if defined(__ACE_INLINE__) +#include "QoS_UDP.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_AV_QOS_UDP_H */ diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i new file mode 100644 index 00000000000..b3e1a80b1ab --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i @@ -0,0 +1,33 @@ +/* -*- C++ -*- */ + +// $Id$ + +//---------------------------------------------------------------------- +// TAO_AV_UDP_Flow_Handler +//---------------------------------------------------------------------- +ACE_INLINE +ACE_SOCK_Dgram_Mcast_QoS * +TAO_AV_UDP_QoS_Flow_Handler::get_socket (void) +{ + return &this->qos_sock_dgram_; +} + +ACE_INLINE +int +TAO_AV_UDP_QoS_Flow_Handler::open (ACE_Addr &address) +{ + // return this->qos_sock_dgram_.open (address); + return 0; +} + +ACE_INLINE ACE_QoS_Session* +TAO_AV_UDP_QoS_Flow_Handler::qos_session (void) +{ + return this->qos_session_; +} + +ACE_INLINE void +TAO_AV_UDP_QoS_Flow_Handler::qos_session (ACE_QoS_Session *qos_session) +{ + this->qos_session_ = qos_session; +} diff --git a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp index 85a09a59eb5..068b519936a 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp @@ -11,6 +11,10 @@ #include "FlowSpec_Entry.h" #include "AV_Core.h" +#ifdef ACE_HAS_RAPI +#include "QoS_UDP.h" +#endif /*ACE_HAS_RAPI*/ + #include "tao/debug.h" #include "ace/Dynamic_Service.h" @@ -62,7 +66,7 @@ TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint, ACE_Addr *address = entry->address (); const char *flow_protocol = entry->flow_protocol_str (); const char *transport_protocol = entry->carrier_protocol_str (); - + if (ACE_OS::strcmp (flow_protocol,"") == 0) flow_protocol = transport_protocol; @@ -115,6 +119,7 @@ TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint, av_core, (*flow_factory)->factory ()) == -1) return -1; + TAO_AV_Transport *transport = 0; if (connector->connect (entry, transport) == -1) @@ -271,7 +276,7 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "TAO_AV_Acceptor_Registry::open")); + "TAO_AV_Acceptor_Registry::open \n")); TAO_AV_FlowSpecSetItor last_flowspec = flow_spec_set.end (); @@ -289,13 +294,15 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint, flow_protocol = transport_protocol; if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "TAO_AV_Acceptor_Registry::protocol for flow %s is %d", + "TAO_AV_Acceptor_Registry::protocol for flow %s is %s\n", entry->flowname (), transport_protocol)); if (address == 0) { - this->open_default (endpoint,av_core, entry); + this->open_default (endpoint, + av_core, + entry); continue; } else @@ -313,11 +320,14 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint, { TAO_AV_TransportFactorySetItor transport_factory_end = av_core->transport_factories ()->end (); + + int i = 1; for (TAO_AV_TransportFactorySetItor transport_factory = av_core->transport_factories ()->begin (); transport_factory != transport_factory_end; ++transport_factory) { + if ((*transport_factory)->factory ()->match_protocol (transport_protocol)) { TAO_AV_Acceptor *acceptor = diff --git a/TAO/orbsvcs/orbsvcs/AV/UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/UDP.cpp index 9b5f7b403f2..3d7008f00c0 100644 --- a/TAO/orbsvcs/orbsvcs/AV/UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/UDP.cpp @@ -609,7 +609,7 @@ TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void) int TAO_AV_UDP_Factory::match_protocol (const char *protocol_string) { - if (ACE_OS::strstr (protocol_string,"UDP") != 0) + if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0) return 1; return 0; } diff --git a/TAO/orbsvcs/orbsvcs/Makefile.av b/TAO/orbsvcs/orbsvcs/Makefile.av index 481dda3d9fb..229924327d3 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile.av +++ b/TAO/orbsvcs/orbsvcs/Makefile.av @@ -60,6 +60,11 @@ CPP_SRCS = \ AV/RTP\ AV/sfp +ifeq ($(rapi),1) + CPP_SRCS += AV/Fill_ACE_QoS\ + AV/QoS_UDP +endif # rapi + IDL_SRC = \ $(addsuffix S.cpp, $(IDL_FILES)) \ $(addsuffix C.cpp, $(IDL_FILES)) |