summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-08-04 17:43:45 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-08-04 17:43:45 +0000
commit9928c12e788aab257b0cd1ecbb4606735260d316 (patch)
treeb567e8fae8e408771ff34610f9686db7822a7fe1
parentff56c2606b0a35149a1533ed2afb337c43a26e92 (diff)
downloadATCD-9928c12e788aab257b0cd1ecbb4606735260d316.tar.gz
ChangeLogTag: ri Aug 04 3:33:31 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp811
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h36
2 files changed, 466 insertions, 381 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
index 27e6888ee11..1b944625014 100644
--- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
@@ -32,6 +32,154 @@ FillQoSParams (ACE_QoS_Params &qos_params,
return 0;
}
+TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void)
+{
+
+}
+
+TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void)
+{
+}
+
+int
+TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec,
+ TAO_AV_UDP_QoS_Flow_Handler *handler)
+{
+ ACE_QoS* ace_qos = 0;
+
+ ACE_NEW_RETURN (ace_qos,
+ ACE_QoS,
+ -1);
+
+ Fill_ACE_QoS fill_ace_qos;
+
+ if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
+ &ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex sender qos (%N|%l)\n"),
+ -1);
+ else
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Sender QoS parameters\n"));
+ }
+ else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
+ &ace_flow_spec) !=0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to fill simplex receiver qos (%N|%l)\n"),
+ -1);
+ else
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Filled up the Receiver QoS parameters\n"));
+
+ }
+
+ ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager ();
+
+ // Set the QoS for the session. Replaces the ioctl () call that
+ // was being made previously.
+ if (handler->qos_session ()->qos (handler->get_socket (),
+ &qos_manager,
+ *ace_qos) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to set QoS (%N|%l)\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QOS succeeds\n"));
+
+ return 0;
+}
+
+ACE_QoS_Session *
+TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler,
+ ACE_INET_Addr &addr)
+{
+ ACE_QoS_Params qos_params;
+
+ ACE_QoS* ace_qos = 0;
+
+ FillQoSParams (qos_params,
+ 0,
+ ace_qos);
+
+
+ // Create a QoS Session Factory.
+ ACE_QoS_Session_Factory session_factory;
+
+ // Ask the factory to create a QoS session. This could be RAPI or
+ // GQoS based on the parameter passed.
+
+ //@@YAmuna : Later make this generic for GQoS
+ ACE_QoS_Session *qos_session = session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+
+ // Create a destination address for the QoS session. The same
+ // address should be used for the subscribe call later. A copy
+ // is made below only to distinguish the two usages of the dest
+ // address.
+ ACE_INET_Addr dest_addr (addr);
+
+ // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
+ // Protocol]. Initialize the QoS session.
+ if (qos_session->open (dest_addr,
+ IPPROTO_UDP) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in opening the QoS session\n"),
+ 0);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "QoS session opened successfully\n"));
+
+ if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+ // This is a sender
+ qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER);
+ }
+ else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ // This is a receiver
+ qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
+ }
+
+ return qos_session;
+}
+
+int
+TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session,
+ TAO_AV_UDP_QoS_Flow_Handler *handler)
+{
+ ACE_QoS_Decorator* qos_decorator;
+
+ // Decorate the above handler with QoS functionality.
+ ACE_NEW_RETURN (qos_decorator,
+ ACE_QoS_Decorator (handler,
+ qos_session,
+ handler->av_core ()->reactor ()),
+ -1);
+
+ // Initialize the Decorator.
+ if (qos_decorator->init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Decorator init () failed (%N|%l)\n"),
+ -1);
+
+ // Register the decorated Event Handler with the Reactor.
+ int result = handler->av_core ()->reactor ()->register_handler (qos_decorator,
+ ACE_Event_Handler::QOS_MASK |
+ ACE_Event_Handler::READ_MASK);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in registering the Decorator with the Reactor (%N|%l)\n"),
+ -1);
+
+ return 0;
+
+}
TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
{
@@ -134,35 +282,34 @@ int
TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec,
CosPropertyService::Properties &qos_params)
{
- int size = qos_params.length ();
- qos_params.length (size + 1);
+ qos_params.length (9);
- qos_params [size].property_name = CORBA::string_dup ("Service_Type");
- qos_params [size].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
+ qos_params [0].property_name = CORBA::string_dup ("Service_Type");
+ qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
- qos_params [size].property_name = CORBA::string_dup ("Token_Rate");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
+ qos_params [1].property_name = CORBA::string_dup ("Token_Rate");
+ qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
- qos_params [size].property_name = CORBA::string_dup ("Token_Bucket_Size");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
+ qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size");
+ qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
- qos_params [size].property_name = CORBA::string_dup ("Peak_Bandwidth");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
+ qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth");
+ qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
- qos_params [size].property_name = CORBA::string_dup ("Latency");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
+ qos_params [4].property_name = CORBA::string_dup ("Latency");
+ qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
- qos_params [size].property_name = CORBA::string_dup ("Delay_Variation");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
+ qos_params [5].property_name = CORBA::string_dup ("Delay_Variation");
+ qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
- qos_params [size].property_name = CORBA::string_dup ("Max_SDU_Size");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
+ qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size");
+ qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
- qos_params [size].property_name = CORBA::string_dup ("Minimum_Policed_Size");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
+ qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size");
+ qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
- qos_params [size].property_name = CORBA::string_dup ("TTL");
- qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
+ qos_params [8].property_name = CORBA::string_dup ("TTL");
+ qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
return 0;
}
@@ -216,7 +363,7 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
AVStreams::streamQoS new_qos;
ACE_Flow_Spec ace_flow_spec =
- this->qos_session_->qos ().receiving_flowspec ();
+ this->qos_session_->qos ().sending_flowspec ();
new_qos.length (1);
this->translate (&ace_flow_spec,
new_qos [0].QoSParams);
@@ -234,10 +381,12 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
{
ACE_QoS_Manager qos_manager =
this->get_socket ()->qos_manager ();
-
+
+ ACE_QoS ace_qos = this->qos_session_->qos ();
+
this->qos_session_->qos (this->get_socket (),
&qos_manager,
- this->qos_session_->qos ());
+ ace_qos);
}
}
@@ -337,7 +486,26 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
TAO_AV_UDP_QoS_Transport *transport =
ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_);
+
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ {
+
+ TAO_AV_UDP_QoS_Session_Helper helper;
+ this->qos_session_ = helper.open_qos_session (this,
+ *inet_addr);
+
+ if (this->qos_session_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Session Open Failed (%N|%l)\n"),
+ -1);
+
+ if (helper.activate_qos_handler (this->qos_session_,
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Activating QoS Handler Failed (%N|%l)\n"),
+ -1);
+ }
return transport->set_remote_address (*inet_addr);
}
@@ -445,30 +613,25 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
0,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error in dgram_mcast.send ()\n"),
+ "Error in dgram_mcast.send () (%N|%l)\n"),
-1);
else
- {
- if( TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
- }
-
- }
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
- if (n < 1)
+ if (n < 1)
return n;
- nbytes += bytes_sent;
- iovcnt = 0;
- }
- }
+ nbytes += bytes_sent;
+ iovcnt = 0;
+ }
+ }
}
size_t bytes_sent = 0;
-
+
// Check for remaining buffers to be sent!
if (iovcnt != 0)
{
@@ -483,14 +646,10 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
"Error in dgram_mcast.send ()\n"),
-1);
else
- {
if( TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
- }
- }
+ ACE_DEBUG ((LM_DEBUG,
+ "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
+ bytes_sent));
if (n < 1)
return n;
@@ -548,6 +707,7 @@ TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
"(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
bytes_sent));
}
+
return bytes_sent;
}
@@ -603,25 +763,10 @@ TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *hand
ACE_DEBUG ((LM_DEBUG,
"(%N,%l) Acceptor Svc Handler QOS ENABLED \n"));
- ACE_QoS_Decorator* qos_decorator;
-
- // Decorate the above handler with QoS functionality.
- ACE_NEW_RETURN (qos_decorator,
- ACE_QoS_Decorator (handler,
- handler->qos_session (),
- this->av_core_->reactor ()),
- -1);
-
- // Initialize the Decorator.
- if (qos_decorator->init () != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Decorator init () failed.\n"),
- -1);
-
- // Register the decorated Event Handler with the Reactor.
- result = this->av_core_->reactor ()->register_handler (qos_decorator,
- ACE_Event_Handler::QOS_MASK |
- ACE_Event_Handler::READ_MASK);
+ TAO_AV_UDP_QoS_Session_Helper helper;
+
+ result = helper.activate_qos_handler (handler->qos_session (),
+ handler);
if (result == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in registering the Decorator with the Reactor\n"),
@@ -684,7 +829,7 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
addr += ":8000";
ACE_INET_Addr *address;
ACE_NEW_RETURN (address,
- ACE_INET_Addr (addr.c_str ()),
+ ACE_INET_Addr ("0"),
-1);
address->addr_to_string (buf,
@@ -703,18 +848,19 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
int
TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
{
-
ACE_DECLARE_NEW_CORBA_ENV;
int result = 0;
-
+
// TAO_AV_Callback *callback = 0;
// this->endpoint_->get_callback (this->flowname_.c_str (),
// callback);
ACE_INET_Addr *local_addr;
- ACE_QoS_Params qos_params;
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr (*inet_addr),
+ -1);
- ACE_Flow_Spec ace_flow_spec;
+ ACE_QoS_Params qos_params;
ACE_QoS* ace_qos = 0;
@@ -723,173 +869,128 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
ace_qos);
- AVStreams::QoS qos;
-
- int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
-
TAO_AV_UDP_QoS_Flow_Handler* handler;
ACE_NEW_RETURN (handler,
TAO_AV_UDP_QoS_Flow_Handler,
-1);
-
+
TAO_AV_Flow_Handler *flow_handler = 0;
flow_handler = handler;
-
- // Create a QoS Session Factory.
- ACE_QoS_Session_Factory session_factory;
-
- // Ask the factory to create a QoS session. This could be RAPI or
- // GQoS based on the parameter passed.
-
- //@@YAmuna : Later make this generic for GQoS
- this->qos_session_ =
- session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
-
- // Create a destination address for the QoS session. The same
- // address should be used for the subscribe call later. A copy
- // is made below only to distinguish the two usages of the dest
- // address.
- ACE_INET_Addr dest_addr (*inet_addr);
-
- // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
- // Protocol]. Initialize the QoS session.
- if (this->qos_session_->open (*inet_addr,
- IPPROTO_UDP) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in opening the QoS session\n"),
- -1);
- else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) QoS session opened successfully\n"));
- }
-
-
- result = handler->get_socket ()->subscribe (*inet_addr,
- qos_params,
- 1,
- 0,
- AF_INET,
- // ACE_FROM_PROTOCOL_INFO,
- 0,
- 0, // ACE_Protocol_Info,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- this->qos_session_);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
- else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Subscribe succeeded\n"));
- }
+ handler->endpoint (this->endpoint_);
+ handler->flowspec_entry (this->entry_);
+ handler->av_core (this->av_core_);
- handler->qos_session (this->qos_session_);
-
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr (*inet_addr),
- -1);
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
{
- // This is a sender
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- // This is a receiver
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
- }
+
+ TAO_AV_UDP_QoS_Session_Helper helper;
+
+ int result = handler->get_socket ()->open (*inet_addr,
+ qos_params,
+ AF_INET,
+ 0,
+ 0,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ 1);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
+ -1);
- if (qos_available == 0)
- {
-
- handler->translate (qos.QoSParams,
- &ace_flow_spec);
+ result = handler->get_socket ()->get_local_addr (*local_addr);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in getting Local Address (%N|%l)\n"),
+ -1);
+
+ // Create a destination address for the QoS session. The same
+ // address should be used for the subscribe call later. A copy
+ // is made below only to distinguish the two usages of the dest
+ // address.
+ ACE_INET_Addr dest_addr;
+ dest_addr.set (local_addr->get_port_number (),
+ local_addr->get_host_name ());
+ char dest_buf [BUFSIZ];
+ dest_addr.addr_to_string (dest_buf,
+ BUFSIZ);
- ACE_NEW_RETURN (ace_qos,
- ACE_QoS,
- -1);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Session Address is %s\n",
+ dest_buf));
- Fill_ACE_QoS fill_ace_qos;
+ this->qos_session_ = helper.open_qos_session (handler,
+ dest_addr);
+
+ if (this->qos_session_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "QoS Session Open Failed (%N|%l)\n"),
+ -1);
+
+ handler->qos_session (this->qos_session_);
+
+ if (this->activate_svc_handler (handler) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Activate Svc Handler Failed (%N|%l)\n"),
+ -1);
+
+ AVStreams::QoS qos;
+ int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
+ if (qos_available == 0)
{
- if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
- &ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex sender qos\n"),
- -1);
- else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Sender QoS parameters\n"));
- }
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
- &ace_flow_spec) !=0)
+
+ ACE_Flow_Spec *ace_flow_spec = 0;
+ ACE_NEW_RETURN (ace_flow_spec,
+ ACE_Flow_Spec,
+ -1);
+
+ handler->translate (qos.QoSParams,
+ ace_flow_spec);
+
+ if (helper.set_qos (*ace_flow_spec,
+ handler) == -1)
+
ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex receiver qos\n"),
+ "Set QoS Failed (%N|%l)\n"),
-1);
- else
- {
- if(TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Receiver QoS parameters\n"));
- }
-
}
+ }
+ else
+ {
-
-
- this->qos_manager_
- = handler->get_socket ()->qos_manager ();
-
- // Set the QoS for the session. Replaces the ioctl () call that
- // was being made previously.
- if (this->qos_session_->qos (handler->get_socket (),
- &this->qos_manager_,
- *ace_qos) == -1)
+ int result = handler->get_socket ()->open (*inet_addr,
+ qos_params,
+ AF_INET,
+ 0,
+ 0,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ 1);
+
+ if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS\n"),
+ "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
-1);
- else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Setting QOS succeeds.\n"));
- }
}
-
- // ACE_Time_Value era (10);
- //this->adaptor ().flowspec_entry (this->entry_);
- //this->adaptor ().endpoint (this->endpoint_);
-// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_,
- // adaptor,
- // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()),
- // era,
- // QoS_Monitor::PEAK_BANDWIDTH);
-
- //ll_ace_qos.map ().unbind (g_711);
TAO_AV_Protocol_Object *object =
this->flow_protocol_factory_->make_protocol_object (this->entry_,
- this->endpoint_,
- flow_handler,
- flow_handler->transport ());
+ this->endpoint_,
+ flow_handler,
+ flow_handler->transport ());
flow_handler->protocol_object (object);
-
+
AVStreams::Negotiator_ptr negotiator;
ACE_TRY_EX (negotiator)
@@ -908,37 +1009,31 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
"(%N,%l) Negotiator Not Found \n"));
}
ACE_ENDTRY;
-
- // callback->protocol_object (object);
- // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
- // object);
+
this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
this->entry_->protocol_object (object);
- char buf[BUFSIZ];
- local_addr->addr_to_string (buf,BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n",
- buf));
-
result = handler->get_socket ()->get_local_addr (*local_addr);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
local_addr->set (local_addr->get_port_number (),
local_addr->get_host_name ());
-
- local_addr->addr_to_string (buf,
- BUFSIZ);
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) REMOTE ADDRESS IS %s\n",
- buf));
+
+ if (TAO_debug_level > 0)
+ {
+ char buf [BUFSIZ];
+ local_addr->addr_to_string (buf,
+ BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,
+ "Local Address is %s\n",
+ buf));
+ }
this->entry_->set_local_addr (local_addr);
this->entry_->handler (flow_handler);
- // call activate svc handler.
- return this->activate_svc_handler (handler);
+ return 0;
+
}
int
@@ -1034,11 +1129,11 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
this->entry_ = entry;
this->flowname_ = entry->flowname ();
-// ACE_INET_Addr *local_addr;
-// ACE_NEW_RETURN (local_addr,
-// ACE_INET_Addr ("0"),
-// -1);
-
+ ACE_INET_Addr *local_addr;
+ ACE_NEW_RETURN (local_addr,
+ ACE_INET_Addr ("0"),
+ -1);
+
TAO_AV_Flow_Handler *flow_handler = 0;
@@ -1046,155 +1141,111 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
ACE_NEW_RETURN (handler,
TAO_AV_UDP_QoS_Flow_Handler,
-1);
-
-
+
flow_handler = handler;
+
+ handler->endpoint (this->endpoint_);
+ handler->flowspec_entry (this->entry_);
+ handler->av_core (this->av_core_);
+
+ ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
+ entry->address ());
ACE_QoS_Params qos_params;
-
- ACE_Flow_Spec *ace_flow_spec = 0;
-
- ACE_QoS* ace_qos;
-
+
+ ACE_QoS* ace_qos = 0;
+
FillQoSParams (qos_params,
0,
ace_qos);
-
- AVStreams::QoS qos;
- int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
-
- ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
- entry->address ());
-
-
- // Create a QoS Session Factory.
- ACE_QoS_Session_Factory session_factory;
+ result = handler->get_socket ()->open (*local_addr,
+ qos_params,
+ AF_INET,
+ 0,
+ 0,
+ 0,
+ ACE_OVERLAPPED_SOCKET_FLAG
+ | ACE_FLAG_MULTIPOINT_C_LEAF
+ | ACE_FLAG_MULTIPOINT_D_LEAF,
+ 1);
- // Ask the factory to create a QoS session. This could be RAPI or
- // GQoS based on the parameter passed.
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Data socket open failed (%N|%l)\n"),
+ -1);
- //@@YAmuna : Later make this generic for GQoS
- this->qos_session_ =
- session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
+ result = handler->get_socket ()->get_local_addr (*local_addr);
+
+ ACE_INET_Addr *session_addr = 0;
+ if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
+ {
+ ACE_NEW_RETURN (session_addr,
+ ACE_INET_Addr,
+ -1);
+
+ session_addr->set (local_addr->get_port_number (),
+ local_addr->get_host_name ());
+
+ }
+ else
+ {
+ session_addr = inet_addr;
+ }
+ char sess_buf [BUFSIZ];
+ session_addr->addr_to_string (sess_buf,
+ BUFSIZ);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Session Address is %s\n",
+ sess_buf));
+
// Create a destination address for the QoS session. The same
// address should be used for the subscribe call later. A copy
// is made below only to distinguish the two usages of the dest
// address.
- ACE_INET_Addr dest_addr (*inet_addr);
-
- // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
- // Protocol]. Initialize the QoS session.
- if (this->qos_session_->open (*inet_addr,
- IPPROTO_UDP) == -1)
+ ACE_INET_Addr dest_addr (*session_addr);
+
+ TAO_AV_UDP_QoS_Session_Helper helper;
+
+ this->qos_session_ = helper.open_qos_session (handler,
+ *session_addr);
+
+ if (this->qos_session_ == 0)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error in opening the QoS session\n"),
- -1);
+ "QoS Session Open Failed (%N|%l)\n"),
+ -1);
else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) QoS session opened successfully\n"));
- }
-
-
- result = handler->get_socket ()->subscribe (*inet_addr,
- qos_params,
- 1,
- 0,
- AF_INET,
- // ACE_FROM_PROTOCOL_INFO,
- 0,
- 0, // ACE_Protocol_Info,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- this->qos_session_);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
- // Now disable Multicast loopback.
- // @@Should we make this a policy?
+ ACE_DEBUG ((LM_DEBUG,
+ "QoS session opened successfully\n"));
handler->qos_session (this->qos_session_);
- ACE_INET_Addr *local_addr;
-
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr (*inet_addr),
- -1);
-
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- // This is a sender
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- // This is a receiver
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
- }
-
this->qos_manager_ =
handler->get_socket ()->qos_manager ();
+ AVStreams::QoS qos;
+
+ int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
+ qos);
if (qos_available == 0)
{
+
+ ACE_Flow_Spec* ace_flow_spec;
ACE_NEW_RETURN (ace_flow_spec,
ACE_Flow_Spec,
-1);
handler->translate (qos.QoSParams,
- ace_flow_spec);
-
- ACE_QoS* ace_qos;
-
- ACE_NEW_RETURN (ace_qos,
- ACE_QoS,
- -1);
-
- Fill_ACE_QoS fill_ace_qos;
+ ace_flow_spec);
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex receiver qos\n"),
- -1);
- else
- {
- if( TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Receiver QoS parameters\n"));
- }
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex sender qos\n"),
- -1);
- else
- {
- if(TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) Filled up the Sender QoS parameters\n"));
- }
- }
-
- // Set the QoS for the session. Replaces the ioctl () call that
- // was being made previously.
- if (this->qos_session_->qos (handler->get_socket (),
- &this->qos_manager_,
- *ace_qos) == -1)
+ if (helper.set_qos (*ace_flow_spec,
+ handler) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS\n"),
+ "Unable to set QoS (%N|%l)\n"),
-1);
else
{
@@ -1224,27 +1275,41 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
}
ACE_CATCHANY
{
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_UDP_QoS_Connector::connect");
+ ACE_DEBUG ((LM_DEBUG,
+ "Negotiator not found for flow %s\n",
+ this->entry_->flowname ()));
}
ACE_ENDTRY;
flow_handler->protocol_object (object);
- // callback->protocol_object (object);
- // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
- // object);
+
this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
this->entry_->protocol_object (object);
- char buf[BUFSIZ];
- local_addr->addr_to_string (buf,BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%N,%l) TAO_AV_UDP_CONNECTOR::connect:%s \n",
- buf));
+ result = handler->get_socket ()->get_local_addr (*local_addr);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Get local addr failed (%N|%l)\n"),
+ result);
+ local_addr->set (local_addr->get_port_number (),
+ local_addr->get_host_name ());
+
+ if (TAO_debug_level > 0)
+ {
+ char buf[BUFSIZ];
+ local_addr->addr_to_string (buf,
+ BUFSIZ);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Local Address is %s\n",
+ buf));
+ }
+
entry->set_local_addr (local_addr);
entry->handler (flow_handler);
transport = flow_handler->transport ();
+
// call activate svc handler.
return this->activate_svc_handler (handler);
}
@@ -1253,26 +1318,12 @@ int
TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
{
int result = 0;
- ACE_QoS_Decorator* qos_decorator;
-
- // Decorate the above handler with QoS functionality.
- ACE_NEW_RETURN (qos_decorator,
- ACE_QoS_Decorator (handler,
- handler->qos_session (),
- this->av_core_->reactor ()),
- -1);
-
- // Initialize the Decorator.
- if (qos_decorator->init () != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%N,%l) QoS Decorator init () failed.\n"),
- -1);
-
- // Register the decorated Event Handler with the Reactor.
- result = this->av_core_->reactor ()->register_handler (qos_decorator,
- ACE_Event_Handler::QOS_MASK |
- ACE_Event_Handler::READ_MASK);
+
+ TAO_AV_UDP_QoS_Session_Helper helper;
+ result = helper.activate_qos_handler (this->qos_session_,
+ handler);
+
if (result == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%N,%l) Error in registering the Decorator with the Reactor\n"),
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h
index 7c0a9f11437..981f1c5a2da 100644
--- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h
+++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h
@@ -125,18 +125,30 @@ public:
virtual ACE_Event_Handler* event_handler (void){ return this; }
virtual ACE_QoS_Session* qos_session (void);
virtual void qos_session (ACE_QoS_Session *qos_session);
+
int translate (ACE_Flow_Spec *ace_flow_spec,
CosPropertyService::Properties &qos_params);
int translate (CosPropertyService::Properties &qos_params,
ACE_Flow_Spec *ace_flow_spec);
void negotiator (AVStreams::Negotiator_ptr);
+
+ void endpoint (TAO_Base_StreamEndPoint *endpoint);
+ TAO_Base_StreamEndPoint* endpoint (void);
+
+ void flowspec_entry (TAO_FlowSpec_Entry *entry);
+ TAO_FlowSpec_Entry* flowspec_entry (void);
+ void av_core (TAO_AV_Core *avcore);
+ TAO_AV_Core* av_core (void);
+
protected:
TAO_AV_Core *av_core_;
ACE_INET_Addr peer_addr_;
ACE_SOCK_Dgram_Mcast_QoS qos_sock_dgram_;
ACE_QoS_Session *qos_session_;
+ TAO_FlowSpec_Entry *entry_;
+ TAO_Base_StreamEndPoint *endpoint_;
AVStreams::Negotiator_ptr negotiator_;
};
@@ -159,7 +171,7 @@ public:
virtual int open_i (ACE_INET_Addr *address);
virtual int close (void);
- // virtual int activate_svc_handler (TAO_AV_Flow_Handler *handler);
+
virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler);
@@ -239,6 +251,28 @@ public:
TAO_AV_Transport *transport);
};
+/// Helper class to create qos sessions,
+/// activate qos handlers and set qos
+/// (For separation of concerns)
+class TAO_AV_UDP_QoS_Session_Helper
+{
+public:
+ TAO_AV_UDP_QoS_Session_Helper (void);
+ ~TAO_AV_UDP_QoS_Session_Helper (void);
+
+ /// Open a QoS Session with the specified address
+ ACE_QoS_Session* open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler,
+ ACE_INET_Addr &addr);
+
+ /// Activate the QoS handler to receive QoS events
+ int activate_qos_handler (ACE_QoS_Session *qos_session,
+ TAO_AV_UDP_QoS_Flow_Handler *handler);
+
+ /// Set the required QoS for the session
+ int set_qos (ACE_Flow_Spec& ace_flow_spec,
+ TAO_AV_UDP_QoS_Flow_Handler *handler);
+};
+
ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Flow_Factory)
ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory)