summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp1125
1 files changed, 0 insertions, 1125 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp b/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
deleted file mode 100644
index 73eccfcd240..00000000000
--- a/TAO/orbsvcs/orbsvcs/AV/QoS_UDP.cpp
+++ /dev/null
@@ -1,1125 +0,0 @@
-// $Id$
-
-#include "QoS_UDP.h"
-#include "UDP.h"
-#include "orbsvcs/AV/AVStreams_i.h"
-#include "orbsvcs/AV/MCast.h"
-//#include "orbsvcs/AV/QoS_MCast.h"
-#include "orbsvcs/AV/Fill_ACE_QoS.h"
-
-#if !defined (__ACE_INLINE__)
-#include "orbsvcs/AV/QoS_UDP.i"
-#endif /* __ACE_INLINE__ */
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Flow_Handler
-//------------------------------------------------------------
-
-
-int
-FillQoSParams (ACE_QoS_Params &qos_params,
- iovec* iov,
- ACE_QoS* qos)
-{
- qos_params.callee_data (iov);
- qos_params.caller_data (0);
- qos_params.socket_qos (qos);
- qos_params.group_socket_qos (0);
- qos_params.flags (ACE_JL_BOTH);
-
- return 0;
-}
-
-
-TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
-{
- ACE_NEW (this->transport_,
- TAO_AV_UDP_QoS_Transport (this));
-}
-
-TAO_AV_UDP_QoS_Flow_Handler::~TAO_AV_UDP_QoS_Flow_Handler (void)
-{
- delete this->transport_;
-}
-
-TAO_AV_Transport *
-TAO_AV_UDP_QoS_Flow_Handler::transport (void)
-{
- return this->transport_;
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
-{
- this->protocol_object_->handle_input ();
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
-
- if (this->qos_session_->update_qos () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in updating QoS\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- " Updating QOS succeeds.\n"));
-
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
- const void *arg)
-{
- return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Flow_Handler::set_remote_address\n"));
-
- ACE_INET_Addr *inet_addr =
- ACE_dynamic_cast (ACE_INET_Addr*,address);
-
- this->peer_addr_ = *inet_addr;
-
- TAO_AV_UDP_QoS_Transport *transport =
- ACE_dynamic_cast (TAO_AV_UDP_QoS_Transport*,this->transport_);
-
- return transport->set_remote_address (*inet_addr);
-}
-
-
-ACE_HANDLE
-TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
- this->qos_sock_dgram_.get_handle ()));
-
- return this->qos_sock_dgram_.get_handle () ;
-}
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Transport
-//------------------------------------------------------------
-
-TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void)
- :handler_ (0)
-{
-}
-
-TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler)
- :handler_ (handler),
- addr_ (0)
-{
-}
-
-TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address)
-{
- this->peer_addr_ = address;
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/)
-{
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Transport::close (void)
-{
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Transport::mtu (void)
-{
- return ACE_MAX_DGRAM_SIZE;
-}
-
-ACE_Addr*
-TAO_AV_UDP_QoS_Transport::get_peer_addr (void)
-{
- return &this->peer_addr_;
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
- ACE_Time_Value *)
-{
- // For the most part this was copied from GIOP::send_request and
- // friends.
-
- iovec iov[IOV_MAX];
- int iovcnt = 0;
- ssize_t n = 0;
- ssize_t nbytes = 0;
-
- for (const ACE_Message_Block *i = mblk;
- i != 0;
- i = i->cont ())
- {
- // Make sure there is something to send!
- if (i->length () > 0)
- {
- iov[iovcnt].iov_base = i->rd_ptr ();
- iov[iovcnt].iov_len = i->length ();
- iovcnt++;
-
- // The buffer is full make a OS call. @@ TODO this should
- // be optimized on a per-platform basis, for instance, some
- // platforms do not implement writev() there we should copy
- // the data into a buffer and call send_n(). In other cases
- // there may be some limits on the size of the iovec, there
- // we should set IOV_MAX to that limit.
-
- size_t bytes_sent = 0;
-
- if (iovcnt == IOV_MAX)
- {
- if (this->handler_->get_socket ()->send (iov,
- 1,
- bytes_sent,
- 0,
- this->handler_->qos_session ()->dest_addr (),
- 0,
- 0) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in dgram_mcast.send ()\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
-
- if (n < 1)
- return n;
-
- nbytes += bytes_sent;
- iovcnt = 0;
- }
- }
- }
-
- size_t bytes_sent = 0;
-
- // Check for remaining buffers to be sent!
- if (iovcnt != 0)
- {
- if (this->handler_->get_socket ()->send (iov,
- 1,
- bytes_sent,
- 0,
- this->handler_->qos_session ()->dest_addr (),
- 0,
- 0) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in dgram_mcast.send ()\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
-
- if (n < 1)
- return n;
-
- nbytes += bytes_sent;
- }
-
- return nbytes;
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::send (const char *buf,
- size_t len,
- ACE_Time_Value *)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Transport::send "));
-
- char addr [BUFSIZ];
- this->peer_addr_.addr_to_string (addr,BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "to %s\n",
- addr));
-
- return this->handler_->get_socket ()->send (buf,
- len,
- this->handler_->qos_session ()->dest_addr (),
- 0,
- 0,
- 0);
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
- int /*iovcnt*/,
- ACE_Time_Value *)
-{
- size_t bytes_sent = 0;
- if (this->handler_->get_socket ()->send (iov,
- 1,
- bytes_sent,
- 0,
- this->handler_->qos_session ()->dest_addr (),
- 0,
- 0) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in dgram_mcast.send ()\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Using ACE_OS::sendto () : Bytes sent : %d",
- bytes_sent));
-
- return bytes_sent;
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::recv (char *buf,
- size_t len,
- ACE_Time_Value *)
-{
- return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::recv (char *buf,
- size_t len,
- int flags,
- ACE_Time_Value *timeout)
-{
- return this->handler_->get_socket ()->recv (buf,
- len,
- this->peer_addr_,
- flags,
- timeout);
-}
-
-ssize_t
-TAO_AV_UDP_QoS_Transport::recv (iovec *iov,
- int /*iovcnt*/,
- ACE_Time_Value *timeout)
-{
- return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
-}
-
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Acceptor
-//------------------------------------------------------------
-
-TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor (void)
-{
-}
-
-TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
-{
- int result = 0;
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Acceptor Svc Handler QOS ENABLED \n"));
-
- ACE_QoS_Decorator* qos_decorator;
-
- // Decorate the above handler with QoS functionality.
- ACE_NEW_RETURN (qos_decorator,
- ACE_QoS_Decorator (handler,
- handler->qos_session (),
- this->av_core_->reactor ()),
- -1);
-
- // Initialize the Decorator.
- if (qos_decorator->init () != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Decorator init () failed.\n"),
- -1);
-
- // Register the decorated Event Handler with the Reactor.
- result = this->av_core_->reactor ()->register_handler (qos_decorator,
- ACE_Event_Handler::QOS_MASK |
- ACE_Event_Handler::READ_MASK);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in registering the Decorator with the Reactor\n"),
- -1);
-
- return result;
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
- TAO_AV_Core *av_core,
- TAO_FlowSpec_Entry *entry,
- TAO_AV_Flow_Protocol_Factory *factory)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Acceptor::open "));
-
- this->av_core_ = av_core;
- this->endpoint_ = endpoint;
- this->entry_ = entry;
-
-
- this->flow_protocol_factory_ = factory;
- this->flowname_ = entry->flowname ();
- ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address ();
-// inet_addr->set (inet_addr->get_port_number (),
-// inet_addr->get_host_name ());
- char buf[BUFSIZ];
- inet_addr->addr_to_string (buf,
- BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Acceptor::open: %s",
- buf));
-
- int result = this->open_i (inet_addr);
-
- if (result < 0)
- return result;
-
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
- TAO_AV_Core *av_core,
- TAO_FlowSpec_Entry *entry,
- TAO_AV_Flow_Protocol_Factory *factory)
-{
- this->av_core_ = av_core;
- this->endpoint_ = endpoint;
- this->entry_ = entry;
- this->flow_protocol_factory_ = factory;
- this->flowname_ = entry->flowname ();
- ACE_INET_Addr *address;
- ACE_NEW_RETURN (address,
- ACE_INET_Addr ("0"),
- -1);
- int result = this->open_i (address);
- if (result < 0)
- return result;
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::translate (CosPropertyService::Properties &qos_params,
- ACE_Flow_Spec *ace_flow_spec)
-{
- for (unsigned int i = 0;
- i < qos_params.length ();
- i++)
- {
- if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
- {
- CORBA::Short type;
- qos_params [i].property_value >>= type;
- ace_flow_spec->service_type (type);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
- {
- CORBA::ULong tok_rate;
- qos_params [i].property_value >>= tok_rate;
- ace_flow_spec->token_rate (tok_rate);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Size") == 0)
- {
- CORBA::ULong tok_buck_size;
- qos_params [i].property_value >>= tok_buck_size;
- ace_flow_spec->token_bucket_size (tok_buck_size);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
- {
- CORBA::ULong peak_bw;
- qos_params [i].property_value >>= peak_bw;
- ace_flow_spec->peak_bandwidth (peak_bw);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
- {
- CORBA::ULong lat;
- qos_params [i].property_value >>= lat;
- ace_flow_spec->latency (lat);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
- {
- CORBA::ULong delay_var;
- qos_params [i].property_value >>= delay_var;
- ace_flow_spec->delay_variation (delay_var);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Max_SDU_Size") == 0)
- {
- CORBA::ULong max_sdu;
- qos_params [i].property_value >>= max_sdu;
- ace_flow_spec->delay_variation (max_sdu);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Minimum_Policed_Size") == 0)
- {
- CORBA::ULong min_pol_size;
- qos_params [i].property_value >>= min_pol_size;
- ace_flow_spec->delay_variation (min_pol_size);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "TTL") == 0)
- {
- CORBA::ULong ttl;
- qos_params [i].property_value >>= ttl;
- ace_flow_spec->delay_variation (ttl);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Priority") == 0)
- {
- CORBA::ULong priority;
- qos_params [i].property_value >>= priority;
- ace_flow_spec->delay_variation (priority);
- }
- }
-
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
-{
- int result = -1;
- // TAO_AV_Callback *callback = 0;
- // this->endpoint_->get_callback (this->flowname_.c_str (),
- // callback);
- ACE_INET_Addr *local_addr;
-
- AVStreams::QoS qos;
- this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
-
- ACE_Flow_Spec *ace_flow_spec;
- ACE_NEW_RETURN (ace_flow_spec,
- ACE_Flow_Spec,
- -1);
-
- this->translate (qos.QoSParams,
- ace_flow_spec);
-
- ACE_QoS* ace_qos;
-
- ACE_NEW_RETURN (ace_qos,
- ACE_QoS,
- -1);
-
- Fill_ACE_QoS fill_ace_qos;
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex sender qos\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Sender QoS parameters\n"));
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex receiver qos\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Receiver QoS parameters\n"));
-
- }
-
- ACE_QoS_Params qos_params;
- FillQoSParams (qos_params,
- 0,
- ace_qos);
-
- // Create a QoS Session Factory.
- ACE_QoS_Session_Factory session_factory;
-
- // Ask the factory to create a QoS session. This could be RAPI or
- // GQoS based on the parameter passed.
-
- //@@YAmuna : Later make this generic for GQoS
- this->qos_session_ =
- session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
-
- // Create a destination address for the QoS session. The same
- // address should be used for the subscribe call later. A copy
- // is made below only to distinguish the two usages of the dest
- // address.
- ACE_INET_Addr dest_addr (*inet_addr);
-
- // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
- // Protocol]. Initialize the QoS session.
- if (this->qos_session_->open (*inet_addr,
- IPPROTO_UDP) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in opening the QoS session\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "QoS session opened successfully\n"));
-
- TAO_AV_UDP_QoS_Flow_Handler* handler;
- ACE_NEW_RETURN (handler,
- TAO_AV_UDP_QoS_Flow_Handler,
- -1);
-
- TAO_AV_Flow_Handler *flow_handler = 0;
- flow_handler = handler;
-
- result = handler->get_socket ()->subscribe (*inet_addr,
- qos_params,
- 1,
- 0,
- AF_INET,
- // ACE_FROM_PROTOCOL_INFO,
- 0,
- 0, // ACE_Protocol_Info,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- this->qos_session_);
-
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
- else ACE_DEBUG ((LM_DEBUG,
- "Subscribe succeeded\n"));
-
- handler->qos_session (this->qos_session_);
-
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr (*inet_addr),
- -1);
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- // This is a sender
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- // This is a receiver
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
- }
-
- this->qos_manager_
- = handler->get_socket ()->qos_manager ();
-
- // Set the QoS for the session. Replaces the ioctl () call that
- // was being made previously.
- if (this->qos_session_->qos (handler->get_socket (),
- &this->qos_manager_,
- *ace_qos) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Setting QOS succeeds.\n"));
-
- // ACE_Time_Value era (10);
- //this->adaptor ().flowspec_entry (this->entry_);
- //this->adaptor ().endpoint (this->endpoint_);
-// TAO_AV_CORE::instance ()->monitor ().register_session (this->qos_session_,
- // adaptor,
- // (void *) TAO_AV_CORE::instance ()->get_adaptor ((char*)this->flowname ()),
- // era,
- // QoS_Monitor::PEAK_BANDWIDTH);
-
- //ll_ace_qos.map ().unbind (g_711);
-
- TAO_AV_Protocol_Object *object =
- this->flow_protocol_factory_->make_protocol_object (this->entry_,
- this->endpoint_,
- flow_handler,
- flow_handler->transport ());
- flow_handler->protocol_object (object);
- // callback->protocol_object (object);
-// this->endpoint_->set_protocol_object (this->flowname_.c_str (),
-// object);
- this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
- this->entry_->protocol_object (object);
-
- char buf[BUFSIZ];
- local_addr->addr_to_string (buf,BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_ACCEPTOR::open:%s \n",
- buf));
-
- this->entry_->set_local_addr (local_addr);
- this->entry_->handler (flow_handler);
-
- // call activate svc handler.
- return this->activate_svc_handler (handler);
-}
-
-int
-TAO_AV_UDP_QoS_Acceptor::close (void)
-{
- return 0;
-}
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Connector
-//------------------------------------------------------------
-TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector (void)
-{
-}
-
-TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint,
- TAO_AV_Core *av_core,
- TAO_AV_Flow_Protocol_Factory *factory)
-
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Connector::open "));
-
- this->endpoint_ = endpoint;
- this->av_core_ = av_core;
- this->flow_protocol_factory_ = factory;
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params,
- ACE_Flow_Spec *ace_flow_spec)
-{
- for (unsigned int i = 0;
- i < qos_params.length ();
- i++)
- {
- if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
- {
- CORBA::Short type;
- qos_params [i].property_value >>= type;
- ace_flow_spec->service_type (type);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
- {
- CORBA::ULong tok_rate;
- qos_params [i].property_value >>= tok_rate;
- ace_flow_spec->token_rate (tok_rate);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0)
- {
- CORBA::ULong tok_buck_size;
- qos_params [i].property_value >>= tok_buck_size;
- ace_flow_spec->token_bucket_size (tok_buck_size);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
- {
- CORBA::ULong peak_bw;
- qos_params [i].property_value >>= peak_bw;
- ace_flow_spec->peak_bandwidth (peak_bw);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
- {
- CORBA::ULong lat;
- qos_params [i].property_value >>= lat;
- ace_flow_spec->latency (lat);
- }
- else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
- {
- CORBA::ULong delay_var;
- qos_params [i].property_value >>= delay_var;
- ace_flow_spec->delay_variation (delay_var);
- }
-
- }
-
- return 0;
-}
-
-
-int
-TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
- TAO_AV_Transport *&transport)
-{
- int result = -1;
- this->entry_ = entry;
- this->flowname_ = entry->flowname ();
-
-// ACE_INET_Addr *local_addr;
-// ACE_NEW_RETURN (local_addr,
-// ACE_INET_Addr ("0"),
-// -1);
-
- Fill_ACE_QoS fill_ace_qos;
-
- AVStreams::QoS qos;
- this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
- qos);
- ACE_Flow_Spec *ace_flow_spec;
- ACE_NEW_RETURN (ace_flow_spec,
- ACE_Flow_Spec,
- -1);
-
- this->translate (qos.QoSParams,
- ace_flow_spec);
-
- ACE_QoS* ace_qos;
-
- ACE_NEW_RETURN (ace_qos,
- ACE_QoS,
- -1);
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex receiver qos\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Receiver QoS parameters\n"));
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
- ace_flow_spec) !=0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to fill simplex sender qos\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Filled up the Sender QoS parameters\n"));
- }
-
- ACE_QoS_Params qos_params;
- FillQoSParams (qos_params,
- 0,
- ace_qos);
-
- ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,
- entry->address ());
- TAO_AV_Flow_Handler *flow_handler = 0;
-
- // Create a QoS Session Factory.
- ACE_QoS_Session_Factory session_factory;
-
- // Ask the factory to create a QoS session. This could be RAPI or
- // GQoS based on the parameter passed.
-
- //@@YAmuna : Later make this generic for GQoS
- this->qos_session_ =
- session_factory.create_session (ACE_QoS_Session_Factory::ACE_RAPI_SESSION);
-
-
- // Create a destination address for the QoS session. The same
- // address should be used for the subscribe call later. A copy
- // is made below only to distinguish the two usages of the dest
- // address.
- ACE_INET_Addr dest_addr (*inet_addr);
-
- // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
- // Protocol]. Initialize the QoS session.
- if (this->qos_session_->open (*inet_addr,
- IPPROTO_UDP) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in opening the QoS session\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "QoS session opened successfully\n"));
-
- TAO_AV_UDP_QoS_Flow_Handler *handler;
- ACE_NEW_RETURN (handler,
- TAO_AV_UDP_QoS_Flow_Handler,
- -1);
-
- handler->qos_session (this->qos_session_);
-
- flow_handler = handler;
-
-
-
-
- result = handler->get_socket ()->subscribe (*inet_addr,
- qos_params,
- 1,
- 0,
- AF_INET,
- // ACE_FROM_PROTOCOL_INFO,
- 0,
- 0, // ACE_Protocol_Info,
- 0,
- ACE_OVERLAPPED_SOCKET_FLAG
- | ACE_FLAG_MULTIPOINT_C_LEAF
- | ACE_FLAG_MULTIPOINT_D_LEAF,
- this->qos_session_);
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_QOS_UDP_MCast_Acceptor::subscribe failed\n"),-1);
- // Now disable Multicast loopback.
- // @@Should we make this a policy?
-
- ACE_INET_Addr *local_addr;
-
- ACE_NEW_RETURN (local_addr,
- ACE_INET_Addr (*inet_addr),
- -1);
-
-
- if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
- {
- // This is a sender
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_SENDER);
- }
- else if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
- {
- // This is a receiver
- this->qos_session_->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
- }
-
- this->qos_manager_ =
- handler->get_socket ()->qos_manager ();
-
- // Set the QoS for the session. Replaces the ioctl () call that
- // was being made previously.
- if (this->qos_session_->qos (handler->get_socket (),
- &this->qos_manager_,
- *ace_qos) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Unable to set QoS\n"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "Setting QOS succeeds.\n"));
-
-
- TAO_AV_Protocol_Object *object =
- this->flow_protocol_factory_->make_protocol_object (this->entry_,
- this->endpoint_,
- flow_handler,
- flow_handler->transport ());
- flow_handler->protocol_object (object);
- // callback->protocol_object (object);
- // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
- // object);
- this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
- this->entry_->protocol_object (object);
-
- char buf[BUFSIZ];
- local_addr->addr_to_string (buf,BUFSIZ);
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_CONNECTOR::connect:%s \n",
- buf));
-
- entry->set_local_addr (local_addr);
- entry->handler (flow_handler);
- transport = flow_handler->transport ();
- // call activate svc handler.
- return this->activate_svc_handler (handler);
-}
-
-int
-TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
-{
- int result = 0;
- ACE_QoS_Decorator* qos_decorator;
-
- // Decorate the above handler with QoS functionality.
- ACE_NEW_RETURN (qos_decorator,
- ACE_QoS_Decorator (handler,
- handler->qos_session (),
- this->av_core_->reactor ()),
- -1);
-
- // Initialize the Decorator.
- if (qos_decorator->init () != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "QoS Decorator init () failed.\n"),
- -1);
-
- // Register the decorated Event Handler with the Reactor.
- result = this->av_core_->reactor ()->register_handler (qos_decorator,
- ACE_Event_Handler::QOS_MASK |
- ACE_Event_Handler::READ_MASK);
-
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in registering the Decorator with the Reactor\n"),
- -1);
-
- return result;
-}
-
-int
-TAO_AV_UDP_QoS_Connector::close (void)
-{
- return 0;
-}
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Protocol_Factory
-//------------------------------------------------------------
-
-TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory (void)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory\n"));
-}
-
-TAO_AV_UDP_QoS_Factory::~TAO_AV_UDP_QoS_Factory (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Factory::match_protocol (const char *protocol_string)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Factory::match_protocol\n"));
-
- if (ACE_OS::strcasecmp (protocol_string,"QoS_UDP") == 0)
- return 1;
- return 0;
-}
-
-TAO_AV_Acceptor*
-TAO_AV_UDP_QoS_Factory::make_acceptor (void)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Factory::make_acceptor "));
-
- TAO_AV_Acceptor *acceptor = 0;
- ACE_NEW_RETURN (acceptor,
- TAO_AV_UDP_QoS_Acceptor,
- 0);
- return acceptor;
-}
-
-TAO_AV_Connector*
-TAO_AV_UDP_QoS_Factory::make_connector (void)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Factory::make_connector "));
-
- TAO_AV_Connector *connector = 0;
- ACE_NEW_RETURN (connector,
- TAO_AV_UDP_QoS_Connector,
- 0);
- return connector;
-}
-
-int
-TAO_AV_UDP_QoS_Factory::init (int /* argc */,
- char * /* argv */ [])
-{
- return 0;
-}
-
-
-//------------------------------------------------------------
-// TAO_AV_UDP_Flow_Factory
-//------------------------------------------------------------
-TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory (void)
-{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory\n"));
-}
-
-TAO_AV_UDP_QoS_Flow_Factory::~TAO_AV_UDP_QoS_Flow_Factory (void)
-{
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Factory::init (int /* argc */,
- char * /* argv */ [])
-{
- return 0;
-}
-
-int
-TAO_AV_UDP_QoS_Flow_Factory::match_protocol (const char *flow_string)
-{
- if (ACE_OS::strcasecmp (flow_string,"QoS_UDP") == 0)
- return 1;
- return 0;
-}
-
-TAO_AV_Protocol_Object*
-TAO_AV_UDP_QoS_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
- TAO_Base_StreamEndPoint *endpoint,
- TAO_AV_Flow_Handler *handler,
- TAO_AV_Transport *transport)
-{
- TAO_AV_Callback *callback = 0;
- endpoint->get_callback (entry->flowname (),
- callback);
-
-
- TAO_AV_UDP_Object *object = 0;
- ACE_NEW_RETURN (object,
- TAO_AV_UDP_Object (callback,
- transport),
- 0);
- callback->open (object,
- handler);
- endpoint->set_protocol_object (entry->flowname (),
- object);
- return object;
-}
-
-ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Flow_Factory)
-ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Flow_Factory,
- ACE_TEXT ("UDP_QoS_Flow_Factory"),
- ACE_SVC_OBJ_T,
- &ACE_SVC_NAME (TAO_AV_UDP_QoS_Flow_Factory),
- ACE_Service_Type::DELETE_THIS |
- ACE_Service_Type::DELETE_OBJ,
- 0)
-
-ACE_FACTORY_DEFINE (AV, TAO_AV_UDP_QoS_Factory)
-
-ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Factory,
- ACE_TEXT ("UDP_QoS_Factory"),
- ACE_SVC_OBJ_T,
- &ACE_SVC_NAME (TAO_AV_UDP_QoS_Factory),
- ACE_Service_Type::DELETE_THIS |
- ACE_Service_Type::DELETE_OBJ,
- 0)
-
-