summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp858
1 files changed, 376 insertions, 482 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
index 1b944625014..a5623aad6f8 100644
--- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
@@ -32,154 +32,6 @@ FillQoSParams (ACE_QoS_Params &qos_params,
return 0;
}
-TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void)
-{
-
-}
-
-TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec,
- TAO_AV_UDP_QoS_Flow_Handler *handler)
-{
- ACE_QoS* ace_qos = 0;
-
- ACE_NEW_RETURN (ace_qos,
- ACE_QoS,
- -1);
-
- Fill_ACE_QoS fill_ace_qos;
-
- if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
- &ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex sender qos (%N|%l)\n"),
- -1);
- else
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Sender QoS parameters\n"));
- }
- else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
- &ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex receiver qos (%N|%l)\n"),
- -1);
- else
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Receiver QoS parameters\n"));
-
- }
-
- ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager ();
-
- // Set the QoS for the session. Replaces the ioctl () call that
- // was being made previously.
- if (handler->qos_session ()->qos (handler->get_socket (),
- &qos_manager,
- *ace_qos) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS (%N|%l)\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Setting QOS succeeds\n"));
-
- return 0;
-}
-
-ACE_QoS_Session *
-TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler,
- ACE_INET_Addr &addr)
-{
- ACE_QoS_Params qos_params;
-
- ACE_QoS* ace_qos = 0;
-
- FillQoSParams (qos_params,
- 0,
- ace_qos);
-
-
- // Create a QoS Session Factory.
- ACE_QoS_Session_Factory session_factory;
-
- // Ask the factory to create a QoS session. This could be RAPI or
- // GQoS based on the parameter passed.
-
- //@@YAmuna : Later make this generic for GQoS
- ACE_QoS_Session *qos_session = session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
-
- // Create a destination address for the QoS session. The same
- // address should be used for the subscribe call later. A copy
- // is made below only to distinguish the two usages of the dest
- // address.
- ACE_INET_Addr dest_addr (addr);
-
- // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
- // Protocol]. Initialize the QoS session.
- if (qos_session->open (dest_addr,
- IPPROTO_UDP) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in opening the QoS session\n"),
- 0);
- else
- ACE_DEBUG ((LM_DEBUG,
- "QoS session opened successfully\n"));
-
- if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- // This is a sender
- qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER);
- }
- else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- // This is a receiver
- qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
- }
-
- return qos_session;
-}
-
-int
-TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session,
- TAO_AV_UDP_QoS_Flow_Handler *handler)
-{
- ACE_QoS_Decorator* qos_decorator;
-
- // Decorate the above handler with QoS functionality.
- ACE_NEW_RETURN (qos_decorator,
- ACE_QoS_Decorator (handler,
- qos_session,
- handler->av_core ()->reactor ()),
- -1);
-
- // Initialize the Decorator.
- if (qos_decorator->init () != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Decorator init () failed (%N|%l)\n"),
- -1);
-
- // Register the decorated Event Handler with the Reactor.
- int result = handler->av_core ()->reactor ()->register_handler (qos_decorator,
- ACE_Event_Handler::QOS_MASK |
- ACE_Event_Handler::READ_MASK);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in registering the Decorator with the Reactor (%N|%l)\n"),
- -1);
-
- return 0;
-
-}
TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
{
@@ -282,34 +134,35 @@ int
TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec,
CosPropertyService::Properties &qos_params)
{
- qos_params.length (9);
+ int size = qos_params.length ();
+ qos_params.length (size + 1);
- qos_params [0].property_name = CORBA::string_dup ("Service_Type");
- qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
+ qos_params [size].property_name = CORBA::string_dup ("Service_Type");
+ qos_params [size].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
- qos_params [1].property_name = CORBA::string_dup ("Token_Rate");
- qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
+ qos_params [size].property_name = CORBA::string_dup ("Token_Rate");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
- qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size");
- qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
+ qos_params [size].property_name = CORBA::string_dup ("Token_Bucket_Size");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
- qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth");
- qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
+ qos_params [size].property_name = CORBA::string_dup ("Peak_Bandwidth");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
- qos_params [4].property_name = CORBA::string_dup ("Latency");
- qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
+ qos_params [size].property_name = CORBA::string_dup ("Latency");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
- qos_params [5].property_name = CORBA::string_dup ("Delay_Variation");
- qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
+ qos_params [size].property_name = CORBA::string_dup ("Delay_Variation");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
- qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size");
- qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
+ qos_params [size].property_name = CORBA::string_dup ("Max_SDU_Size");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
- qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size");
- qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
+ qos_params [size].property_name = CORBA::string_dup ("Minimum_Policed_Size");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
- qos_params [8].property_name = CORBA::string_dup ("TTL");
- qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
+ qos_params [size].property_name = CORBA::string_dup ("TTL");
+ qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
return 0;
}
@@ -317,22 +170,21 @@ TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec,
int
TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
{
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
ACE_DECLARE_NEW_CORBA_ENV;
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
+ "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
if (this->qos_session_->update_qos () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in updating QoS\n"),
-1);
else
- {
- if(TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Updating QOS succeeds.\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ " Updating QOS succeeds.\n"));
if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_ERROR)
{
@@ -348,22 +200,16 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
{
if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_EVENT)
{
- if( TAO_debug_level > 0 )
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Resv Event Received\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "Resv Event Received\n"));
if (!CORBA::is_nil (this->negotiator_))
{
- if( TAO_debug_level > 0 )
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Negotiator Specified\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "Negotiator Specified\n"));
AVStreams::streamQoS new_qos;
ACE_Flow_Spec ace_flow_spec =
- this->qos_session_->qos ().sending_flowspec ();
+ this->qos_session_->qos ().receiving_flowspec ();
new_qos.length (1);
this->translate (&ace_flow_spec,
new_qos [0].QoSParams);
@@ -381,12 +227,10 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
{
ACE_QoS_Manager qos_manager =
this->get_socket ()->qos_manager ();
-
- ACE_QoS ace_qos = this->qos_session_->qos ();
-
+
this->qos_session_->qos (this->get_socket (),
&qos_manager,
- ace_qos);
+ this->qos_session_->qos ());
}
}
@@ -396,11 +240,8 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
int
TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
{
- if( TAO_debug_level > 0 )
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::change_qos\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_Flow_Handler::change_qos\n"));
ACE_QoS_Manager qos_manager =
this->get_socket ()->qos_manager ();
@@ -429,11 +270,8 @@ TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
"Unable to fill simplex sender qos\n"),
-1);
else
- {
- if( TAO_debug_level > 0 )
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Sender QoS parameters\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
}
else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
{
@@ -443,11 +281,8 @@ TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
"Unable to fill simplex receiver qos\n"),
-1);
else
- {
- if( TAO_debug_level > 0 )
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Receiver QoS parameters\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
}
@@ -477,7 +312,7 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n"));
+ "TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n"));
ACE_INET_Addr *inet_addr =
ACE_dynamic_cast (ACE_INET_Addr*,address);
@@ -486,26 +321,7 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
TAO_AV_UDP_QoS_Transport *transport =
ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_);
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- TAO_AV_UDP_QoS_Session_Helper helper;
-
- this->qos_session_ = helper.open_qos_session (this,
- *inet_addr);
-
- if (this->qos_session_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Session Open Failed (%N|%l)\n"),
- -1);
-
- if (helper.activate_qos_handler (this->qos_session_,
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Activating QoS Handler Failed (%N|%l)\n"),
- -1);
- }
return transport->set_remote_address (*inet_addr);
}
@@ -515,7 +331,7 @@ TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
+ "TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
this->qos_sock_dgram_.get_handle ()));
return this->qos_sock_dgram_.get_handle () ;
@@ -613,25 +429,24 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
0,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error in dgram_mcast.send () (%N|%l)\n"),
+ "Error in dgram_mcast.send ()\n"),
-1);
else
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
- if (n < 1)
+ if (n < 1)
return n;
- nbytes += bytes_sent;
- iovcnt = 0;
- }
- }
+ nbytes += bytes_sent;
+ iovcnt = 0;
+ }
+ }
}
size_t bytes_sent = 0;
-
+
// Check for remaining buffers to be sent!
if (iovcnt != 0)
{
@@ -646,10 +461,9 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
"Error in dgram_mcast.send ()\n"),
-1);
else
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
if (n < 1)
return n;
@@ -667,13 +481,13 @@ TAO_AV_UDP_QoS_Transport::send (const char *buf,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Transport::send "));
+ "TAO_AV_UDP_QoS_Transport::send "));
char addr [BUFSIZ];
this->peer_addr_.addr_to_string (addr,BUFSIZ);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) to %s\n",
+ "to %s\n",
addr));
return this->handler_->get_socket ()->send (buf,
@@ -701,13 +515,9 @@ TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
"Error in dgram_mcast.send ()\n"),
-1);
else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
- }
-
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
return bytes_sent;
}
@@ -761,12 +571,27 @@ TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *hand
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Acceptor Svc Handler QOS ENABLED \n"));
-
- TAO_AV_UDP_QoS_Session_Helper helper;
+ "Acceptor Svc Handler QOS ENABLED \n"));
- result = helper.activate_qos_handler (handler->qos_session (),
- handler);
+ ACE_QoS_Decorator* qos_decorator;
+
+ // Decorate the above handler with QoS functionality.
+ ACE_NEW_RETURN (qos_decorator,
+ ACE_QoS_Decorator (handler,
+ handler->qos_session (),
+ this->av_core_->reactor ()),
+ -1);
+
+ // Initialize the Decorator.
+ if (qos_decorator->init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Decorator init () failed.\n"),
+ -1);
+
+ // Register the decorated Event Handler with the Reactor.
+ result = this->av_core_->reactor ()->register_handler (qos_decorator,
+ ACE_Event_Handler::QOS_MASK |
+ ACE_Event_Handler::READ_MASK);
if (result == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in registering the Decorator with the Reactor\n"),
@@ -783,7 +608,7 @@ TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open "));
+ "TAO_AV_UDP_QoS_Acceptor::open "));
this->av_core_ = av_core;
this->endpoint_ = endpoint;
@@ -800,7 +625,7 @@ TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
BUFSIZ);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: %s",
+ "TAO_AV_UDP_QoS_Acceptor::open: %s",
buf));
int result = this->open_i (inet_addr);
@@ -829,13 +654,13 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
addr += ":8000";
ACE_INET_Addr *address;
ACE_NEW_RETURN (address,
- ACE_INET_Addr ("0"),
+ ACE_INET_Addr (addr.c_str ()),
-1);
address->addr_to_string (buf,
BUFSIZ);
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) ADDRESS IS %s\n",
+ "ADDRESS IS %s\n",
buf));
int result = this->open_i (address);
@@ -848,20 +673,19 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
int
TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
{
+
ACE_DECLARE_NEW_CORBA_ENV;
int result = 0;
-
+
// TAO_AV_Callback *callback = 0;
// this->endpoint_->get_callback (this->flowname_.c_str (),
// callback);
ACE_INET_Addr *local_addr;
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr (*inet_addr),
- -1);
-
ACE_QoS_Params qos_params;
+ ACE_Flow_Spec ace_flow_spec;
+
ACE_QoS* ace_qos = 0;
FillQoSParams (qos_params,
@@ -869,128 +693,160 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
ace_qos);
+ AVStreams::QoS qos;
+
+ int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
+
TAO_AV_UDP_QoS_Flow_Handler* handler;
ACE_NEW_RETURN (handler,
TAO_AV_UDP_QoS_Flow_Handler,
-1);
-
+
TAO_AV_Flow_Handler *flow_handler = 0;
flow_handler = handler;
+
+ // Create a QoS Session Factory.
+ ACE_QoS_Session_Factory session_factory;
+
+ // Ask the factory to create a QoS session. This could be RAPI or
+ // GQoS based on the parameter passed.
+
+ //@@YAmuna : Later make this generic for GQoS
+ this->qos_session_ =
+ session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+
+ // Create a destination address for the QoS session. The same
+ // address should be used for the subscribe call later. A copy
+ // is made below only to distinguish the two usages of the dest
+ // address.
+ ACE_INET_Addr dest_addr (*inet_addr);
- handler->endpoint (this->endpoint_);
- handler->flowspec_entry (this->entry_);
- handler->av_core (this->av_core_);
+ // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
+ // Protocol]. Initialize the QoS session.
+ if (this->qos_session_->open (*inet_addr,
+ IPPROTO_UDP) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in opening the QoS session\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "QoS session opened successfully\n"));
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
-
- TAO_AV_UDP_QoS_Session_Helper helper;
-
- int result = handler->get_socket ()->open (*inet_addr,
- qos_params,
- AF_INET,
- 0,
- 0,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- 1);
-
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
- -1);
+
+ result = handler->get_socket ()->subscribe (*inet_addr,
+ qos_params,
+ 1,
+ 0,
+ AF_INET,
+ // ACE_FROM_PROTOCOL_INFO,
+ 0,
+ 0, // ACE_Protocol_Info,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ this->qos_session_);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
+ else ACE_DEBUG ((LM_DEBUG,
+ "Subscribe succeeded\n"));
- result = handler->get_socket ()->get_local_addr (*local_addr);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in getting Local Address (%N|%l)\n"),
- -1);
+ handler->qos_session (this->qos_session_);
- // Create a destination address for the QoS session. The same
- // address should be used for the subscribe call later. A copy
- // is made below only to distinguish the two usages of the dest
- // address.
- ACE_INET_Addr dest_addr;
- dest_addr.set (local_addr->get_port_number (),
- local_addr->get_host_name ());
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr (*inet_addr),
+ -1);
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ // This is a sender
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ // This is a receiver
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
+ }
+
+ if (qos_available == 0)
+ {
+
+ handler->translate (qos.QoSParams,
+ &ace_flow_spec);
- char dest_buf [BUFSIZ];
- dest_addr.addr_to_string (dest_buf,
- BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Session Address is %s\n",
- dest_buf));
+ ACE_NEW_RETURN (ace_qos,
+ ACE_QoS,
+ -1);
- this->qos_session_ = helper.open_qos_session (handler,
- dest_addr);
-
- if (this->qos_session_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Session Open Failed (%N|%l)\n"),
- -1);
-
- handler->qos_session (this->qos_session_);
-
- if (this->activate_svc_handler (handler) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Activate Svc Handler Failed (%N|%l)\n"),
- -1);
-
- AVStreams::QoS qos;
- int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
+ Fill_ACE_QoS fill_ace_qos;
- if (qos_available == 0)
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
{
-
- ACE_Flow_Spec *ace_flow_spec = 0;
- ACE_NEW_RETURN (ace_flow_spec,
- ACE_Flow_Spec,
- -1);
-
- handler->translate (qos.QoSParams,
- ace_flow_spec);
-
- if (helper.set_qos (*ace_flow_spec,
- handler) == -1)
-
+ if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
+ &ace_flow_spec) !=0)
ACE_ERROR_RETURN ((LM_ERROR,
- "Set QoS Failed (%N|%l)\n"),
+ "Unable to fill simplex sender qos\n"),
-1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
}
- }
- else
- {
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
+ &ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex receiver qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
+
+ }
+
+
+
+ this->qos_manager_
+ = handler->get_socket ()->qos_manager ();
- int result = handler->get_socket ()->open (*inet_addr,
- qos_params,
- AF_INET,
- 0,
- 0,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- 1);
-
- if (result < 0)
+ // Set the QoS for the session. Replaces the ioctl () call that
+ // was being made previously.
+ if (this->qos_session_->qos (handler->get_socket (),
+ &this->qos_manager_,
+ *ace_qos) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
+ "Unable to set QoS\n"),
-1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QOS succeeds.\n"));
}
+
+ ACE_DEBUG ((LM_DEBUG,
+ "It reached here \n"));
+
+ // ACE_Time_Value era (10);
+ //this->adaptor ().flowspec_entry (this->entry_);
+ //this->adaptor ().endpoint (this->endpoint_);
+// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_,
+ // adaptor,
+ // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()),
+ // era,
+ // QoS_Monitor::PEAK_BANDWIDTH);
+
+ //ll_ace_qos.map ().unbind (g_711);
TAO_AV_Protocol_Object *object =
this->flow_protocol_factory_->make_protocol_object (this->entry_,
- this->endpoint_,
- flow_handler,
- flow_handler->transport ());
+ this->endpoint_,
+ flow_handler,
+ flow_handler->transport ());
flow_handler->protocol_object (object);
-
+
AVStreams::Negotiator_ptr negotiator;
ACE_TRY_EX (negotiator)
@@ -1006,34 +862,40 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
ACE_CATCHANY
{
ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Negotiator Not Found \n"));
+ "Negotiator Not Found \n"));
}
ACE_ENDTRY;
-
+
+ // callback->protocol_object (object);
+ // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
+ // object);
this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
this->entry_->protocol_object (object);
+ char buf[BUFSIZ];
+ local_addr->addr_to_string (buf,BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n",
+ buf));
+
result = handler->get_socket ()->get_local_addr (*local_addr);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
local_addr->set (local_addr->get_port_number (),
local_addr->get_host_name ());
-
- if (TAO_debug_level > 0)
- {
- char buf [BUFSIZ];
- local_addr->addr_to_string (buf,
- BUFSIZ);
- ACE_DEBUG ((LM_DEBUG,
- "Local Address is %s\n",
- buf));
- }
+
+ local_addr->addr_to_string (buf,
+ BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,
+ "REMOTE ADDRESS IS %s\n",
+ buf));
this->entry_->set_local_addr (local_addr);
this->entry_->handler (flow_handler);
- return 0;
-
+ // call activate svc handler.
+ return this->activate_svc_handler (handler);
}
int
@@ -1129,11 +991,11 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
this->entry_ = entry;
this->flowname_ = entry->flowname ();
- ACE_INET_Addr *local_addr;
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr ("0"),
- -1);
-
+// ACE_INET_Addr *local_addr;
+// ACE_NEW_RETURN (local_addr,
+// ACE_INET_Addr ("0"),
+// -1);
+
TAO_AV_Flow_Handler *flow_handler = 0;
@@ -1141,118 +1003,150 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
ACE_NEW_RETURN (handler,
TAO_AV_UDP_QoS_Flow_Handler,
-1);
-
- flow_handler = handler;
-
- handler->endpoint (this->endpoint_);
- handler->flowspec_entry (this->entry_);
- handler->av_core (this->av_core_);
-
- ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
- entry->address ());
- ACE_QoS_Params qos_params;
- ACE_QoS* ace_qos = 0;
+ flow_handler = handler;
+ ACE_QoS_Params qos_params;
+
+ ACE_Flow_Spec *ace_flow_spec = 0;
+
+ ACE_QoS* ace_qos;
+
FillQoSParams (qos_params,
0,
ace_qos);
-
- result = handler->get_socket ()->open (*local_addr,
- qos_params,
- AF_INET,
- 0,
- 0,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- 1);
+
+ AVStreams::QoS qos;
+ int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Data socket open failed (%N|%l)\n"),
- -1);
- result = handler->get_socket ()->get_local_addr (*local_addr);
+ ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
+ entry->address ());
-
- ACE_INET_Addr *session_addr = 0;
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- ACE_NEW_RETURN (session_addr,
- ACE_INET_Addr,
- -1);
-
- session_addr->set (local_addr->get_port_number (),
- local_addr->get_host_name ());
-
- }
- else
- {
- session_addr = inet_addr;
- }
-
- char sess_buf [BUFSIZ];
- session_addr->addr_to_string (sess_buf,
- BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Session Address is %s\n",
- sess_buf));
+ // Create a QoS Session Factory.
+ ACE_QoS_Session_Factory session_factory;
+ // Ask the factory to create a QoS session. This could be RAPI or
+ // GQoS based on the parameter passed.
+
+ //@@YAmuna : Later make this generic for GQoS
+ this->qos_session_ =
+ session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+
+
// Create a destination address for the QoS session. The same
// address should be used for the subscribe call later. A copy
// is made below only to distinguish the two usages of the dest
// address.
- ACE_INET_Addr dest_addr (*session_addr);
-
- TAO_AV_UDP_QoS_Session_Helper helper;
-
- this->qos_session_ = helper.open_qos_session (handler,
- *session_addr);
-
- if (this->qos_session_ == 0)
+ ACE_INET_Addr dest_addr (*inet_addr);
+
+ // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
+ // Protocol]. Initialize the QoS session.
+ if (this->qos_session_->open (*inet_addr,
+ IPPROTO_UDP) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Session Open Failed (%N|%l)\n"),
- -1);
+ "Error in opening the QoS session\n"),
+ -1);
else
ACE_DEBUG ((LM_DEBUG,
"QoS session opened successfully\n"));
+
+ result = handler->get_socket ()->subscribe (*inet_addr,
+ qos_params,
+ 1,
+ 0,
+ AF_INET,
+ // ACE_FROM_PROTOCOL_INFO,
+ 0,
+ 0, // ACE_Protocol_Info,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ this->qos_session_);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
+ // Now disable Multicast loopback.
+ // @@Should we make this a policy?
+
handler->qos_session (this->qos_session_);
+ ACE_INET_Addr *local_addr;
+
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr (*inet_addr),
+ -1);
+
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ // This is a sender
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ // This is a receiver
+ this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
+ }
+
this->qos_manager_ =
handler->get_socket ()->qos_manager ();
- AVStreams::QoS qos;
-
- int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
if (qos_available == 0)
{
-
- ACE_Flow_Spec* ace_flow_spec;
ACE_NEW_RETURN (ace_flow_spec,
ACE_Flow_Spec,
-1);
handler->translate (qos.QoSParams,
- ace_flow_spec);
+ ace_flow_spec);
+
+ ACE_QoS* ace_qos;
+
+ ACE_NEW_RETURN (ace_qos,
+ ACE_QoS,
+ -1);
+
+ Fill_ACE_QoS fill_ace_qos;
- if (helper.set_qos (*ace_flow_spec,
- handler) == -1)
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex receiver qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
+ }
+ else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
+ ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex sender qos\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
+ }
+
+ // Set the QoS for the session. Replaces the ioctl () call that
+ // was being made previously.
+ if (this->qos_session_->qos (handler->get_socket (),
+ &this->qos_manager_,
+ *ace_qos) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS (%N|%l)\n"),
+ "Unable to set QoS\n"),
-1);
else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Setting QOS succeeds.\n"));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QOS succeeds.\n"));
}
TAO_AV_Protocol_Object *object =
@@ -1275,41 +1169,27 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
}
ACE_CATCHANY
{
- ACE_DEBUG ((LM_DEBUG,
- "Negotiator not found for flow %s\n",
- this->entry_->flowname ()));
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_UDP_QoS_Connector::connect");
}
ACE_ENDTRY;
flow_handler->protocol_object (object);
-
+ // callback->protocol_object (object);
+ // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
+ // object);
this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
this->entry_->protocol_object (object);
- result = handler->get_socket ()->get_local_addr (*local_addr);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Get local addr failed (%N|%l)\n"),
- result);
-
- local_addr->set (local_addr->get_port_number (),
- local_addr->get_host_name ());
+ char buf[BUFSIZ];
+ local_addr->addr_to_string (buf,BUFSIZ);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_UDP_CONNECTOR::connect:%s \n",
+ buf));
- if (TAO_debug_level > 0)
- {
- char buf[BUFSIZ];
- local_addr->addr_to_string (buf,
- BUFSIZ);
-
- ACE_DEBUG ((LM_DEBUG,
- "Local Address is %s\n",
- buf));
- }
-
entry->set_local_addr (local_addr);
entry->handler (flow_handler);
transport = flow_handler->transport ();
-
// call activate svc handler.
return this->activate_svc_handler (handler);
}
@@ -1318,15 +1198,29 @@ int
TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
{
int result = 0;
-
- TAO_AV_UDP_QoS_Session_Helper helper;
+ ACE_QoS_Decorator* qos_decorator;
+
+ // Decorate the above handler with QoS functionality.
+ ACE_NEW_RETURN (qos_decorator,
+ ACE_QoS_Decorator (handler,
+ handler->qos_session (),
+ this->av_core_->reactor ()),
+ -1);
+
+ // Initialize the Decorator.
+ if (qos_decorator->init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Decorator init () failed.\n"),
+ -1);
+
+ // Register the decorated Event Handler with the Reactor.
+ result = this->av_core_->reactor ()->register_handler (qos_decorator,
+ ACE_Event_Handler::QOS_MASK |
+ ACE_Event_Handler::READ_MASK);
- result = helper.activate_qos_handler (this->qos_session_,
- handler);
-
if (result == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "(%N,%l) Error in registering the Decorator with the Reactor\n"),
+ "Error in registering the Decorator with the Reactor\n"),
-1);
return result;