summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-20 21:22:17 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-20 21:22:17 +0000
commit3948829007575024346e699431999ba225a36066 (patch)
treeaee62f7e7d6d86d6dc9cc0d16549a9fd21edcfaf
parent0a8ad5fdfe7bb383487aaffeb2a233d2f7839584 (diff)
downloadATCD-3948829007575024346e699431999ba225a36066.tar.gz
ChangeLogTag: Fri Apr 20 16:13:40 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i16
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp134
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp1125
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h247
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i33
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Transport.cpp18
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/UDP.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile.av5
9 files changed, 1555 insertions, 30 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i
index 6476b9736ee..43e6b8db61f 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.i
@@ -4,6 +4,13 @@
// AVStreams_i.i
ACE_INLINE
+TAO_AV_QoS &
+TAO_Base_StreamEndPoint::qos (void)
+{
+ return this->qos_;
+}
+
+ACE_INLINE
int
TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos)
{
@@ -22,10 +29,15 @@ TAO_AV_QoS::set (AVStreams::streamQoS &stream_qos)
ACE_INLINE
int
-TAO_AV_QoS::get_flow_qos (const char *flowname,AVStreams::QoS &flow_qos)
+TAO_AV_QoS::get_flow_qos (const char *flowname,
+ AVStreams::QoS &flow_qos)
{
- int result = this->qos_map_.find (flowname, flow_qos);
+ int result = this->qos_map_.find (flowname,
+ flow_qos);
+
if (result < 0)
ACE_ERROR_RETURN ((LM_DEBUG,"qos_map::find failed\n"),-1);
return 0;
}
+
+
diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
index cf3a2395a6a..cbe489743fe 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
@@ -10,6 +10,10 @@
#include "orbsvcs/AV/RTCP.h"
#include "orbsvcs/AV/sfp.h"
+#ifdef ACE_HAS_RAPI
+#include "orbsvcs/AV/QoS_UDP.h"
+#endif /*ACE_HAS_RAPI*/
+
#include "tao/debug.h"
#include "tao/ORB_Core.h"
@@ -134,7 +138,10 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
TAO_AV_Core::EndPoint direction,
AVStreams::flowSpec &flow_spec)
{
- if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows\n"));
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_Core::init_forward_flows\n"));
+
TAO_AV_FlowSpecSet address_flow_set;
TAO_AV_FlowSpecSet flow_set;
TAO_AV_FlowSpecSetItor end = flow_spec_set.end ();
@@ -149,12 +156,17 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
switch (entry->direction ())
{
case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
- entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
- break;
+ {
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
+ break;
+ }
case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
- entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
- break;
+ {
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
+ break;
+ }
}
+ break;
}
case TAO_AV_Core::TAO_AV_ENDPOINT_A:
{
@@ -175,7 +187,11 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
ACE_Addr *address = entry->address ();
if (address != 0)
{
- if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"address given for flow %s",entry->flowname ()));
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "address given for flow %s",
+ entry->flowname ()));
+
address_flow_set.insert (entry);
}
else
@@ -193,7 +209,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
this,
address_flow_set);
if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"),-1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO_AV_Core::init_forward_flows::acceptor_registry::open failed\n"),
+ -1);
TAO_AV_FlowSpecSetItor end = address_flow_set.end ();
for (TAO_AV_FlowSpecSetItor start = address_flow_set.begin ();
start != end; ++start)
@@ -205,12 +223,18 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
{
if (entry->handler () != 0)
{
+ //Yamuna:PLEASE CHECK THIS LATER
+#ifndef ACE_HAS_RAPI
// For IN flows on the A side we should remove the handlers from the reactor.
- ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
- result = event_handler->reactor ()->remove_handler (event_handler,
- ACE_Event_Handler::READ_MASK);
- if (result < 0)
- if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
+ ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
+ result = event_handler->reactor ()->remove_handler (event_handler,
+ ACE_Event_Handler::READ_MASK);
+ if (result < 0)
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
+#endif /*ACE_HAS_RAPI*/
+
}
}
default:
@@ -222,9 +246,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
{
// entry doesn't exist so add it.
flow_spec_set.insert (entry);
-// size_t len = flow_spec.length ();
-// flow_spec.length (len+1);
-// flow_spec [len] = entry->entry_to_string ();
+ // size_t len = flow_spec.length ();
+ // flow_spec.length (len+1);
+ // flow_spec [len] = entry->entry_to_string ();
}
}
}
@@ -233,6 +257,9 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
{
if (address_flow_set.size () > 0)
{
+ ACE_DEBUG ((LM_DEBUG,
+ "This connector registry is called ONE\n"));
+
result = this->connector_registry_->open (endpoint,
this,
address_flow_set);
@@ -444,11 +471,14 @@ TAO_AV_Core::init_reverse_flows (TAO_Base_StreamEndPoint *endpoint,
int result = -1;
switch (direction)
{
+
case TAO_AV_Core::TAO_AV_ENDPOINT_A:
- result = this->connector_registry_->open (endpoint,
- this,
- connector_flow_set);
- break;
+ {
+ result = this->connector_registry_->open (endpoint,
+ this,
+ connector_flow_set);
+ }
+ break;
default:
break;
}
@@ -522,8 +552,8 @@ TAO_AV_Core::init_transport_factories (void)
TAO_AV_TransportFactorySetItor end = this->transport_factories_.end ();
TAO_AV_TransportFactorySetItor factory = this->transport_factories_.begin ();
- const char* foo = "UDP_Factory";
- const char * bar = "TCP_Factory";
+ const char *udp_factory_str = "UDP_Factory";
+ const char *tcp_factory_str = "TCP_Factory";
if (factory == end)
{
@@ -531,7 +561,7 @@ TAO_AV_Core::init_transport_factories (void)
TAO_AV_Transport_Item *udp_item = 0;
udp_factory =
- ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (foo);
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_factory_str);
if (udp_factory == 0)
{
if (TAO_debug_level)
@@ -554,7 +584,7 @@ TAO_AV_Core::init_transport_factories (void)
TAO_AV_Transport_Item *tcp_item = 0;
tcp_factory =
- ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (bar);
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (tcp_factory_str);
if (tcp_factory == 0)
{
if (TAO_debug_level)
@@ -573,6 +603,35 @@ TAO_AV_Core::init_transport_factories (void)
this->transport_factories_.insert (tcp_item);
+#ifdef ACE_HAS_RAPI
+ const char *udp_qos_factory_str = "UDP_QoS_Factory";
+
+ TAO_AV_Transport_Factory *udp_qos_factory = 0;
+ TAO_AV_Transport_Item *udp_qos_item = 0;
+
+ udp_qos_factory =
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_qos_factory_str);
+ if (udp_qos_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "UDP QoS Factory"));
+
+ ACE_NEW_RETURN (udp_qos_factory,
+ TAO_AV_UDP_QoS_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (udp_qos_item,
+ TAO_AV_Transport_Item ("UDP_QoS_Factory"),
+ -1);
+
+ udp_qos_item->factory (udp_qos_factory);
+
+ this->transport_factories_.insert (udp_qos_item);
+#endif /*ACE_HAS_RAPI*/
}
return 0;
@@ -589,6 +648,7 @@ TAO_AV_Core::init_flow_protocol_factories (void)
const char *rtp_flow = "RTP_Flow_Factory";
const char *rtcp_flow = "RTCP_Flow_Factory";
const char *sfp_flow = "SFP_Flow_Factory";
+
if (factory == end)
{
TAO_AV_Flow_Protocol_Factory *udp_flow_factory = 0;
@@ -614,6 +674,34 @@ TAO_AV_Core::init_flow_protocol_factories (void)
this->flow_protocol_factories_.insert (udp_item);
+#ifdef ACE_HAS_RAPI
+
+ const char *udp_qos_flow = "UDP_QoS_Flow_Factory";
+ TAO_AV_Flow_Protocol_Factory *udp_qos_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *udp_qos_flow_item = 0;
+
+ udp_qos_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (udp_qos_flow);
+ if (udp_qos_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "UDP QoS Flow Factory"));
+
+ ACE_NEW_RETURN (udp_qos_flow_factory,
+ TAO_AV_UDP_QoS_Flow_Factory,
+ -1);
+ }
+
+ ACE_NEW_RETURN (udp_qos_flow_item, TAO_AV_Flow_Protocol_Item ("UDP_QoS_Flow_Factory"), -1);
+ udp_qos_flow_item->factory (udp_qos_flow_factory);
+
+ this->flow_protocol_factories_.insert (udp_qos_flow_item);
+
+#endif /*ACE_HAS_RAPI*/
+
TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0;
TAO_AV_Flow_Protocol_Item *tcp_item = 0;
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
index ad8f3577f43..09374995947 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
@@ -110,6 +110,8 @@ TAO_FlowSpec_Entry::set_protocol (void)
this->protocol_ = TAO_AV_Core::TAO_AV_TCP;
else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"UDP") == 0)
this->protocol_ = TAO_AV_Core::TAO_AV_UDP;
+ else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"QoS_UDP") == 0)
+ this->protocol_ = TAO_AV_Core::TAO_AV_QOS_UDP;
else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"AAL5") == 0)
this->protocol_ = TAO_AV_Core::TAO_AV_AAL5;
else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"AAL3_4") == 0)
@@ -204,6 +206,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address)
case TAO_AV_Core::TAO_AV_RTP_UDP:
case TAO_AV_Core::TAO_AV_TCP:
case TAO_AV_Core::TAO_AV_UDP:
+ case TAO_AV_Core::TAO_AV_QOS_UDP:
{
this->address_str_ = addr;
ACE_INET_Addr *inet_addr;
@@ -371,6 +374,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_RTP_UDP:
case TAO_AV_Core::TAO_AV_RTP_UDP_MCAST:
case TAO_AV_Core::TAO_AV_UDP:
+ case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
{
@@ -506,6 +510,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void)
{
case TAO_AV_Core::TAO_AV_RTP_UDP:
case TAO_AV_Core::TAO_AV_UDP:
+ case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
case TAO_AV_Core::TAO_AV_SFP_UDP:
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
new file mode 100644
index 00000000000..73eccfcd240
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
@@ -0,0 +1,1125 @@
+// $Id$
+
+#include "QoS_UDP.h"
+#include "UDP.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/MCast.h"
+//#include "orbsvcs/AV/QoS_MCast.h"
+#include "orbsvcs/AV/Fill_ACE_QoS.h"
+
+#if !defined (__ACE_INLINE__)
+#include "orbsvcs/AV/QoS_UDP.i"
+#endif /* __ACE_INLINE__ */
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Flow_Handler
+//------------------------------------------------------------
+
+
+int
+FillQoSParams (ACE_QoS_Params &qos_params,
+ iovec* iov,
+ ACE_QoS* qos)
+{
+ qos_params.callee_data (iov);
+ qos_params.caller_data (0);
+ qos_params.socket_qos (qos);
+ qos_params.group_socket_qos (0);
+ qos_params.flags (ACE_JL_BOTH);
+
+ return 0;
+}
+
+
+TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
+{
+ ACE_NEW (this->transport_,
+ TAO_AV_UDP_QoS_Transport (this));
+}
+
+TAO_AV_UDP_QoS_Flow_Handler::~TAO_AV_UDP_QoS_Flow_Handler (void)
+{
+ delete this->transport_;
+}
+
+TAO_AV_Transport *
+TAO_AV_UDP_QoS_Flow_Handler::transport (void)
+{
+ return this->transport_;
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
+{
+ this->protocol_object_->handle_input ();
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
+
+ if (this->qos_session_->update_qos () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in updating QoS\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ " Updating QOS succeeds.\n"));
+
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n"));
+
+ ACE_INET_Addr *inet_addr =
+ ACE_dynamic_cast (ACE_INET_Addr*,address);
+
+ this->peer_addr_ = *inet_addr;
+
+ TAO_AV_UDP_QoS_Transport *transport =
+ ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_);
+
+ return transport->set_remote_address (*inet_addr);
+}
+
+
+ACE_HANDLE
+TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
+ this->qos_sock_dgram_.get_handle ()));
+
+ return this->qos_sock_dgram_.get_handle () ;
+}
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Transport
+//------------------------------------------------------------
+
+TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void)
+ :handler_ (0)
+{
+}
+
+TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler)
+ :handler_ (handler),
+ addr_ (0)
+{
+}
+
+TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address)
+{
+ this->peer_addr_ = address;
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/)
+{
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Transport::close (void)
+{
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Transport::mtu (void)
+{
+ return ACE_MAX_DGRAM_SIZE;
+}
+
+ACE_Addr*
+TAO_AV_UDP_QoS_Transport::get_peer_addr (void)
+{
+ return &this->peer_addr_;
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
+ ACE_Time_Value *)
+{
+ // For the most part this was copied from GIOP::send_request and
+ // friends.
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 0;
+ ssize_t n = 0;
+ ssize_t nbytes = 0;
+
+ for (const ACE_Message_Block *i = mblk;
+ i != 0;
+ i = i->cont ())
+ {
+ // Make sure there is something to send!
+ if (i->length () > 0)
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = i->length ();
+ iovcnt++;
+
+ // The buffer is full make a OS call. @@ TODO this should
+ // be optimized on a per-platform basis, for instance, some
+ // platforms do not implement writev() there we should copy
+ // the data into a buffer and call send_n(). In other cases
+ // there may be some limits on the size of the iovec, there
+ // we should set IOV_MAX to that limit.
+
+ size_t bytes_sent = 0;
+
+ if (iovcnt == IOV_MAX)
+ {
+ if (this->handler_->get_socket ()->send (iov,
+ 1,
+ bytes_sent,
+ 0,
+ this->handler_->qos_session ()->dest_addr (),
+ 0,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in dgram_mcast.send ()\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
+
+ if (n < 1)
+ return n;
+
+ nbytes += bytes_sent;
+ iovcnt = 0;
+ }
+ }
+ }
+
+ size_t bytes_sent = 0;
+
+ // Check for remaining buffers to be sent!
+ if (iovcnt != 0)
+ {
+ if (this->handler_->get_socket ()->send (iov,
+ 1,
+ bytes_sent,
+ 0,
+ this->handler_->qos_session ()->dest_addr (),
+ 0,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in dgram_mcast.send ()\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
+
+ if (n < 1)
+ return n;
+
+ nbytes += bytes_sent;
+ }
+
+ return nbytes;
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::send (const char *buf,
+ size_t len,
+ ACE_Time_Value *)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Transport::send "));
+
+ char addr [BUFSIZ];
+ this->peer_addr_.addr_to_string (addr,BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "to %s\n",
+ addr));
+
+ return this->handler_->get_socket ()->send (buf,
+ len,
+ this->handler_->qos_session ()->dest_addr (),
+ 0,
+ 0,
+ 0);
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
+ int /*iovcnt*/,
+ ACE_Time_Value *)
+{
+ size_t bytes_sent = 0;
+ if (this->handler_->get_socket ()->send (iov,
+ 1,
+ bytes_sent,
+ 0,
+ this->handler_->qos_session ()->dest_addr (),
+ 0,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in dgram_mcast.send ()\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
+
+ return bytes_sent;
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::recv (char *buf,
+ size_t len,
+ ACE_Time_Value *)
+{
+ return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::recv (char *buf,
+ size_t len,
+ int flags,
+ ACE_Time_Value *timeout)
+{
+ return this->handler_->get_socket ()->recv (buf,
+ len,
+ this->peer_addr_,
+ flags,
+ timeout);
+}
+
+ssize_t
+TAO_AV_UDP_QoS_Transport::recv (iovec *iov,
+ int /*iovcnt*/,
+ ACE_Time_Value *timeout)
+{
+ return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
+}
+
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Acceptor
+//------------------------------------------------------------
+
+TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor (void)
+{
+}
+
+TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
+{
+ int result = 0;
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Acceptor Svc Handler QOS ENABLED \n"));
+
+ ACE_QoS_Decorator* qos_decorator;
+
+ // Decorate the above handler with QoS functionality.
+ ACE_NEW_RETURN (qos_decorator,
+ ACE_QoS_Decorator (handler,
+ handler->qos_session (),
+ this->av_core_->reactor ()),
+ -1);
+
+ // Initialize the Decorator.
+ if (qos_decorator->init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Decorator init () failed.\n"),
+ -1);
+
+ // Register the decorated Event Handler with the Reactor.
+ result = this->av_core_->reactor ()->register_handler (qos_decorator,
+ ACE_Event_Handler::QOS_MASK |
+ ACE_Event_Handler::READ_MASK);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in registering the Decorator with the Reactor\n"),
+ -1);
+
+ return result;
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Acceptor::open "));
+
+ this->av_core_ = av_core;
+ this->endpoint_ = endpoint;
+ this->entry_ = entry;
+
+
+ this->flow_protocol_factory_ = factory;
+ this->flowname_ = entry->flowname ();
+ ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address ();
+// inet_addr->set (inet_addr->get_port_number (),
+// inet_addr->get_host_name ());
+ char buf[BUFSIZ];
+ inet_addr->addr_to_string (buf,
+ BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Acceptor::open: %s",
+ buf));
+
+ int result = this->open_i (inet_addr);
+
+ if (result < 0)
+ return result;
+
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory)
+{
+ this->av_core_ = av_core;
+ this->endpoint_ = endpoint;
+ this->entry_ = entry;
+ this->flow_protocol_factory_ = factory;
+ this->flowname_ = entry->flowname ();
+ ACE_INET_Addr *address;
+ ACE_NEW_RETURN (address,
+ ACE_INET_Addr ("0"),
+ -1);
+ int result = this->open_i (address);
+ if (result < 0)
+ return result;
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::translate (CosPropertyService::Properties &qos_params,
+ ACE_Flow_Spec *ace_flow_spec)
+{
+ for (unsigned int i = 0;
+ i < qos_params.length ();
+ i++)
+ {
+ if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
+ {
+ CORBA::Short type;
+ qos_params [i].property_value >>= type;
+ ace_flow_spec->service_type (type);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
+ {
+ CORBA::ULong tok_rate;
+ qos_params [i].property_value >>= tok_rate;
+ ace_flow_spec->token_rate (tok_rate);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Size") == 0)
+ {
+ CORBA::ULong tok_buck_size;
+ qos_params [i].property_value >>= tok_buck_size;
+ ace_flow_spec->token_bucket_size (tok_buck_size);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
+ {
+ CORBA::ULong peak_bw;
+ qos_params [i].property_value >>= peak_bw;
+ ace_flow_spec->peak_bandwidth (peak_bw);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
+ {
+ CORBA::ULong lat;
+ qos_params [i].property_value >>= lat;
+ ace_flow_spec->latency (lat);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
+ {
+ CORBA::ULong delay_var;
+ qos_params [i].property_value >>= delay_var;
+ ace_flow_spec->delay_variation (delay_var);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Max_SDU_Size") == 0)
+ {
+ CORBA::ULong max_sdu;
+ qos_params [i].property_value >>= max_sdu;
+ ace_flow_spec->delay_variation (max_sdu);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Minimum_Policed_Size") == 0)
+ {
+ CORBA::ULong min_pol_size;
+ qos_params [i].property_value >>= min_pol_size;
+ ace_flow_spec->delay_variation (min_pol_size);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "TTL") == 0)
+ {
+ CORBA::ULong ttl;
+ qos_params [i].property_value >>= ttl;
+ ace_flow_spec->delay_variation (ttl);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Priority") == 0)
+ {
+ CORBA::ULong priority;
+ qos_params [i].property_value >>= priority;
+ ace_flow_spec->delay_variation (priority);
+ }
+ }
+
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
+{
+ int result = -1;
+ // TAO_AV_Callback *callback = 0;
+ // this->endpoint_->get_callback (this->flowname_.c_str (),
+ // callback);
+ ACE_INET_Addr *local_addr;
+
+ AVStreams::QoS qos;
+ this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
+
+ ACE_Flow_Spec *ace_flow_spec;
+ ACE_NEW_RETURN (ace_flow_spec,
+ ACE_Flow_Spec,
+ -1);
+
+ this->translate (qos.QoSParams,
+ ace_flow_spec);
+
+ ACE_QoS* ace_qos;
+
+ ACE_NEW_RETURN (ace_qos,
+ ACE_QoS,
+ -1);
+
+ Fill_ACE_QoS fill_ace_qos;
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex sender qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex receiver qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
+
+ }
+
+ ACE_QoS_Params qos_params;
+ FillQoSParams (qos_params,
+ 0,
+ ace_qos);
+
+ // Create a QoS Session Factory.
+ ACE_QoS_Session_Factory session_factory;
+
+ // Ask the factory to create a QoS session. This could be RAPI or
+ // GQoS based on the parameter passed.
+
+ //@@YAmuna : Later make this generic for GQoS
+ this->qos_session_ =
+ session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+
+ // Create a destination address for the QoS session. The same
+ // address should be used for the subscribe call later. A copy
+ // is made below only to distinguish the two usages of the dest
+ // address.
+ ACE_INET_Addr dest_addr (*inet_addr);
+
+ // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
+ // Protocol]. Initialize the QoS session.
+ if (this->qos_session_->open (*inet_addr,
+ IPPROTO_UDP) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in opening the QoS session\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "QoS session opened successfully\n"));
+
+ TAO_AV_UDP_QoS_Flow_Handler* handler;
+ ACE_NEW_RETURN (handler,
+ TAO_AV_UDP_QoS_Flow_Handler,
+ -1);
+
+ TAO_AV_Flow_Handler *flow_handler = 0;
+ flow_handler = handler;
+
+ result = handler->get_socket ()->subscribe (*inet_addr,
+ qos_params,
+ 1,
+ 0,
+ AF_INET,
+ // ACE_FROM_PROTOCOL_INFO,
+ 0,
+ 0, // ACE_Protocol_Info,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ this->qos_session_);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
+ else ACE_DEBUG ((LM_DEBUG,
+ "Subscribe succeeded\n"));
+
+ handler->qos_session (this->qos_session_);
+
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr (*inet_addr),
+ -1);
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ // This is a sender
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ // This is a receiver
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
+ }
+
+ this->qos_manager_
+ = handler->get_socket ()->qos_manager ();
+
+ // Set the QoS for the session. Replaces the ioctl () call that
+ // was being made previously.
+ if (this->qos_session_->qos (handler->get_socket (),
+ &this->qos_manager_,
+ *ace_qos) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to set QoS\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QOS succeeds.\n"));
+
+ // ACE_Time_Value era (10);
+ //this->adaptor ().flowspec_entry (this->entry_);
+ //this->adaptor ().endpoint (this->endpoint_);
+// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_,
+ // adaptor,
+ // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()),
+ // era,
+ // QoS_Monitor::PEAK_BANDWIDTH);
+
+ //ll_ace_qos.map ().unbind (g_711);
+
+ TAO_AV_Protocol_Object *object =
+ this->flow_protocol_factory_->make_protocol_object (this->entry_,
+ this->endpoint_,
+ flow_handler,
+ flow_handler->transport ());
+ flow_handler->protocol_object (object);
+ // callback->protocol_object (object);
+// this->endpoint_->set_protocol_object (this->flowname_.c_str (),
+// object);
+ this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
+ this->entry_->protocol_object (object);
+
+ char buf[BUFSIZ];
+ local_addr->addr_to_string (buf,BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n",
+ buf));
+
+ this->entry_->set_local_addr (local_addr);
+ this->entry_->handler (flow_handler);
+
+ // call activate svc handler.
+ return this->activate_svc_handler (handler);
+}
+
+int
+TAO_AV_UDP_QoS_Acceptor::close (void)
+{
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Connector
+//------------------------------------------------------------
+TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector (void)
+{
+}
+
+TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_Flow_Protocol_Factory *factory)
+
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Connector::open "));
+
+ this->endpoint_ = endpoint;
+ this->av_core_ = av_core;
+ this->flow_protocol_factory_ = factory;
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params,
+ ACE_Flow_Spec *ace_flow_spec)
+{
+ for (unsigned int i = 0;
+ i < qos_params.length ();
+ i++)
+ {
+ if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
+ {
+ CORBA::Short type;
+ qos_params [i].property_value >>= type;
+ ace_flow_spec->service_type (type);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
+ {
+ CORBA::ULong tok_rate;
+ qos_params [i].property_value >>= tok_rate;
+ ace_flow_spec->token_rate (tok_rate);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0)
+ {
+ CORBA::ULong tok_buck_size;
+ qos_params [i].property_value >>= tok_buck_size;
+ ace_flow_spec->token_bucket_size (tok_buck_size);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
+ {
+ CORBA::ULong peak_bw;
+ qos_params [i].property_value >>= peak_bw;
+ ace_flow_spec->peak_bandwidth (peak_bw);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
+ {
+ CORBA::ULong lat;
+ qos_params [i].property_value >>= lat;
+ ace_flow_spec->latency (lat);
+ }
+ else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
+ {
+ CORBA::ULong delay_var;
+ qos_params [i].property_value >>= delay_var;
+ ace_flow_spec->delay_variation (delay_var);
+ }
+
+ }
+
+ return 0;
+}
+
+
+int
+TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
+ TAO_AV_Transport *&transport)
+{
+ int result = -1;
+ this->entry_ = entry;
+ this->flowname_ = entry->flowname ();
+
+// ACE_INET_Addr *local_addr;
+// ACE_NEW_RETURN (local_addr,
+// ACE_INET_Addr ("0"),
+// -1);
+
+ Fill_ACE_QoS fill_ace_qos;
+
+ AVStreams::QoS qos;
+ this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
+ ACE_Flow_Spec *ace_flow_spec;
+ ACE_NEW_RETURN (ace_flow_spec,
+ ACE_Flow_Spec,
+ -1);
+
+ this->translate (qos.QoSParams,
+ ace_flow_spec);
+
+ ACE_QoS* ace_qos;
+
+ ACE_NEW_RETURN (ace_qos,
+ ACE_QoS,
+ -1);
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex receiver qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex sender qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
+ }
+
+ ACE_QoS_Params qos_params;
+ FillQoSParams (qos_params,
+ 0,
+ ace_qos);
+
+ ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
+ entry->address ());
+ TAO_AV_Flow_Handler *flow_handler = 0;
+
+ // Create a QoS Session Factory.
+ ACE_QoS_Session_Factory session_factory;
+
+ // Ask the factory to create a QoS session. This could be RAPI or
+ // GQoS based on the parameter passed.
+
+ //@@YAmuna : Later make this generic for GQoS
+ this->qos_session_ =
+ session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+
+
+ // Create a destination address for the QoS session. The same
+ // address should be used for the subscribe call later. A copy
+ // is made below only to distinguish the two usages of the dest
+ // address.
+ ACE_INET_Addr dest_addr (*inet_addr);
+
+ // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
+ // Protocol]. Initialize the QoS session.
+ if (this->qos_session_->open (*inet_addr,
+ IPPROTO_UDP) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in opening the QoS session\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "QoS session opened successfully\n"));
+
+ TAO_AV_UDP_QoS_Flow_Handler *handler;
+ ACE_NEW_RETURN (handler,
+ TAO_AV_UDP_QoS_Flow_Handler,
+ -1);
+
+ handler->qos_session (this->qos_session_);
+
+ flow_handler = handler;
+
+
+
+
+ result = handler->get_socket ()->subscribe (*inet_addr,
+ qos_params,
+ 1,
+ 0,
+ AF_INET,
+ // ACE_FROM_PROTOCOL_INFO,
+ 0,
+ 0, // ACE_Protocol_Info,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ this->qos_session_);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
+ // Now disable Multicast loopback.
+ // @@Should we make this a policy?
+
+ ACE_INET_Addr *local_addr;
+
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr (*inet_addr),
+ -1);
+
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ // This is a sender
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ // This is a receiver
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
+ }
+
+ this->qos_manager_ =
+ handler->get_socket ()->qos_manager ();
+
+ // Set the QoS for the session. Replaces the ioctl () call that
+ // was being made previously.
+ if (this->qos_session_->qos (handler->get_socket (),
+ &this->qos_manager_,
+ *ace_qos) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to set QoS\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QOS succeeds.\n"));
+
+
+ TAO_AV_Protocol_Object *object =
+ this->flow_protocol_factory_->make_protocol_object (this->entry_,
+ this->endpoint_,
+ flow_handler,
+ flow_handler->transport ());
+ flow_handler->protocol_object (object);
+ // callback->protocol_object (object);
+ // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
+ // object);
+ this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
+ this->entry_->protocol_object (object);
+
+ char buf[BUFSIZ];
+ local_addr->addr_to_string (buf,BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_CONNECTOR::connect:%s \n",
+ buf));
+
+ entry->set_local_addr (local_addr);
+ entry->handler (flow_handler);
+ transport = flow_handler->transport ();
+ // call activate svc handler.
+ return this->activate_svc_handler (handler);
+}
+
+int
+TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
+{
+ int result = 0;
+ ACE_QoS_Decorator* qos_decorator;
+
+ // Decorate the above handler with QoS functionality.
+ ACE_NEW_RETURN (qos_decorator,
+ ACE_QoS_Decorator (handler,
+ handler->qos_session (),
+ this->av_core_->reactor ()),
+ -1);
+
+ // Initialize the Decorator.
+ if (qos_decorator->init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Decorator init () failed.\n"),
+ -1);
+
+ // Register the decorated Event Handler with the Reactor.
+ result = this->av_core_->reactor ()->register_handler (qos_decorator,
+ ACE_Event_Handler::QOS_MASK |
+ ACE_Event_Handler::READ_MASK);
+
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in registering the Decorator with the Reactor\n"),
+ -1);
+
+ return result;
+}
+
+int
+TAO_AV_UDP_QoS_Connector::close (void)
+{
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Protocol_Factory
+//------------------------------------------------------------
+
+TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory (void)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory\n"));
+}
+
+TAO_AV_UDP_QoS_Factory::~TAO_AV_UDP_QoS_Factory (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Factory::match_protocol (const char *protocol_string)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Factory::match_protocol\n"));
+
+ if (ACE_OS::strcasecmp (protocol_string,"QoS_UDP") == 0)
+ return 1;
+ return 0;
+}
+
+TAO_AV_Acceptor*
+TAO_AV_UDP_QoS_Factory::make_acceptor (void)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Factory::make_acceptor "));
+
+ TAO_AV_Acceptor *acceptor = 0;
+ ACE_NEW_RETURN (acceptor,
+ TAO_AV_UDP_QoS_Acceptor,
+ 0);
+ return acceptor;
+}
+
+TAO_AV_Connector*
+TAO_AV_UDP_QoS_Factory::make_connector (void)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Factory::make_connector "));
+
+ TAO_AV_Connector *connector = 0;
+ ACE_NEW_RETURN (connector,
+ TAO_AV_UDP_QoS_Connector,
+ 0);
+ return connector;
+}
+
+int
+TAO_AV_UDP_QoS_Factory::init (int /* argc */,
+ char * /* argv */ [])
+{
+ return 0;
+}
+
+
+//------------------------------------------------------------
+// TAO_AV_UDP_Flow_Factory
+//------------------------------------------------------------
+TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory (void)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory\n"));
+}
+
+TAO_AV_UDP_QoS_Flow_Factory::~TAO_AV_UDP_QoS_Flow_Factory (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Factory::init (int /* argc */,
+ char * /* argv */ [])
+{
+ return 0;
+}
+
+int
+TAO_AV_UDP_QoS_Flow_Factory::match_protocol (const char *flow_string)
+{
+ if (ACE_OS::strcasecmp (flow_string,"QoS_UDP") == 0)
+ return 1;
+ return 0;
+}
+
+TAO_AV_Protocol_Object*
+TAO_AV_UDP_QoS_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport)
+{
+ TAO_AV_Callback *callback = 0;
+ endpoint->get_callback (entry->flowname (),
+ callback);
+
+
+ TAO_AV_UDP_Object *object = 0;
+ ACE_NEW_RETURN (object,
+ TAO_AV_UDP_Object (callback,
+ transport),
+ 0);
+ callback->open (object,
+ handler);
+ endpoint->set_protocol_object (entry->flowname (),
+ object);
+ return object;
+}
+
+ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Flow_Factory)
+ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Flow_Factory,
+ ACE_TEXT ("UDP_QoS_Flow_Factory"),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TAO_AV_UDP_QoS_Flow_Factory),
+ ACE_Service_Type::DELETE_THIS |
+ ACE_Service_Type::DELETE_OBJ,
+ 0)
+
+ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Factory)
+
+ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Factory,
+ ACE_TEXT ("UDP_QoS_Factory"),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TAO_AV_UDP_QoS_Factory),
+ ACE_Service_Type::DELETE_THIS |
+ ACE_Service_Type::DELETE_OBJ,
+ 0)
+
+
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h
new file mode 100644
index 00000000000..eb5ae43c455
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h
@@ -0,0 +1,247 @@
+/* -*- C++ -*- */
+
+// $Id$
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS AVStreams
+//
+// = FILENAME
+// UDP.h
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+//
+// ============================================================================
+
+#ifndef TAO_AV_QOS_UDP_H
+#define TAO_AV_QOS_UDP_H
+#include "ace/pre.h"
+
+
+#include "ace/OS.h"
+#include "ace/QoS/QoS_Session_Factory.h"
+#include "ace/QoS/QoS_Decorator.h"
+#include "ace/QoS/SOCK_Dgram_Mcast_QoS.h"
+
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+
+class TAO_AV_Export TAO_AV_UDP_QoS_Factory : public TAO_AV_Transport_Factory
+{
+public:
+ TAO_AV_UDP_QoS_Factory (void);
+ virtual ~TAO_AV_UDP_QoS_Factory (void);
+ virtual int init (int argc, char *argv[]);
+ // Initialization hook.
+ virtual int match_protocol (const char *protocol_string);
+ virtual TAO_AV_Acceptor *make_acceptor (void);
+ virtual TAO_AV_Connector *make_connector (void);
+};
+
+class TAO_AV_UDP_QoS_Flow_Handler;
+
+class TAO_AV_UDP_QoS_Transport
+ :public TAO_AV_Transport
+{
+ // = TITLE
+ // A transport abstraction for udp sockets.
+ //
+ // = DESCRIPTION
+ // Uses the ACE_SOCK_Dgram to send the data.
+public:
+ TAO_AV_UDP_QoS_Transport (void);
+
+ TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler);
+
+ virtual ~TAO_AV_UDP_QoS_Transport (void);
+
+ virtual int open (ACE_Addr *addr);
+
+ virtual int close (void);
+
+ virtual int mtu (void);
+
+ virtual ACE_Addr *get_peer_addr (void);
+
+ virtual int set_remote_address (const ACE_INET_Addr &address);
+
+ virtual ssize_t send (const ACE_Message_Block *mblk,
+ ACE_Time_Value *s = 0);
+ // Write the complete Message_Block chain to the connection.
+
+ virtual ssize_t send (const char *buf,
+ size_t len,
+ ACE_Time_Value *s = 0);
+ // Write the contents of the buffer of length len to the connection.
+
+ virtual ssize_t send (const iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *s = 0);
+ // Write the contents of iovcnt iovec's to the connection.
+
+ virtual ssize_t recv (char *buf,
+ size_t len,
+ ACE_Time_Value *s = 0);
+ // Read len bytes from into buf.
+
+ virtual ssize_t recv (char *buf,
+ size_t len,
+ int flags,
+ ACE_Time_Value *s = 0);
+ // Read len bytes from into buf using flags.
+
+ virtual ssize_t recv (iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *s = 0);
+ // Read received data into the iovec buffers.
+protected:
+ TAO_AV_UDP_QoS_Flow_Handler *handler_;
+ ACE_Addr *addr_;
+ ACE_INET_Addr peer_addr_;
+};
+
+class TAO_AV_UDP_QoS_Flow_Handler
+ :public virtual TAO_AV_Flow_Handler,
+ public virtual ACE_Event_Handler
+{
+public:
+ TAO_AV_UDP_QoS_Flow_Handler (void);
+ //Ctor
+ ~TAO_AV_UDP_QoS_Flow_Handler (void);
+ // Dtor
+ int open (ACE_Addr &address);
+ virtual TAO_AV_Transport *transport (void);
+ virtual int set_remote_address (ACE_Addr *address);
+ virtual ACE_HANDLE get_handle (void) const;
+ virtual int handle_input (ACE_HANDLE fd);
+ virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg = 0);
+ virtual int handle_qos (ACE_HANDLE fd);
+ // Handles a QoS event. Right now, just
+ // prints a message.
+ ACE_SOCK_Dgram_Mcast_QoS *get_socket (void);
+ virtual ACE_Event_Handler* event_handler (void){ return this; }
+ virtual ACE_QoS_Session* qos_session (void);
+ virtual void qos_session (ACE_QoS_Session *qos_session);
+protected:
+ TAO_AV_Core *av_core_;
+ ACE_INET_Addr peer_addr_;
+ ACE_SOCK_Dgram_Mcast_QoS qos_sock_dgram_;
+ ACE_QoS_Session *qos_session_;
+};
+
+class TAO_AV_UDP_QoS_Acceptor
+ :public TAO_AV_Acceptor
+{
+public:
+ TAO_AV_UDP_QoS_Acceptor (void);
+ virtual ~TAO_AV_UDP_QoS_Acceptor (void);
+ virtual int open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory);
+
+ virtual int open_default (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory);
+
+ virtual int open_i (ACE_INET_Addr *address);
+
+ virtual int close (void);
+ // virtual int activate_svc_handler (TAO_AV_Flow_Handler *handler);
+ virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler);
+
+ int translate (CosPropertyService::Properties &qos_params,
+ ACE_Flow_Spec *ace_flow_spec);
+
+protected:
+ TAO_Base_StreamEndPoint *endpoint_;
+ TAO_FlowSpec_Entry *entry_;
+ TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_;
+ ACE_QoS_Session *qos_session_;
+ ACE_QoS_Manager qos_manager_;
+};
+
+class TAO_AV_UDP_QoS_Connector
+ :public TAO_AV_Connector
+{
+public:
+ TAO_AV_UDP_QoS_Connector (void);
+ ~TAO_AV_UDP_QoS_Connector (void);
+ virtual int open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_Flow_Protocol_Factory *factory);
+
+ virtual int connect (TAO_FlowSpec_Entry *entry,
+ TAO_AV_Transport *&transport);
+ virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler);
+ virtual int close (void);
+
+ int translate (CosPropertyService::Properties &qos_params,
+ ACE_Flow_Spec *ace_flow_spec);
+
+protected:
+ TAO_Base_StreamEndPoint *endpoint_;
+ TAO_AV_Core *av_core_;
+ TAO_FlowSpec_Entry *entry_;
+ TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_;
+ ACE_QoS_Session *qos_session_;
+ ACE_QoS_Manager qos_manager_;
+};
+
+// class TAO_AV_Export TAO_AV_UDP_Object : public TAO_AV_Protocol_Object
+// {
+// public:
+// TAO_AV_UDP_Object (TAO_AV_Callback *callback,
+// TAO_AV_Transport *transport = 0);
+
+// virtual ~TAO_AV_UDP_Object (void);
+// // Dtor
+
+// virtual int handle_input (void);
+
+// virtual int send_frame (ACE_Message_Block *frame,
+// TAO_AV_frame_info *frame_info = 0);
+// // send a data frame.
+
+// virtual int send_frame (const iovec *iov,
+// int iovcnt,
+// TAO_AV_frame_info *frame_info = 0);
+
+// virtual int destroy (void);
+// // end the stream.
+
+// private:
+// ACE_Message_Block frame_;
+// // Pre-allocated memory to receive the data...
+// };
+
+class TAO_AV_UDP_QoS_Flow_Factory : public TAO_AV_Flow_Protocol_Factory
+{
+public:
+ TAO_AV_UDP_QoS_Flow_Factory (void);
+ virtual ~TAO_AV_UDP_QoS_Flow_Factory (void);
+ virtual int init (int argc, char *argv[]);
+ // Initialization hook.
+ virtual int match_protocol (const char *flow_string);
+ TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport);
+};
+
+ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Flow_Factory)
+ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory)
+
+ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Factory)
+ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Factory)
+
+
+#if defined(__ACE_INLINE__)
+#include "QoS_UDP.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_AV_QOS_UDP_H */
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i
new file mode 100644
index 00000000000..b3e1a80b1ab
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.i
@@ -0,0 +1,33 @@
+/* -*- C++ -*- */
+
+// $Id$
+
+//----------------------------------------------------------------------
+// TAO_AV_UDP_Flow_Handler
+//----------------------------------------------------------------------
+ACE_INLINE
+ACE_SOCK_Dgram_Mcast_QoS *
+TAO_AV_UDP_QoS_Flow_Handler::get_socket (void)
+{
+ return &this->qos_sock_dgram_;
+}
+
+ACE_INLINE
+int
+TAO_AV_UDP_QoS_Flow_Handler::open (ACE_Addr &address)
+{
+ // return this->qos_sock_dgram_.open (address);
+ return 0;
+}
+
+ACE_INLINE ACE_QoS_Session*
+TAO_AV_UDP_QoS_Flow_Handler::qos_session (void)
+{
+ return this->qos_session_;
+}
+
+ACE_INLINE void
+TAO_AV_UDP_QoS_Flow_Handler::qos_session (ACE_QoS_Session *qos_session)
+{
+ this->qos_session_ = qos_session;
+}
diff --git a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
index 85a09a59eb5..068b519936a 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
@@ -11,6 +11,10 @@
#include "FlowSpec_Entry.h"
#include "AV_Core.h"
+#ifdef ACE_HAS_RAPI
+#include "QoS_UDP.h"
+#endif /*ACE_HAS_RAPI*/
+
#include "tao/debug.h"
#include "ace/Dynamic_Service.h"
@@ -62,7 +66,7 @@ TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint,
ACE_Addr *address = entry->address ();
const char *flow_protocol = entry->flow_protocol_str ();
const char *transport_protocol = entry->carrier_protocol_str ();
-
+
if (ACE_OS::strcmp (flow_protocol,"") == 0)
flow_protocol = transport_protocol;
@@ -115,6 +119,7 @@ TAO_AV_Connector_Registry::open (TAO_Base_StreamEndPoint *endpoint,
av_core,
(*flow_factory)->factory ()) == -1)
return -1;
+
TAO_AV_Transport *transport = 0;
if (connector->connect (entry,
transport) == -1)
@@ -271,7 +276,7 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_Acceptor_Registry::open"));
+ "TAO_AV_Acceptor_Registry::open \n"));
TAO_AV_FlowSpecSetItor last_flowspec
= flow_spec_set.end ();
@@ -289,13 +294,15 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
flow_protocol = transport_protocol;
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_Acceptor_Registry::protocol for flow %s is %d",
+ "TAO_AV_Acceptor_Registry::protocol for flow %s is %s\n",
entry->flowname (),
transport_protocol));
if (address == 0)
{
- this->open_default (endpoint,av_core, entry);
+ this->open_default (endpoint,
+ av_core,
+ entry);
continue;
}
else
@@ -313,11 +320,14 @@ TAO_AV_Acceptor_Registry::open (TAO_Base_StreamEndPoint *endpoint,
{
TAO_AV_TransportFactorySetItor transport_factory_end =
av_core->transport_factories ()->end ();
+
+ int i = 1;
for (TAO_AV_TransportFactorySetItor transport_factory =
av_core->transport_factories ()->begin ();
transport_factory != transport_factory_end;
++transport_factory)
{
+
if ((*transport_factory)->factory ()->match_protocol (transport_protocol))
{
TAO_AV_Acceptor *acceptor =
diff --git a/TAO/orbsvcs/orbsvcs/AV/UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/UDP.cpp
index 9b5f7b403f2..3d7008f00c0 100644
--- a/TAO/orbsvcs/orbsvcs/AV/UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/UDP.cpp
@@ -609,7 +609,7 @@ TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
int
TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
{
- if (ACE_OS::strstr (protocol_string,"UDP") != 0)
+ if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
return 1;
return 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/Makefile.av b/TAO/orbsvcs/orbsvcs/Makefile.av
index 481dda3d9fb..229924327d3 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile.av
+++ b/TAO/orbsvcs/orbsvcs/Makefile.av
@@ -60,6 +60,11 @@ CPP_SRCS = \
AV/RTP\
AV/sfp
+ifeq ($(rapi),1)
+ CPP_SRCS += AV/Fill_ACE_QoS\
+ AV/QoS_UDP
+endif # rapi
+
IDL_SRC = \
$(addsuffix S.cpp, $(IDL_FILES)) \
$(addsuffix C.cpp, $(IDL_FILES))