diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp | 858 |
1 files changed, 376 insertions, 482 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp index 1b944625014..a5623aad6f8 100644 --- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp @@ -32,154 +32,6 @@ FillQoSParams (ACE_QoS_Params &qos_params, return 0; } -TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void) -{ - -} - -TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void) -{ -} - -int -TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec, - TAO_AV_UDP_QoS_Flow_Handler *handler) -{ - ACE_QoS* ace_qos = 0; - - ACE_NEW_RETURN (ace_qos, - ACE_QoS, - -1); - - Fill_ACE_QoS fill_ace_qos; - - if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) - { - if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, - &ace_flow_spec) !=0) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex sender qos (%N|%l)\n"), - -1); - else - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Filled up the Sender QoS parameters\n")); - } - else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, - &ace_flow_spec) !=0) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex receiver qos (%N|%l)\n"), - -1); - else - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Filled up the Receiver QoS parameters\n")); - - } - - ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager (); - - // Set the QoS for the session. Replaces the ioctl () call that - // was being made previously. - if (handler->qos_session ()->qos (handler->get_socket (), - &qos_manager, - *ace_qos) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to set QoS (%N|%l)\n"), - -1); - else - ACE_DEBUG ((LM_DEBUG, - "Setting QOS succeeds\n")); - - return 0; -} - -ACE_QoS_Session * -TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler, - ACE_INET_Addr &addr) -{ - ACE_QoS_Params qos_params; - - ACE_QoS* ace_qos = 0; - - FillQoSParams (qos_params, - 0, - ace_qos); - - - // Create a QoS Session Factory. - ACE_QoS_Session_Factory session_factory; - - // Ask the factory to create a QoS session. This could be RAPI or - // GQoS based on the parameter passed. - - //@@YAmuna : Later make this generic for GQoS - ACE_QoS_Session *qos_session = session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); - - // Create a destination address for the QoS session. The same - // address should be used for the subscribe call later. A copy - // is made below only to distinguish the two usages of the dest - // address. - ACE_INET_Addr dest_addr (addr); - - // A QoS session is defined by the 3-tuple [DestAddr, DestPort, - // Protocol]. Initialize the QoS session. - if (qos_session->open (dest_addr, - IPPROTO_UDP) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error in opening the QoS session\n"), - 0); - else - ACE_DEBUG ((LM_DEBUG, - "QoS session opened successfully\n")); - - if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) - { - // This is a sender - qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER); - } - else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - // This is a receiver - qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); - } - - return qos_session; -} - -int -TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session, - TAO_AV_UDP_QoS_Flow_Handler *handler) -{ - ACE_QoS_Decorator* qos_decorator; - - // Decorate the above handler with QoS functionality. - ACE_NEW_RETURN (qos_decorator, - ACE_QoS_Decorator (handler, - qos_session, - handler->av_core ()->reactor ()), - -1); - - // Initialize the Decorator. - if (qos_decorator->init () != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "QoS Decorator init () failed (%N|%l)\n"), - -1); - - // Register the decorated Event Handler with the Reactor. - int result = handler->av_core ()->reactor ()->register_handler (qos_decorator, - ACE_Event_Handler::QOS_MASK | - ACE_Event_Handler::READ_MASK); - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error in registering the Decorator with the Reactor (%N|%l)\n"), - -1); - - return 0; - -} TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void) { @@ -282,34 +134,35 @@ int TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec, CosPropertyService::Properties &qos_params) { - qos_params.length (9); + int size = qos_params.length (); + qos_params.length (size + 1); - qos_params [0].property_name = CORBA::string_dup ("Service_Type"); - qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type (); + qos_params [size].property_name = CORBA::string_dup ("Service_Type"); + qos_params [size].property_value <<= (CORBA::Short) ace_flow_spec->service_type (); - qos_params [1].property_name = CORBA::string_dup ("Token_Rate"); - qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate (); + qos_params [size].property_name = CORBA::string_dup ("Token_Rate"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate (); - qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size"); - qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size (); + qos_params [size].property_name = CORBA::string_dup ("Token_Bucket_Size"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size (); - qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth"); - qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth (); + qos_params [size].property_name = CORBA::string_dup ("Peak_Bandwidth"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth (); - qos_params [4].property_name = CORBA::string_dup ("Latency"); - qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency (); + qos_params [size].property_name = CORBA::string_dup ("Latency"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->latency (); - qos_params [5].property_name = CORBA::string_dup ("Delay_Variation"); - qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation (); + qos_params [size].property_name = CORBA::string_dup ("Delay_Variation"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation (); - qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size"); - qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size (); + qos_params [size].property_name = CORBA::string_dup ("Max_SDU_Size"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size (); - qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size"); - qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size (); + qos_params [size].property_name = CORBA::string_dup ("Minimum_Policed_Size"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size (); - qos_params [8].property_name = CORBA::string_dup ("TTL"); - qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl (); + qos_params [size].property_name = CORBA::string_dup ("TTL"); + qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->ttl (); return 0; } @@ -317,22 +170,21 @@ TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec, int TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) { + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n")); ACE_DECLARE_NEW_CORBA_ENV; if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n")); + "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n")); if (this->qos_session_->update_qos () == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error in updating QoS\n"), -1); else - { - if(TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Updating QOS succeeds.\n")); - } + ACE_DEBUG ((LM_DEBUG, + " Updating QOS succeeds.\n")); if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_ERROR) { @@ -348,22 +200,16 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) { if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_EVENT) { - if( TAO_debug_level > 0 ) - { - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Resv Event Received\n")); - } + ACE_DEBUG ((LM_DEBUG, + "Resv Event Received\n")); if (!CORBA::is_nil (this->negotiator_)) { - if( TAO_debug_level > 0 ) - { - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Negotiator Specified\n")); - } + ACE_DEBUG ((LM_DEBUG, + "Negotiator Specified\n")); AVStreams::streamQoS new_qos; ACE_Flow_Spec ace_flow_spec = - this->qos_session_->qos ().sending_flowspec (); + this->qos_session_->qos ().receiving_flowspec (); new_qos.length (1); this->translate (&ace_flow_spec, new_qos [0].QoSParams); @@ -381,12 +227,10 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) { ACE_QoS_Manager qos_manager = this->get_socket ()->qos_manager (); - - ACE_QoS ace_qos = this->qos_session_->qos (); - + this->qos_session_->qos (this->get_socket (), &qos_manager, - ace_qos); + this->qos_session_->qos ()); } } @@ -396,11 +240,8 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) int TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos) { - if( TAO_debug_level > 0 ) - { - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::change_qos\n")); - } + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_Flow_Handler::change_qos\n")); ACE_QoS_Manager qos_manager = this->get_socket ()->qos_manager (); @@ -429,11 +270,8 @@ TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos) "Unable to fill simplex sender qos\n"), -1); else - { - if( TAO_debug_level > 0 ) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Sender QoS parameters\n")); - } + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); } else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER) { @@ -443,11 +281,8 @@ TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos) "Unable to fill simplex receiver qos\n"), -1); else - { - if( TAO_debug_level > 0 ) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Receiver QoS parameters\n")); - } + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); } @@ -477,7 +312,7 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n")); + "TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n")); ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,address); @@ -486,26 +321,7 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address) TAO_AV_UDP_QoS_Transport *transport = ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_); - - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) - { - TAO_AV_UDP_QoS_Session_Helper helper; - - this->qos_session_ = helper.open_qos_session (this, - *inet_addr); - - if (this->qos_session_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "QoS Session Open Failed (%N|%l)\n"), - -1); - - if (helper.activate_qos_handler (this->qos_session_, - this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Activating QoS Handler Failed (%N|%l)\n"), - -1); - } return transport->set_remote_address (*inet_addr); } @@ -515,7 +331,7 @@ TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n", + "TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n", this->qos_sock_dgram_.get_handle ())); return this->qos_sock_dgram_.get_handle () ; @@ -613,25 +429,24 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk, 0, 0) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error in dgram_mcast.send () (%N|%l)\n"), + "Error in dgram_mcast.send ()\n"), -1); else - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Using ACE_OS::sendto () : Bytes sent : %d", - bytes_sent)); + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); - if (n < 1) + if (n < 1) return n; - nbytes += bytes_sent; - iovcnt = 0; - } - } + nbytes += bytes_sent; + iovcnt = 0; + } + } } size_t bytes_sent = 0; - + // Check for remaining buffers to be sent! if (iovcnt != 0) { @@ -646,10 +461,9 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk, "Error in dgram_mcast.send ()\n"), -1); else - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", - bytes_sent)); + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); if (n < 1) return n; @@ -667,13 +481,13 @@ TAO_AV_UDP_QoS_Transport::send (const char *buf, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Transport::send ")); + "TAO_AV_UDP_QoS_Transport::send ")); char addr [BUFSIZ]; this->peer_addr_.addr_to_string (addr,BUFSIZ); if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) to %s\n", + "to %s\n", addr)); return this->handler_->get_socket ()->send (buf, @@ -701,13 +515,9 @@ TAO_AV_UDP_QoS_Transport::send (const iovec *iov, "Error in dgram_mcast.send ()\n"), -1); else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", - bytes_sent)); - } - + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); return bytes_sent; } @@ -761,12 +571,27 @@ TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *hand if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Acceptor Svc Handler QOS ENABLED \n")); - - TAO_AV_UDP_QoS_Session_Helper helper; + "Acceptor Svc Handler QOS ENABLED \n")); - result = helper.activate_qos_handler (handler->qos_session (), - handler); + ACE_QoS_Decorator* qos_decorator; + + // Decorate the above handler with QoS functionality. + ACE_NEW_RETURN (qos_decorator, + ACE_QoS_Decorator (handler, + handler->qos_session (), + this->av_core_->reactor ()), + -1); + + // Initialize the Decorator. + if (qos_decorator->init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Decorator init () failed.\n"), + -1); + + // Register the decorated Event Handler with the Reactor. + result = this->av_core_->reactor ()->register_handler (qos_decorator, + ACE_Event_Handler::QOS_MASK | + ACE_Event_Handler::READ_MASK); if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error in registering the Decorator with the Reactor\n"), @@ -783,7 +608,7 @@ TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint, { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open ")); + "TAO_AV_UDP_QoS_Acceptor::open ")); this->av_core_ = av_core; this->endpoint_ = endpoint; @@ -800,7 +625,7 @@ TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint, BUFSIZ); if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: %s", + "TAO_AV_UDP_QoS_Acceptor::open: %s", buf)); int result = this->open_i (inet_addr); @@ -829,13 +654,13 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, addr += ":8000"; ACE_INET_Addr *address; ACE_NEW_RETURN (address, - ACE_INET_Addr ("0"), + ACE_INET_Addr (addr.c_str ()), -1); address->addr_to_string (buf, BUFSIZ); ACE_DEBUG ((LM_DEBUG, - "(%N,%l) ADDRESS IS %s\n", + "ADDRESS IS %s\n", buf)); int result = this->open_i (address); @@ -848,20 +673,19 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, int TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) { + ACE_DECLARE_NEW_CORBA_ENV; int result = 0; - + // TAO_AV_Callback *callback = 0; // this->endpoint_->get_callback (this->flowname_.c_str (), // callback); ACE_INET_Addr *local_addr; - ACE_NEW_RETURN (local_addr, - ACE_INET_Addr (*inet_addr), - -1); - ACE_QoS_Params qos_params; + ACE_Flow_Spec ace_flow_spec; + ACE_QoS* ace_qos = 0; FillQoSParams (qos_params, @@ -869,128 +693,160 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) ace_qos); + AVStreams::QoS qos; + + int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); + TAO_AV_UDP_QoS_Flow_Handler* handler; ACE_NEW_RETURN (handler, TAO_AV_UDP_QoS_Flow_Handler, -1); - + TAO_AV_Flow_Handler *flow_handler = 0; flow_handler = handler; + + // Create a QoS Session Factory. + ACE_QoS_Session_Factory session_factory; + + // Ask the factory to create a QoS session. This could be RAPI or + // GQoS based on the parameter passed. + + //@@YAmuna : Later make this generic for GQoS + this->qos_session_ = + session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + + // Create a destination address for the QoS session. The same + // address should be used for the subscribe call later. A copy + // is made below only to distinguish the two usages of the dest + // address. + ACE_INET_Addr dest_addr (*inet_addr); - handler->endpoint (this->endpoint_); - handler->flowspec_entry (this->entry_); - handler->av_core (this->av_core_); + // A QoS session is defined by the 3-tuple [DestAddr, DestPort, + // Protocol]. Initialize the QoS session. + if (this->qos_session_->open (*inet_addr, + IPPROTO_UDP) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in opening the QoS session\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "QoS session opened successfully\n")); - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - - TAO_AV_UDP_QoS_Session_Helper helper; - - int result = handler->get_socket ()->open (*inet_addr, - qos_params, - AF_INET, - 0, - 0, - 0, - ACE_OVERLAPPED_SOCKET_FLAG - | ACE_FLAG_MULTIPOINT_C_LEAF - | ACE_FLAG_MULTIPOINT_D_LEAF, - 1); - - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"), - -1); + + result = handler->get_socket ()->subscribe (*inet_addr, + qos_params, + 1, + 0, + AF_INET, + // ACE_FROM_PROTOCOL_INFO, + 0, + 0, // ACE_Protocol_Info, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + this->qos_session_); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); + else ACE_DEBUG ((LM_DEBUG, + "Subscribe succeeded\n")); - result = handler->get_socket ()->get_local_addr (*local_addr); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error in getting Local Address (%N|%l)\n"), - -1); + handler->qos_session (this->qos_session_); - // Create a destination address for the QoS session. The same - // address should be used for the subscribe call later. A copy - // is made below only to distinguish the two usages of the dest - // address. - ACE_INET_Addr dest_addr; - dest_addr.set (local_addr->get_port_number (), - local_addr->get_host_name ()); + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*inet_addr), + -1); + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + // This is a sender + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + // This is a receiver + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); + } + + if (qos_available == 0) + { + + handler->translate (qos.QoSParams, + &ace_flow_spec); - char dest_buf [BUFSIZ]; - dest_addr.addr_to_string (dest_buf, - BUFSIZ); - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Session Address is %s\n", - dest_buf)); + ACE_NEW_RETURN (ace_qos, + ACE_QoS, + -1); - this->qos_session_ = helper.open_qos_session (handler, - dest_addr); - - if (this->qos_session_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "QoS Session Open Failed (%N|%l)\n"), - -1); - - handler->qos_session (this->qos_session_); - - if (this->activate_svc_handler (handler) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Activate Svc Handler Failed (%N|%l)\n"), - -1); - - AVStreams::QoS qos; - int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), - qos); + Fill_ACE_QoS fill_ace_qos; - if (qos_available == 0) + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) { - - ACE_Flow_Spec *ace_flow_spec = 0; - ACE_NEW_RETURN (ace_flow_spec, - ACE_Flow_Spec, - -1); - - handler->translate (qos.QoSParams, - ace_flow_spec); - - if (helper.set_qos (*ace_flow_spec, - handler) == -1) - + if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, + &ace_flow_spec) !=0) ACE_ERROR_RETURN ((LM_ERROR, - "Set QoS Failed (%N|%l)\n"), + "Unable to fill simplex sender qos\n"), -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); } - } - else - { + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, + &ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex receiver qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); + + } + + + + this->qos_manager_ + = handler->get_socket ()->qos_manager (); - int result = handler->get_socket ()->open (*inet_addr, - qos_params, - AF_INET, - 0, - 0, - 0, - ACE_OVERLAPPED_SOCKET_FLAG - | ACE_FLAG_MULTIPOINT_C_LEAF - | ACE_FLAG_MULTIPOINT_D_LEAF, - 1); - - if (result < 0) + // Set the QoS for the session. Replaces the ioctl () call that + // was being made previously. + if (this->qos_session_->qos (handler->get_socket (), + &this->qos_manager_, + *ace_qos) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"), + "Unable to set QoS\n"), -1); + else + ACE_DEBUG ((LM_DEBUG, + "Setting QOS succeeds.\n")); } + + ACE_DEBUG ((LM_DEBUG, + "It reached here \n")); + + // ACE_Time_Value era (10); + //this->adaptor ().flowspec_entry (this->entry_); + //this->adaptor ().endpoint (this->endpoint_); +// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_, + // adaptor, + // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()), + // era, + // QoS_Monitor::PEAK_BANDWIDTH); + + //ll_ace_qos.map ().unbind (g_711); TAO_AV_Protocol_Object *object = this->flow_protocol_factory_->make_protocol_object (this->entry_, - this->endpoint_, - flow_handler, - flow_handler->transport ()); + this->endpoint_, + flow_handler, + flow_handler->transport ()); flow_handler->protocol_object (object); - + AVStreams::Negotiator_ptr negotiator; ACE_TRY_EX (negotiator) @@ -1006,34 +862,40 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) ACE_CATCHANY { ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Negotiator Not Found \n")); + "Negotiator Not Found \n")); } ACE_ENDTRY; - + + // callback->protocol_object (object); + // this->endpoint_->set_protocol_object (this->flowname_.c_str (), + // object); this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); this->entry_->protocol_object (object); + char buf[BUFSIZ]; + local_addr->addr_to_string (buf,BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n", + buf)); + result = handler->get_socket ()->get_local_addr (*local_addr); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result); local_addr->set (local_addr->get_port_number (), local_addr->get_host_name ()); - - if (TAO_debug_level > 0) - { - char buf [BUFSIZ]; - local_addr->addr_to_string (buf, - BUFSIZ); - ACE_DEBUG ((LM_DEBUG, - "Local Address is %s\n", - buf)); - } + + local_addr->addr_to_string (buf, + BUFSIZ); + ACE_DEBUG ((LM_DEBUG, + "REMOTE ADDRESS IS %s\n", + buf)); this->entry_->set_local_addr (local_addr); this->entry_->handler (flow_handler); - return 0; - + // call activate svc handler. + return this->activate_svc_handler (handler); } int @@ -1129,11 +991,11 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, this->entry_ = entry; this->flowname_ = entry->flowname (); - ACE_INET_Addr *local_addr; - ACE_NEW_RETURN (local_addr, - ACE_INET_Addr ("0"), - -1); - +// ACE_INET_Addr *local_addr; +// ACE_NEW_RETURN (local_addr, +// ACE_INET_Addr ("0"), +// -1); + TAO_AV_Flow_Handler *flow_handler = 0; @@ -1141,118 +1003,150 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, ACE_NEW_RETURN (handler, TAO_AV_UDP_QoS_Flow_Handler, -1); - - flow_handler = handler; - - handler->endpoint (this->endpoint_); - handler->flowspec_entry (this->entry_); - handler->av_core (this->av_core_); - - ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*, - entry->address ()); - ACE_QoS_Params qos_params; - ACE_QoS* ace_qos = 0; + flow_handler = handler; + ACE_QoS_Params qos_params; + + ACE_Flow_Spec *ace_flow_spec = 0; + + ACE_QoS* ace_qos; + FillQoSParams (qos_params, 0, ace_qos); - - result = handler->get_socket ()->open (*local_addr, - qos_params, - AF_INET, - 0, - 0, - 0, - ACE_OVERLAPPED_SOCKET_FLAG - | ACE_FLAG_MULTIPOINT_C_LEAF - | ACE_FLAG_MULTIPOINT_D_LEAF, - 1); + + AVStreams::QoS qos; + int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Data socket open failed (%N|%l)\n"), - -1); - result = handler->get_socket ()->get_local_addr (*local_addr); + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*, + entry->address ()); - - ACE_INET_Addr *session_addr = 0; - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - ACE_NEW_RETURN (session_addr, - ACE_INET_Addr, - -1); - - session_addr->set (local_addr->get_port_number (), - local_addr->get_host_name ()); - - } - else - { - session_addr = inet_addr; - } - - char sess_buf [BUFSIZ]; - session_addr->addr_to_string (sess_buf, - BUFSIZ); - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "Session Address is %s\n", - sess_buf)); + // Create a QoS Session Factory. + ACE_QoS_Session_Factory session_factory; + // Ask the factory to create a QoS session. This could be RAPI or + // GQoS based on the parameter passed. + + //@@YAmuna : Later make this generic for GQoS + this->qos_session_ = + session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + + // Create a destination address for the QoS session. The same // address should be used for the subscribe call later. A copy // is made below only to distinguish the two usages of the dest // address. - ACE_INET_Addr dest_addr (*session_addr); - - TAO_AV_UDP_QoS_Session_Helper helper; - - this->qos_session_ = helper.open_qos_session (handler, - *session_addr); - - if (this->qos_session_ == 0) + ACE_INET_Addr dest_addr (*inet_addr); + + // A QoS session is defined by the 3-tuple [DestAddr, DestPort, + // Protocol]. Initialize the QoS session. + if (this->qos_session_->open (*inet_addr, + IPPROTO_UDP) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "QoS Session Open Failed (%N|%l)\n"), - -1); + "Error in opening the QoS session\n"), + -1); else ACE_DEBUG ((LM_DEBUG, "QoS session opened successfully\n")); + + result = handler->get_socket ()->subscribe (*inet_addr, + qos_params, + 1, + 0, + AF_INET, + // ACE_FROM_PROTOCOL_INFO, + 0, + 0, // ACE_Protocol_Info, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + this->qos_session_); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); + // Now disable Multicast loopback. + // @@Should we make this a policy? + handler->qos_session (this->qos_session_); + ACE_INET_Addr *local_addr; + + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*inet_addr), + -1); + + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + // This is a sender + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + // This is a receiver + this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); + } + this->qos_manager_ = handler->get_socket ()->qos_manager (); - AVStreams::QoS qos; - - int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), - qos); if (qos_available == 0) { - - ACE_Flow_Spec* ace_flow_spec; ACE_NEW_RETURN (ace_flow_spec, ACE_Flow_Spec, -1); handler->translate (qos.QoSParams, - ace_flow_spec); + ace_flow_spec); + + ACE_QoS* ace_qos; + + ACE_NEW_RETURN (ace_qos, + ACE_QoS, + -1); + + Fill_ACE_QoS fill_ace_qos; - if (helper.set_qos (*ace_flow_spec, - handler) == -1) + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex receiver qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); + } + else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, + ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex sender qos\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); + } + + // Set the QoS for the session. Replaces the ioctl () call that + // was being made previously. + if (this->qos_session_->qos (handler->get_socket (), + &this->qos_manager_, + *ace_qos) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Unable to set QoS (%N|%l)\n"), + "Unable to set QoS\n"), -1); else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Setting QOS succeeds.\n")); - } + ACE_DEBUG ((LM_DEBUG, + "Setting QOS succeeds.\n")); } TAO_AV_Protocol_Object *object = @@ -1275,41 +1169,27 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, } ACE_CATCHANY { - ACE_DEBUG ((LM_DEBUG, - "Negotiator not found for flow %s\n", - this->entry_->flowname ())); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_UDP_QoS_Connector::connect"); } ACE_ENDTRY; flow_handler->protocol_object (object); - + // callback->protocol_object (object); + // this->endpoint_->set_protocol_object (this->flowname_.c_str (), + // object); this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); this->entry_->protocol_object (object); - result = handler->get_socket ()->get_local_addr (*local_addr); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Get local addr failed (%N|%l)\n"), - result); - - local_addr->set (local_addr->get_port_number (), - local_addr->get_host_name ()); + char buf[BUFSIZ]; + local_addr->addr_to_string (buf,BUFSIZ); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_UDP_CONNECTOR::connect:%s \n", + buf)); - if (TAO_debug_level > 0) - { - char buf[BUFSIZ]; - local_addr->addr_to_string (buf, - BUFSIZ); - - ACE_DEBUG ((LM_DEBUG, - "Local Address is %s\n", - buf)); - } - entry->set_local_addr (local_addr); entry->handler (flow_handler); transport = flow_handler->transport (); - // call activate svc handler. return this->activate_svc_handler (handler); } @@ -1318,15 +1198,29 @@ int TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler) { int result = 0; - - TAO_AV_UDP_QoS_Session_Helper helper; + ACE_QoS_Decorator* qos_decorator; + + // Decorate the above handler with QoS functionality. + ACE_NEW_RETURN (qos_decorator, + ACE_QoS_Decorator (handler, + handler->qos_session (), + this->av_core_->reactor ()), + -1); + + // Initialize the Decorator. + if (qos_decorator->init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Decorator init () failed.\n"), + -1); + + // Register the decorated Event Handler with the Reactor. + result = this->av_core_->reactor ()->register_handler (qos_decorator, + ACE_Event_Handler::QOS_MASK | + ACE_Event_Handler::READ_MASK); - result = helper.activate_qos_handler (this->qos_session_, - handler); - if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, - "(%N,%l) Error in registering the Decorator with the Reactor\n"), + "Error in registering the Decorator with the Reactor\n"), -1); return result; |