diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-08-04 17:43:45 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-08-04 17:43:45 +0000 |
commit | 9928c12e788aab257b0cd1ecbb4606735260d316 (patch) | |
tree | b567e8fae8e408771ff34610f9686db7822a7fe1 | |
parent | ff56c2606b0a35149a1533ed2afb337c43a26e92 (diff) | |
download | ATCD-9928c12e788aab257b0cd1ecbb4606735260d316.tar.gz |
ChangeLogTag: ri Aug 04 3:33:31 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp | 811 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h | 36 |
2 files changed, 466 insertions, 381 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp index 27e6888ee11..1b944625014 100644 --- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp @@ -32,6 +32,154 @@ FillQoSParams (ACE_QoS_Params &qos_params, return 0; } +TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void) +{ + +} + +TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void) +{ +} + +int +TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec, + TAO_AV_UDP_QoS_Flow_Handler *handler) +{ + ACE_QoS* ace_qos = 0; + + ACE_NEW_RETURN (ace_qos, + ACE_QoS, + -1); + + Fill_ACE_QoS fill_ace_qos; + + if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, + &ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex sender qos (%N|%l)\n"), + -1); + else + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Filled up the Sender QoS parameters\n")); + } + else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, + &ace_flow_spec) !=0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to fill simplex receiver qos (%N|%l)\n"), + -1); + else + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Filled up the Receiver QoS parameters\n")); + + } + + ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager (); + + // Set the QoS for the session. Replaces the ioctl () call that + // was being made previously. + if (handler->qos_session ()->qos (handler->get_socket (), + &qos_manager, + *ace_qos) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to set QoS (%N|%l)\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Setting QOS succeeds\n")); + + return 0; +} + +ACE_QoS_Session * +TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler, + ACE_INET_Addr &addr) +{ + ACE_QoS_Params qos_params; + + ACE_QoS* ace_qos = 0; + + FillQoSParams (qos_params, + 0, + ace_qos); + + + // Create a QoS Session Factory. + ACE_QoS_Session_Factory session_factory; + + // Ask the factory to create a QoS session. This could be RAPI or + // GQoS based on the parameter passed. + + //@@YAmuna : Later make this generic for GQoS + ACE_QoS_Session *qos_session = session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + + // Create a destination address for the QoS session. The same + // address should be used for the subscribe call later. A copy + // is made below only to distinguish the two usages of the dest + // address. + ACE_INET_Addr dest_addr (addr); + + // A QoS session is defined by the 3-tuple [DestAddr, DestPort, + // Protocol]. Initialize the QoS session. + if (qos_session->open (dest_addr, + IPPROTO_UDP) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in opening the QoS session\n"), + 0); + else + ACE_DEBUG ((LM_DEBUG, + "QoS session opened successfully\n")); + + if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + // This is a sender + qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER); + } + else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + // This is a receiver + qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); + } + + return qos_session; +} + +int +TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session, + TAO_AV_UDP_QoS_Flow_Handler *handler) +{ + ACE_QoS_Decorator* qos_decorator; + + // Decorate the above handler with QoS functionality. + ACE_NEW_RETURN (qos_decorator, + ACE_QoS_Decorator (handler, + qos_session, + handler->av_core ()->reactor ()), + -1); + + // Initialize the Decorator. + if (qos_decorator->init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Decorator init () failed (%N|%l)\n"), + -1); + + // Register the decorated Event Handler with the Reactor. + int result = handler->av_core ()->reactor ()->register_handler (qos_decorator, + ACE_Event_Handler::QOS_MASK | + ACE_Event_Handler::READ_MASK); + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in registering the Decorator with the Reactor (%N|%l)\n"), + -1); + + return 0; + +} TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void) { @@ -134,35 +282,34 @@ int TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec, CosPropertyService::Properties &qos_params) { - int size = qos_params.length (); - qos_params.length (size + 1); + qos_params.length (9); - qos_params [size].property_name = CORBA::string_dup ("Service_Type"); - qos_params [size].property_value <<= (CORBA::Short) ace_flow_spec->service_type (); + qos_params [0].property_name = CORBA::string_dup ("Service_Type"); + qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type (); - qos_params [size].property_name = CORBA::string_dup ("Token_Rate"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate (); + qos_params [1].property_name = CORBA::string_dup ("Token_Rate"); + qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate (); - qos_params [size].property_name = CORBA::string_dup ("Token_Bucket_Size"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size (); + qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size"); + qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size (); - qos_params [size].property_name = CORBA::string_dup ("Peak_Bandwidth"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth (); + qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth"); + qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth (); - qos_params [size].property_name = CORBA::string_dup ("Latency"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->latency (); + qos_params [4].property_name = CORBA::string_dup ("Latency"); + qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency (); - qos_params [size].property_name = CORBA::string_dup ("Delay_Variation"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation (); + qos_params [5].property_name = CORBA::string_dup ("Delay_Variation"); + qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation (); - qos_params [size].property_name = CORBA::string_dup ("Max_SDU_Size"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size (); + qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size"); + qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size (); - qos_params [size].property_name = CORBA::string_dup ("Minimum_Policed_Size"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size (); + qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size"); + qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size (); - qos_params [size].property_name = CORBA::string_dup ("TTL"); - qos_params [size].property_value <<= (CORBA::ULong) ace_flow_spec->ttl (); + qos_params [8].property_name = CORBA::string_dup ("TTL"); + qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl (); return 0; } @@ -216,7 +363,7 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) AVStreams::streamQoS new_qos; ACE_Flow_Spec ace_flow_spec = - this->qos_session_->qos ().receiving_flowspec (); + this->qos_session_->qos ().sending_flowspec (); new_qos.length (1); this->translate (&ace_flow_spec, new_qos [0].QoSParams); @@ -234,10 +381,12 @@ TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/) { ACE_QoS_Manager qos_manager = this->get_socket ()->qos_manager (); - + + ACE_QoS ace_qos = this->qos_session_->qos (); + this->qos_session_->qos (this->get_socket (), &qos_manager, - this->qos_session_->qos ()); + ace_qos); } } @@ -337,7 +486,26 @@ TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address) TAO_AV_UDP_QoS_Transport *transport = ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_); + + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + { + + TAO_AV_UDP_QoS_Session_Helper helper; + this->qos_session_ = helper.open_qos_session (this, + *inet_addr); + + if (this->qos_session_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Session Open Failed (%N|%l)\n"), + -1); + + if (helper.activate_qos_handler (this->qos_session_, + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Activating QoS Handler Failed (%N|%l)\n"), + -1); + } return transport->set_remote_address (*inet_addr); } @@ -445,30 +613,25 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk, 0, 0) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error in dgram_mcast.send ()\n"), + "Error in dgram_mcast.send () (%N|%l)\n"), -1); else - { - if( TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", - bytes_sent)); - } - - } + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); - if (n < 1) + if (n < 1) return n; - nbytes += bytes_sent; - iovcnt = 0; - } - } + nbytes += bytes_sent; + iovcnt = 0; + } + } } size_t bytes_sent = 0; - + // Check for remaining buffers to be sent! if (iovcnt != 0) { @@ -483,14 +646,10 @@ TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk, "Error in dgram_mcast.send ()\n"), -1); else - { if( TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", - bytes_sent)); - } - } + ACE_DEBUG ((LM_DEBUG, + "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", + bytes_sent)); if (n < 1) return n; @@ -548,6 +707,7 @@ TAO_AV_UDP_QoS_Transport::send (const iovec *iov, "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d", bytes_sent)); } + return bytes_sent; } @@ -603,25 +763,10 @@ TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *hand ACE_DEBUG ((LM_DEBUG, "(%N,%l) Acceptor Svc Handler QOS ENABLED \n")); - ACE_QoS_Decorator* qos_decorator; - - // Decorate the above handler with QoS functionality. - ACE_NEW_RETURN (qos_decorator, - ACE_QoS_Decorator (handler, - handler->qos_session (), - this->av_core_->reactor ()), - -1); - - // Initialize the Decorator. - if (qos_decorator->init () != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "QoS Decorator init () failed.\n"), - -1); - - // Register the decorated Event Handler with the Reactor. - result = this->av_core_->reactor ()->register_handler (qos_decorator, - ACE_Event_Handler::QOS_MASK | - ACE_Event_Handler::READ_MASK); + TAO_AV_UDP_QoS_Session_Helper helper; + + result = helper.activate_qos_handler (handler->qos_session (), + handler); if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error in registering the Decorator with the Reactor\n"), @@ -684,7 +829,7 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, addr += ":8000"; ACE_INET_Addr *address; ACE_NEW_RETURN (address, - ACE_INET_Addr (addr.c_str ()), + ACE_INET_Addr ("0"), -1); address->addr_to_string (buf, @@ -703,18 +848,19 @@ TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, int TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) { - ACE_DECLARE_NEW_CORBA_ENV; int result = 0; - + // TAO_AV_Callback *callback = 0; // this->endpoint_->get_callback (this->flowname_.c_str (), // callback); ACE_INET_Addr *local_addr; - ACE_QoS_Params qos_params; + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr (*inet_addr), + -1); - ACE_Flow_Spec ace_flow_spec; + ACE_QoS_Params qos_params; ACE_QoS* ace_qos = 0; @@ -723,173 +869,128 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) ace_qos); - AVStreams::QoS qos; - - int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), - qos); - TAO_AV_UDP_QoS_Flow_Handler* handler; ACE_NEW_RETURN (handler, TAO_AV_UDP_QoS_Flow_Handler, -1); - + TAO_AV_Flow_Handler *flow_handler = 0; flow_handler = handler; - - // Create a QoS Session Factory. - ACE_QoS_Session_Factory session_factory; - - // Ask the factory to create a QoS session. This could be RAPI or - // GQoS based on the parameter passed. - - //@@YAmuna : Later make this generic for GQoS - this->qos_session_ = - session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); - - // Create a destination address for the QoS session. The same - // address should be used for the subscribe call later. A copy - // is made below only to distinguish the two usages of the dest - // address. - ACE_INET_Addr dest_addr (*inet_addr); - - // A QoS session is defined by the 3-tuple [DestAddr, DestPort, - // Protocol]. Initialize the QoS session. - if (this->qos_session_->open (*inet_addr, - IPPROTO_UDP) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error in opening the QoS session\n"), - -1); - else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) QoS session opened successfully\n")); - } - - - result = handler->get_socket ()->subscribe (*inet_addr, - qos_params, - 1, - 0, - AF_INET, - // ACE_FROM_PROTOCOL_INFO, - 0, - 0, // ACE_Protocol_Info, - 0, - ACE_OVERLAPPED_SOCKET_FLAG - | ACE_FLAG_MULTIPOINT_C_LEAF - | ACE_FLAG_MULTIPOINT_D_LEAF, - this->qos_session_); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); - else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Subscribe succeeded\n")); - } + handler->endpoint (this->endpoint_); + handler->flowspec_entry (this->entry_); + handler->av_core (this->av_core_); - handler->qos_session (this->qos_session_); - - ACE_NEW_RETURN (local_addr, - ACE_INET_Addr (*inet_addr), - -1); - - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) { - // This is a sender - this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); - } - else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - // This is a receiver - this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); - } + + TAO_AV_UDP_QoS_Session_Helper helper; + + int result = handler->get_socket ()->open (*inet_addr, + qos_params, + AF_INET, + 0, + 0, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + 1); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"), + -1); - if (qos_available == 0) - { - - handler->translate (qos.QoSParams, - &ace_flow_spec); + result = handler->get_socket ()->get_local_addr (*local_addr); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in getting Local Address (%N|%l)\n"), + -1); + + // Create a destination address for the QoS session. The same + // address should be used for the subscribe call later. A copy + // is made below only to distinguish the two usages of the dest + // address. + ACE_INET_Addr dest_addr; + dest_addr.set (local_addr->get_port_number (), + local_addr->get_host_name ()); + char dest_buf [BUFSIZ]; + dest_addr.addr_to_string (dest_buf, + BUFSIZ); - ACE_NEW_RETURN (ace_qos, - ACE_QoS, - -1); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Session Address is %s\n", + dest_buf)); - Fill_ACE_QoS fill_ace_qos; + this->qos_session_ = helper.open_qos_session (handler, + dest_addr); + + if (this->qos_session_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "QoS Session Open Failed (%N|%l)\n"), + -1); + + handler->qos_session (this->qos_session_); + + if (this->activate_svc_handler (handler) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Activate Svc Handler Failed (%N|%l)\n"), + -1); + + AVStreams::QoS qos; + int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) + if (qos_available == 0) { - if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, - &ace_flow_spec) !=0) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex sender qos\n"), - -1); - else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Sender QoS parameters\n")); - } - } - else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, - &ace_flow_spec) !=0) + + ACE_Flow_Spec *ace_flow_spec = 0; + ACE_NEW_RETURN (ace_flow_spec, + ACE_Flow_Spec, + -1); + + handler->translate (qos.QoSParams, + ace_flow_spec); + + if (helper.set_qos (*ace_flow_spec, + handler) == -1) + ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex receiver qos\n"), + "Set QoS Failed (%N|%l)\n"), -1); - else - { - if(TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Receiver QoS parameters\n")); - } - } + } + else + { - - - this->qos_manager_ - = handler->get_socket ()->qos_manager (); - - // Set the QoS for the session. Replaces the ioctl () call that - // was being made previously. - if (this->qos_session_->qos (handler->get_socket (), - &this->qos_manager_, - *ace_qos) == -1) + int result = handler->get_socket ()->open (*inet_addr, + qos_params, + AF_INET, + 0, + 0, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + 1); + + if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, - "Unable to set QoS\n"), + "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"), -1); - else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Setting QOS succeeds.\n")); - } } - - // ACE_Time_Value era (10); - //this->adaptor ().flowspec_entry (this->entry_); - //this->adaptor ().endpoint (this->endpoint_); -// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_, - // adaptor, - // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()), - // era, - // QoS_Monitor::PEAK_BANDWIDTH); - - //ll_ace_qos.map ().unbind (g_711); TAO_AV_Protocol_Object *object = this->flow_protocol_factory_->make_protocol_object (this->entry_, - this->endpoint_, - flow_handler, - flow_handler->transport ()); + this->endpoint_, + flow_handler, + flow_handler->transport ()); flow_handler->protocol_object (object); - + AVStreams::Negotiator_ptr negotiator; ACE_TRY_EX (negotiator) @@ -908,37 +1009,31 @@ TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr) "(%N,%l) Negotiator Not Found \n")); } ACE_ENDTRY; - - // callback->protocol_object (object); - // this->endpoint_->set_protocol_object (this->flowname_.c_str (), - // object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); this->entry_->protocol_object (object); - char buf[BUFSIZ]; - local_addr->addr_to_string (buf,BUFSIZ); - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n", - buf)); - result = handler->get_socket ()->get_local_addr (*local_addr); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result); local_addr->set (local_addr->get_port_number (), local_addr->get_host_name ()); - - local_addr->addr_to_string (buf, - BUFSIZ); - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) REMOTE ADDRESS IS %s\n", - buf)); + + if (TAO_debug_level > 0) + { + char buf [BUFSIZ]; + local_addr->addr_to_string (buf, + BUFSIZ); + ACE_DEBUG ((LM_DEBUG, + "Local Address is %s\n", + buf)); + } this->entry_->set_local_addr (local_addr); this->entry_->handler (flow_handler); - // call activate svc handler. - return this->activate_svc_handler (handler); + return 0; + } int @@ -1034,11 +1129,11 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, this->entry_ = entry; this->flowname_ = entry->flowname (); -// ACE_INET_Addr *local_addr; -// ACE_NEW_RETURN (local_addr, -// ACE_INET_Addr ("0"), -// -1); - + ACE_INET_Addr *local_addr; + ACE_NEW_RETURN (local_addr, + ACE_INET_Addr ("0"), + -1); + TAO_AV_Flow_Handler *flow_handler = 0; @@ -1046,155 +1141,111 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, ACE_NEW_RETURN (handler, TAO_AV_UDP_QoS_Flow_Handler, -1); - - + flow_handler = handler; + + handler->endpoint (this->endpoint_); + handler->flowspec_entry (this->entry_); + handler->av_core (this->av_core_); + + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*, + entry->address ()); ACE_QoS_Params qos_params; - - ACE_Flow_Spec *ace_flow_spec = 0; - - ACE_QoS* ace_qos; - + + ACE_QoS* ace_qos = 0; + FillQoSParams (qos_params, 0, ace_qos); - - AVStreams::QoS qos; - int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), - qos); - - ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*, - entry->address ()); - - - // Create a QoS Session Factory. - ACE_QoS_Session_Factory session_factory; + result = handler->get_socket ()->open (*local_addr, + qos_params, + AF_INET, + 0, + 0, + 0, + ACE_OVERLAPPED_SOCKET_FLAG + | ACE_FLAG_MULTIPOINT_C_LEAF + | ACE_FLAG_MULTIPOINT_D_LEAF, + 1); - // Ask the factory to create a QoS session. This could be RAPI or - // GQoS based on the parameter passed. + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Data socket open failed (%N|%l)\n"), + -1); - //@@YAmuna : Later make this generic for GQoS - this->qos_session_ = - session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION); + result = handler->get_socket ()->get_local_addr (*local_addr); + + ACE_INET_Addr *session_addr = 0; + if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) + { + ACE_NEW_RETURN (session_addr, + ACE_INET_Addr, + -1); + + session_addr->set (local_addr->get_port_number (), + local_addr->get_host_name ()); + + } + else + { + session_addr = inet_addr; + } + char sess_buf [BUFSIZ]; + session_addr->addr_to_string (sess_buf, + BUFSIZ); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Session Address is %s\n", + sess_buf)); + // Create a destination address for the QoS session. The same // address should be used for the subscribe call later. A copy // is made below only to distinguish the two usages of the dest // address. - ACE_INET_Addr dest_addr (*inet_addr); - - // A QoS session is defined by the 3-tuple [DestAddr, DestPort, - // Protocol]. Initialize the QoS session. - if (this->qos_session_->open (*inet_addr, - IPPROTO_UDP) == -1) + ACE_INET_Addr dest_addr (*session_addr); + + TAO_AV_UDP_QoS_Session_Helper helper; + + this->qos_session_ = helper.open_qos_session (handler, + *session_addr); + + if (this->qos_session_ == 0) ACE_ERROR_RETURN ((LM_ERROR, - "Error in opening the QoS session\n"), - -1); + "QoS Session Open Failed (%N|%l)\n"), + -1); else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) QoS session opened successfully\n")); - } - - - result = handler->get_socket ()->subscribe (*inet_addr, - qos_params, - 1, - 0, - AF_INET, - // ACE_FROM_PROTOCOL_INFO, - 0, - 0, // ACE_Protocol_Info, - 0, - ACE_OVERLAPPED_SOCKET_FLAG - | ACE_FLAG_MULTIPOINT_C_LEAF - | ACE_FLAG_MULTIPOINT_D_LEAF, - this->qos_session_); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1); - // Now disable Multicast loopback. - // @@Should we make this a policy? + ACE_DEBUG ((LM_DEBUG, + "QoS session opened successfully\n")); handler->qos_session (this->qos_session_); - ACE_INET_Addr *local_addr; - - ACE_NEW_RETURN (local_addr, - ACE_INET_Addr (*inet_addr), - -1); - - - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) - { - // This is a sender - this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER); - } - else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - // This is a receiver - this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER); - } - this->qos_manager_ = handler->get_socket ()->qos_manager (); + AVStreams::QoS qos; + + int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (), + qos); if (qos_available == 0) { + + ACE_Flow_Spec* ace_flow_spec; ACE_NEW_RETURN (ace_flow_spec, ACE_Flow_Spec, -1); handler->translate (qos.QoSParams, - ace_flow_spec); - - ACE_QoS* ace_qos; - - ACE_NEW_RETURN (ace_qos, - ACE_QoS, - -1); - - Fill_ACE_QoS fill_ace_qos; + ace_flow_spec); - if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER) - { - if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos, - ace_flow_spec) !=0) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex receiver qos\n"), - -1); - else - { - if( TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Receiver QoS parameters\n")); - } - } - else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER) - { - if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos, - ace_flow_spec) !=0) - ACE_ERROR_RETURN ((LM_ERROR, - "Unable to fill simplex sender qos\n"), - -1); - else - { - if(TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) Filled up the Sender QoS parameters\n")); - } - } - - // Set the QoS for the session. Replaces the ioctl () call that - // was being made previously. - if (this->qos_session_->qos (handler->get_socket (), - &this->qos_manager_, - *ace_qos) == -1) + if (helper.set_qos (*ace_flow_spec, + handler) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Unable to set QoS\n"), + "Unable to set QoS (%N|%l)\n"), -1); else { @@ -1224,27 +1275,41 @@ TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry, } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_UDP_QoS_Connector::connect"); + ACE_DEBUG ((LM_DEBUG, + "Negotiator not found for flow %s\n", + this->entry_->flowname ())); } ACE_ENDTRY; flow_handler->protocol_object (object); - // callback->protocol_object (object); - // this->endpoint_->set_protocol_object (this->flowname_.c_str (), - // object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler); this->entry_->protocol_object (object); - char buf[BUFSIZ]; - local_addr->addr_to_string (buf,BUFSIZ); - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%N,%l) TAO_AV_UDP_CONNECTOR::connect:%s \n", - buf)); + result = handler->get_socket ()->get_local_addr (*local_addr); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Get local addr failed (%N|%l)\n"), + result); + local_addr->set (local_addr->get_port_number (), + local_addr->get_host_name ()); + + if (TAO_debug_level > 0) + { + char buf[BUFSIZ]; + local_addr->addr_to_string (buf, + BUFSIZ); + + ACE_DEBUG ((LM_DEBUG, + "Local Address is %s\n", + buf)); + } + entry->set_local_addr (local_addr); entry->handler (flow_handler); transport = flow_handler->transport (); + // call activate svc handler. return this->activate_svc_handler (handler); } @@ -1253,26 +1318,12 @@ int TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler) { int result = 0; - ACE_QoS_Decorator* qos_decorator; - - // Decorate the above handler with QoS functionality. - ACE_NEW_RETURN (qos_decorator, - ACE_QoS_Decorator (handler, - handler->qos_session (), - this->av_core_->reactor ()), - -1); - - // Initialize the Decorator. - if (qos_decorator->init () != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "(%N,%l) QoS Decorator init () failed.\n"), - -1); - - // Register the decorated Event Handler with the Reactor. - result = this->av_core_->reactor ()->register_handler (qos_decorator, - ACE_Event_Handler::QOS_MASK | - ACE_Event_Handler::READ_MASK); + + TAO_AV_UDP_QoS_Session_Helper helper; + result = helper.activate_qos_handler (this->qos_session_, + handler); + if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Error in registering the Decorator with the Reactor\n"), diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h index 7c0a9f11437..981f1c5a2da 100644 --- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h +++ b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.h @@ -125,18 +125,30 @@ public: virtual ACE_Event_Handler* event_handler (void){ return this; } virtual ACE_QoS_Session* qos_session (void); virtual void qos_session (ACE_QoS_Session *qos_session); + int translate (ACE_Flow_Spec *ace_flow_spec, CosPropertyService::Properties &qos_params); int translate (CosPropertyService::Properties &qos_params, ACE_Flow_Spec *ace_flow_spec); void negotiator (AVStreams::Negotiator_ptr); + + void endpoint (TAO_Base_StreamEndPoint *endpoint); + TAO_Base_StreamEndPoint* endpoint (void); + + void flowspec_entry (TAO_FlowSpec_Entry *entry); + TAO_FlowSpec_Entry* flowspec_entry (void); + void av_core (TAO_AV_Core *avcore); + TAO_AV_Core* av_core (void); + protected: TAO_AV_Core *av_core_; ACE_INET_Addr peer_addr_; ACE_SOCK_Dgram_Mcast_QoS qos_sock_dgram_; ACE_QoS_Session *qos_session_; + TAO_FlowSpec_Entry *entry_; + TAO_Base_StreamEndPoint *endpoint_; AVStreams::Negotiator_ptr negotiator_; }; @@ -159,7 +171,7 @@ public: virtual int open_i (ACE_INET_Addr *address); virtual int close (void); - // virtual int activate_svc_handler (TAO_AV_Flow_Handler *handler); + virtual int activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler); @@ -239,6 +251,28 @@ public: TAO_AV_Transport *transport); }; +/// Helper class to create qos sessions, +/// activate qos handlers and set qos +/// (For separation of concerns) +class TAO_AV_UDP_QoS_Session_Helper +{ +public: + TAO_AV_UDP_QoS_Session_Helper (void); + ~TAO_AV_UDP_QoS_Session_Helper (void); + + /// Open a QoS Session with the specified address + ACE_QoS_Session* open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler, + ACE_INET_Addr &addr); + + /// Activate the QoS handler to receive QoS events + int activate_qos_handler (ACE_QoS_Session *qos_session, + TAO_AV_UDP_QoS_Flow_Handler *handler); + + /// Set the required QoS for the session + int set_qos (ACE_Flow_Spec& ace_flow_spec, + TAO_AV_UDP_QoS_Flow_Handler *handler); +}; + ACE_STATIC_SVC_DECLARE (TAO_AV_UDP_QoS_Flow_Factory) ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory) |