diff options
-rw-r--r-- | ace/Makefile | 4 | ||||
-rw-r--r-- | ace/QoS_Session.h | 128 | ||||
-rw-r--r-- | ace/QoS_Session_Factory.cpp | 96 | ||||
-rw-r--r-- | ace/QoS_Session_Factory.h | 88 | ||||
-rw-r--r-- | ace/QoS_Session_Impl.cpp | 510 | ||||
-rw-r--r-- | ace/QoS_Session_Impl.h | 191 | ||||
-rw-r--r-- | ace/QoS_Session_Impl.i | 120 | ||||
-rw-r--r-- | ace/SOCK.cpp | 29 | ||||
-rw-r--r-- | ace/SOCK.h | 21 | ||||
-rw-r--r-- | ace/SOCK_Dgram_Mcast.cpp | 26 | ||||
-rw-r--r-- | ace/SOCK_Dgram_Mcast.h | 8 |
11 files changed, 1212 insertions, 9 deletions
diff --git a/ace/Makefile b/ace/Makefile index 5b2dafb47ec..1f7054104ce 100644 --- a/ace/Makefile +++ b/ace/Makefile @@ -182,7 +182,9 @@ OTHER_FILES = \ Name_Space \ Naming_Context \ Registry_Name_Space \ - Remote_Name_Space + Remote_Name_Space \ + QoS_Session_Impl \ + QoS_Session_Factory TEMPLATE_FILES = \ Acceptor \ diff --git a/ace/QoS_Session.h b/ace/QoS_Session.h new file mode 100644 index 00000000000..53545809117 --- /dev/null +++ b/ace/QoS_Session.h @@ -0,0 +1,128 @@ +/* -*- C++ -*- */ +// $Id$ + +// =========================================================================== +// +// = LIBRARY +// ace +// +// = FILENAME +// QoS_Session.h +// +// = AUTHOR +// Vishal Kachroo <vishal@cs.wustl.edu> +// +// =========================================================================== + +#ifndef ACE_QOS_SESSION_H +#define ACE_QOS_SESSION_H + +#include "ace/INET_Addr.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_SOCK; +#include "ace/Containers_T.h" + +typedef int ACE_Protocol_ID; +// IPPROTO_UDP or IPPROTO_TCP. + +class ACE_Export ACE_QoS_Session +{ + // = TITLE + // A QoS Session object. + // + // = DESCRIPTION + // This class defines the interface for a QoS Session. It abstracts the + // notion of QoS on different platforms and presents a simple, easy-to-use + // API. Current [RAPI,GQoS] and future implementations will conform to this + // interface. + +public: + + enum ACE_End_Point_Type + { + ACE_QOS_SENDER, + ACE_QOS_RECEIVER, + ACE_QOS_BOTH + }; + // A flag to indicate if this endpoint is a sender or a receiver or both. + + virtual ~ACE_QoS_Session (void); + // to shutup g++. + + virtual int open (ACE_INET_Addr dest_addr, + ACE_Protocol_ID protocol_id) = 0; + // Open a QoS session [dest IP, dest port, Protocol ID]. + + virtual int close (void) = 0; + // Close the QoS Session. + + virtual ACE_QoS qos (void) const = 0; + // Returns the QoS in the current session. + + virtual int qos (ACE_SOCK *socket, + const ACE_QoS &ace_qos) = 0; + // Set QoS for the current session. The socket parameter is used to confirm if + // this QoS session was subscribed to by the socket. + + virtual void qos (const ACE_QoS &ace_qos) = 0; + // Sets the QoS for this session object to ace_qos. Does not interfere with the + // QoS in the underlying socket. This call is useful to update the QoS object + // when the underlying socket QoS is being set through a mechanism other than + // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the + // QoS for the socket is set through ACE_OS::join_leaf (). + + virtual int update_qos (void) = 0; + // This is called from handle_qos () method of the the QoS Event Handler. + // Invoking this method is an indication of a QoS event occurring, that may have + // resulted in a change of QoS for the underlying session. This method updates + // the QoS object associated with this session. + + virtual int session_id (void) const = 0; + // Get the session id. + + virtual void session_id (const int session_id) = 0; + // Set the session id. + + virtual ACE_INET_Addr dest_addr (void) const = 0; + // Get the destination address for this session. + + virtual void dest_addr (const ACE_INET_Addr &dest_addr) = 0; + // Set the destination address for this session. + + virtual int version (void) = 0; + // Returns the version of the underlying RSVP implementation. Is + // meaningful only when the underlying implementation has versioning. + +protected: + + int session_id_; + // session id for the session. + + ACE_INET_Addr dest_addr_; + // Destination address for this session. + + ACE_Protocol_ID protocol_id_; + // Is this a TCP or a UDP session. + + ACE_QoS qos_; + // QoS for this session. + + ACE_End_Point_Type flags_; + // Specifies if this is a sending/receiving/both session. + +}; + +#endif /* ACE_QOS_SESSION_H */ + + + + + + + + + diff --git a/ace/QoS_Session_Factory.cpp b/ace/QoS_Session_Factory.cpp new file mode 100644 index 00000000000..1e0e892bbd0 --- /dev/null +++ b/ace/QoS_Session_Factory.cpp @@ -0,0 +1,96 @@ +// QoS_Session_Factory.cpp +// $Id$ + +#define ACE_BUILD_DLL + +#include "ace/QoS_Session_Factory.h" +#include "ace/QoS_Session_Impl.h" + +ACE_RCSID(ace, QoS_Session_Factory, "$Id$") + +ACE_ALLOC_HOOK_DEFINE(ACE_QoS_Session_Factory) + +ACE_QoS_Session_Factory::ACE_QoS_Session_Factory (void) +{ + ACE_TRACE ("ACE_QoS_Session_Factory::ACE_QoS_Session_Factory"); +} + +ACE_QoS_Session_Factory::~ACE_QoS_Session_Factory (void) +{ + ACE_TRACE ("ACE_QoS_Session_Factory::~ACE_QoS_Session_Factory"); +} + +// Create a QoS session of the given type (RAPI or GQoS). +ACE_QoS_Session * +ACE_QoS_Session_Factory::create_session (ACE_QoS_Session_Type qos_session_type) +{ + + ACE_QoS_Session * qos_session = 0; + +#if defined (ACE_HAS_RAPI) + if (qos_session_type == ACE_RAPI_SESSION) + ACE_NEW_RETURN (qos_session, + ACE_RAPI_Session, + 0); +#endif /* ACE_HAS_RAPI */ + + if (qos_session_type == ACE_GQOS_SESSION) + ACE_NEW_RETURN (qos_session, + ACE_GQoS_Session, + 0); + + if (this->add_session (qos_session) == -1) + { + delete qos_session; + ACE_ERROR_RETURN ((LM_ERROR, + "Error in adding session\n"), + 0); + } + + return qos_session; +} + +// Destroy the QoS Session. +int +ACE_QoS_Session_Factory::destroy_session (ACE_QoS_Session *qos_session) +{ + + if ((qos_session != 0) && (this->remove_session (qos_session) == -1)) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in destroying session\n"), + -1); + + return 0; +} + +// Add a session to the set of sessions created by this factory. This is a +// private method called by the create_session (). +int +ACE_QoS_Session_Factory::add_session (ACE_QoS_Session *qos_session) +{ + if (this->qos_session_set_.insert (qos_session) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in adding a new session to the session set\n"), + -1); + + return 0; +} + +// Remove a session from the set of sessions created by this factory. This is a +// private method called by the destroy_session (). +int +ACE_QoS_Session_Factory::remove_session (ACE_QoS_Session *qos_session) +{ + if (this->qos_session_set_.remove (qos_session) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in removing a session from the session set\n"), + -1); + + return 0; +} + + + + + + diff --git a/ace/QoS_Session_Factory.h b/ace/QoS_Session_Factory.h new file mode 100644 index 00000000000..6d4f2d20044 --- /dev/null +++ b/ace/QoS_Session_Factory.h @@ -0,0 +1,88 @@ +/* -*- C++ -*- */ +// $Id$ + +// =========================================================================== +// +// = LIBRARY +// ace +// +// = FILENAME +// QoS_Session_Factory.h +// +// = AUTHOR +// Vishal Kachroo <vishal@cs.wustl.edu> +// +// =========================================================================== + +#ifndef ACE_QOS_SESSION_FACTORY_H +#define ACE_QOS_SESSION_FACTORY_H + +#include "ace/QoS_Session.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Containers_T.h" + +class ACE_QoS_Session; +// Forward declare this, so the factory uses only references to this. + +class ACE_Export ACE_QoS_Session_Factory +{ + // = TITLE + // Concrete factory for the QoS Session objects. + // + // = DESCRIPTION + // This class manages the life cycle of QoS Session objects. These + // objects are currently either RAPI session objects or GQoS session + // objects. It stores the sessions in an unbounded set. + +public : + + // = Initialization and termination methods. + ACE_QoS_Session_Factory (void); + // Default constructor. + + ~ACE_QoS_Session_Factory (void); + // Default destructor. + + enum ACE_QoS_Session_Type + { + ACE_RAPI_SESSION, + ACE_GQOS_SESSION + }; + // Types of sessions for this factory to manage. + + ACE_QoS_Session * create_session (ACE_QoS_Session_Type qos_session_type); + // Create a QoS session of the given type (RAPI or GQoS). + + int destroy_session (ACE_QoS_Session *qos_session); + // Destroy the QoS Session. + +private: + + int add_session (ACE_QoS_Session *qos_session); + // Used by the create_session () to add new sessions to the + // set of sessions created by this factory. + + int remove_session (ACE_QoS_Session *qos_session); + // Used by the destroy_session () to remove a session from the set + // of sessions created by this factory. + + typedef ACE_Unbounded_Set <ACE_QoS_Session *> QOS_SESSION_SET; + QOS_SESSION_SET qos_session_set_; + // Unordered set of QoS Sessions. + +}; + +#endif /* ACE_QOS_SESSION_FACTORY_H */ + + + + + + + + + diff --git a/ace/QoS_Session_Impl.cpp b/ace/QoS_Session_Impl.cpp new file mode 100644 index 00000000000..52286687b60 --- /dev/null +++ b/ace/QoS_Session_Impl.cpp @@ -0,0 +1,510 @@ +// QoS_Session_Impl.cpp +// $Id$ + +#define ACE_BUILD_DLL + +#include "ace/QoS_Session_Impl.h" +#include "ace/SOCK.h" + +#if defined (ACE_LACKS_INLINE_FUNCTIONS) +#include "ace/QoS_Session_Impl.i" +#endif + +ACE_RCSID(ace, QoS_Session_Impl, "$Id$") + +ACE_ALLOC_HOOK_DEFINE(ACE_QoS_Session_Impl) + +#if defined (ACE_HAS_RAPI) +#include "rapi/rapi_err.h" + +int ACE_RAPI_Session::rsvp_error = 0; + +// Call back function used by RAPI to report RSVP events. This function translates +// the RAPI QoS parameters into the more generic ACE_QoS parameters for the +// underlying RAPI session. +int +rsvp_callback (rapi_sid_t sid, + rapi_eventinfo_t eventype, + int style_id, + int errcode, + int errvalue, + sockaddr * errnode, + u_char errflags, + int filter_spec_no, + rapi_filter_t *filter_spec_list, + int flow_spec_no, + rapi_flowspec_t *flow_spec_list, + int ad_spec_no, + rapi_adspec_t *ad_spec_list, + void *args + ) +{ + + ACE_QoS_Session * qos_session = (ACE_QoS_Session *) args; + + // Extended Legacy format. + qos_flowspecx_t *csxp = &flow_spec_list->specbody_qosx; + + ACE_Flow_Spec sending_fs (csxp->xspec_r, + csxp->xspec_b, + csxp->xspec_p, + 0, + 0, + 0, + csxp->xspec_M, + csxp->xspec_m, + 25, + 0); + + switch(eventype) + { + case RAPI_PATH_EVENT: + ACE_DEBUG ((LM_DEBUG, + "RSVP PATH Event received\n")); + + ACE_DEBUG ((LM_DEBUG, + "No. of TSpecs received : %d\n", + flow_spec_no)); + + // Set the sending flowspec QoS of the given session. + qos_session->qos ().sending_flowspec (sending_fs); + + break; + + case RAPI_RESV_EVENT: + ACE_DEBUG ((LM_DEBUG, + "RSVP RESV Event received\n")); + + ACE_DEBUG ((LM_DEBUG, + "No. of FlowSpecs received : %d\n", + flow_spec_no)); + + // Choose based on the service type : [QOS_GUARANTEEDX/QOS_CNTR_LOAD]. + switch (csxp->spec_type) + { + case QOS_GUARANTEEDX: + // Slack term in MICROSECONDS + qos_session->qos ().receiving_flowspec ().delay_variation (csxp->xspec_S); + + // @@How does the guaranteed rate parameter map to the ACE_Flow_Spec. + // Note there is no break !! + + case QOS_CNTR_LOAD: + + // qos_service_type. + qos_session->qos ().receiving_flowspec ().service_type (csxp->spec_type); + // Token Bucket Average Rate (B/s) + qos_session->qos ().receiving_flowspec ().token_rate (csxp->xspec_r); + // Token Bucket Rate (B) + qos_session->qos ().receiving_flowspec ().token_bucket_size (csxp->xspec_b); + // Peak Data Rate (B/s) + qos_session->qos ().receiving_flowspec ().peak_bandwidth (csxp->xspec_p); + // Minimum Policed Unit (B) + qos_session->qos ().receiving_flowspec ().minimum_policed_size (csxp->xspec_m); + // Max Packet Size (B) + qos_session->qos ().receiving_flowspec ().max_sdu_size (csxp->xspec_M); + + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown flowspec type.\n"), + 0); + }; + + break; + + case RAPI_PATH_ERROR: + ACE_DEBUG ((LM_DEBUG, + "PATH ERROR Event received\n" + "Code=%d Val=%d Node= %s\n", + errcode, + errvalue, + ACE_OS::inet_ntoa(((sockaddr_in *)errnode)->sin_addr))); + + break; + + case RAPI_RESV_ERROR: + ACE_DEBUG ((LM_DEBUG, + "RESV ERROR Event received\n" + "Code=%d Val=%d Node= %s\n", + errcode, + errvalue, + ACE_OS::inet_ntoa(((sockaddr_in *)errnode)->sin_addr))); + break; + + case RAPI_RESV_CONFIRM: + ACE_DEBUG ((LM_DEBUG, + "RESV CONFIRM Event received\n")); + break; + + } + +} + +// Constructor. +ACE_RAPI_Session::ACE_RAPI_Session (void) +{ + ACE_TRACE ("ACE_RAPI_Session::ACE_RAPI_Session"); +} + +// Open a RAPI QoS session [dest IP, dest port, Protocol ID]. +int +ACE_RAPI_Session::open (ACE_INET_Addr dest_addr, + ACE_Protocol_ID protocol_id) +{ + this->dest_addr_ = dest_addr; + this->protocol_id_ = protocol_id; + + // Open a RAPI session. Note "this" is being passed as an argument to + // the callback function. The callback function uses this argument to + // update the QoS of this session based on the RSVP event it receives. + if (this->session_id_ = rapi_session((sockaddr *) dest_addr.get_addr (), + protocol_id, + 0, + rsvp_callback, + (void *) this, + &rsvp_error) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "rapi_session () call fails. Error\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "rapi_session () call succeeds\n")); + + return 0; +} + +// Close the RAPI QoS Session. +int +ACE_RAPI_Session::close (void) +{ + if (rsvp_error = rapi_release(this->session_id_)) + ACE_ERROR_RETURN ((LM_ERROR, + "Can't release RSVP session:\n\t%s\n", + rapi_errlist[rsvp_error]), + -1); + return 0; +} + +int +ACE_RAPI_Session::qos (ACE_SOCK *socket, + const ACE_QoS &ace_qos) +{ + ACE_UNUSED_ARG (socket); + + // If sender : call sending_qos () + // If receiver : call receiving_qos () + // If both : call sending_qos () and receiving_qos () + + if (this->flags_ != ACE_QOS_RECEIVER) + return this->sending_qos (ace_qos); + + if (this->flags_ != ACE_QOS_SENDER) + return this->receiving_qos (ace_qos); + + return 0; +} + +// Set sending QoS for this RAPI session. +int +ACE_RAPI_Session::sending_qos (const ACE_QoS &ace_qos) +{ + + ACE_Flow_Spec sending_flowspec = ace_qos.sending_flowspec (); + rapi_tspec_t *t_spec = init_tspec_simplified (sending_flowspec); + + if (t_spec == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in translating from ACE Flow Spec to" + " RAPI TSpec\n"), + -1); + + char buffer[BUFSIZ]; + + // This formats the t_spec in a visually intuitive char * that can + // be printed. + + (void) rapi_fmt_tspec(t_spec, buffer, sizeof(buffer)); + ACE_DEBUG ((LM_DEBUG, + "\nSender TSpec : %s\n", + buffer)); + + // Print out all the fields separately. + ACE_DEBUG ((LM_DEBUG, + "\nTSpec :\n" + "\t Spec Type = %d\n" + "\t Rate = %f\n" + "\t Bucket = %f\n" + "\t Peak = %f\n" + "\t MPU = %d\n" + "\t MDU = %d\n" + "\t\t TTL = %d\n", + t_spec->tspecbody_qosx.spec_type, + t_spec->tspecbody_qosx.xtspec_r, + t_spec->tspecbody_qosx.xtspec_b, + t_spec->tspecbody_qosx.xtspec_p, + t_spec->tspecbody_qosx.xtspec_m, + t_spec->tspecbody_qosx.xtspec_M, + sending_flowspec.ttl ())); + + // @@Hardcoded port. This should be changed later. + ACE_INET_Addr sender_addr (8001); + + // Set the Sender TSpec for this QoS session. + if(rapi_sender(this->session_id_, + 0, + (sockaddr *) sender_addr.get_addr (), + NULL, + t_spec, + NULL, + NULL, + sending_flowspec.ttl ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "rapi_sender error:\n\tPATH Generation can't be started\n"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "rapi_sender () call succeeds ! \n")); + return 0; +} + +// Set receiving QoS for this RAPI session. +int +ACE_RAPI_Session::receiving_qos (const ACE_QoS &ace_qos) +{ + + ACE_Flow_Spec receiving_flowspec = ace_qos.receiving_flowspec (); + rapi_flowspec_t *flow_spec = init_flowspec_simplified (receiving_flowspec); + + if (flow_spec == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in translating from ACE Flow Spec to" + " RAPI FlowSpec\n"), + -1); + + char buffer[BUFSIZ]; + + // This formats the flow_spec in a visually intuitive char * that can + // be printed. + (void)rapi_fmt_flowspec(flow_spec, buffer, sizeof(buffer)); + ACE_DEBUG ((LM_DEBUG, + "\nReceiver FlowSpec : %s\n", + buffer)); + + // Print out all the fields separately. + ACE_DEBUG ((LM_DEBUG, + "\nFlowSpec :\n" + "\t Spec Type = %d\n" + "\t Rate = %f\n" + "\t Bucket = %f\n" + "\t Peak = %f\n" + "\t MPU = %d\n" + "\t MDU = %d\n", + flow_spec->specbody_qosx.spec_type, + flow_spec->specbody_qosx.xspec_r, + flow_spec->specbody_qosx.xspec_b, + flow_spec->specbody_qosx.xspec_p, + flow_spec->specbody_qosx.xspec_m, + flow_spec->specbody_qosx.xspec_M)); + + // @@Hardcoded port. This should be changed later. + ACE_INET_Addr receiver_addr (8002); + + // Set the Receiver FlowSpec for this QoS session. + // @@The filter style is hardcoded to WildCard. This can be changed later. + if (rapi_reserve(this->session_id_, + RAPI_REQ_CONFIRM, + // Setting the RAPI_REQ_CONFIRM flag requests confirmation + // of the resevation, by means of a confirmation upcall of + // type RAPI_RESV_CONFIRM. + (sockaddr *)receiver_addr.get_addr (), + RAPI_RSTYLE_WILDCARD, + // This applies the flowspec to all the senders. Given this, + // @@I am passing the filter_spec to be null, hoping this will work. + NULL, + NULL, + 0, + NULL, + // The filter spec is NULL. This should work since the RSTYLE is + // WILDCARD. + 1, + flow_spec) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "rapi_reserve () error:\n\tRESV Generation can't be started\n"), + -1); + + return 0; +} + +int +ACE_RAPI_Session::update_qos (void) +{ + if ((rsvp_error = rapi_dispatch ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in rapi_dispatch () : %s\n", + rapi_errlist[rsvp_error]), + -1); + return 0; +} + +// Construct a simplified RAPI Sender TSpec object +// from an ACE_Flow_Spec. Note the form of the TSpec is +// simplified as against the full bodied IntServ version. + +rapi_tspec_t * +ACE_RAPI_Session::init_tspec_simplified (const ACE_Flow_Spec &flow_spec) +{ + rapi_tspec_t *t_spec; + + ACE_NEW_RETURN (t_spec, + rapi_tspec_t, + 0); + + qos_tspecx_t *ctxp = &(t_spec->tspecbody_qosx); + + // There may be some type incompatibility here. + // Note the types of the LHS are float32_t, uint32_t etc. + + ctxp->spec_type = QOS_TSPEC; + ctxp->xtspec_r = flow_spec.token_rate (); // Token Rate (B/s) + ctxp->xtspec_b = flow_spec.token_bucket_size (); // Token Bucket Depth (B) + ctxp->xtspec_p = flow_spec.peak_bandwidth (); // Peak Data Rate (B/s) + ctxp->xtspec_m = flow_spec.minimum_policed_size (); // Minimum policed unit. + + // @@Hardcoded for the time being. + ctxp->xtspec_M = 65535; // Maximum SDU size. + + t_spec->len = sizeof(rapi_hdr_t) + sizeof(qos_tspecx_t); + t_spec->form = RAPI_TSPECTYPE_Simplified; + + return (t_spec); +} + + +// Construct a simplified RAPI flowspec object from ACE_Flow_Spec. +// Note the form of the FlowSpec is simplified as against the +// full bodied IntServ version. + +rapi_flowspec_t * +ACE_RAPI_Session::init_flowspec_simplified(const ACE_Flow_Spec &flow_spec) +{ + rapi_flowspec_t *flowsp; + ACE_NEW_RETURN (flowsp, + rapi_flowspec_t, + 0); + + // Extended Legacy format. + qos_flowspecx_t *csxp = &flowsp->specbody_qosx; + + // Choose based on the service type : [QOS_GUARANTEEDX/QOS_CNTR_LOAD]. + switch (flow_spec.service_type ()) + { + case QOS_GUARANTEEDX: + csxp->xspec_R = 0 ; // Guaranteed Rate B/s. @@How does this map to the + // ACE Flow Spec Parameters. + + csxp->xspec_S = flow_spec.delay_variation () ; // Slack term in MICROSECONDS + + // Note there is no break !! + + case QOS_CNTR_LOAD: + csxp->spec_type = flow_spec.service_type (); // qos_service_type + csxp->xspec_r = flow_spec.token_rate (); // Token Bucket Average Rate (B/s) + csxp->xspec_b = flow_spec.token_bucket_size (); // Token Bucket Rate (B) + csxp->xspec_p = flow_spec.peak_bandwidth (); // Peak Data Rate (B/s) + csxp->xspec_m = flow_spec.minimum_policed_size (); // Minimum Policed Unit (B) + + // @@Hardcoded Max. Pkt. size. + csxp->xspec_M = 65535; // Max Packet Size (B) + + flowsp->form = RAPI_FLOWSTYPE_Simplified; + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown flowspec type.\n"), + 0); + } + + flowsp->len = sizeof(rapi_flowspec_t); + return flowsp; +} + +#endif /* ACE_HAS_RAPI */ + +// This is a GQoS session ID generator. +int ACE_GQoS_Session::GQoS_session_id = 0; + +// Constructor. +ACE_GQoS_Session::ACE_GQoS_Session (void) +{ + ACE_TRACE ("ACE_GQoS_Session::ACE_GQoS_Session"); +} + +// Open a GQoS session [dest IP, dest port, Protocol ID]. +int +ACE_GQoS_Session::open (ACE_INET_Addr dest_addr, + ACE_Protocol_ID protocol_id) +{ + this->dest_addr_ = dest_addr; + this->protocol_id_ = protocol_id; + + this->session_id_ = GQoS_session_id++; + + return 0; +} + +// Close the GQoS Session. +int +ACE_GQoS_Session::close (void) +{ + // TBD. + return 0; +} + +// Set the QoS for this GQoS session. +int +ACE_GQoS_Session::qos (ACE_SOCK *socket, + const ACE_QoS &ace_qos) +{ + + // Confirm if the current session is one of the QoS sessions subscribed + // to by the given socket. + + if (socket->qos_session_set ().find (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "This QoS session was not subscribed to" + " by the socket\n"), + -1); + + // Set the QOS according to the supplied ACE_QoS. The I/O control + // code used under the hood is SIO_SET_QOS. + + u_long ret_bytes = 0; + + ACE_QoS qos = ace_qos; + if (ACE_OS::ioctl (socket->get_handle (), + ACE_SIO_SET_QOS, + qos, + &ret_bytes) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in Qos set ACE_OS::ioctl() %d\n", + ret_bytes), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "Setting QoS with ACE_OS::ioctl () succeeds \n")); + + return 0; +} + +int +ACE_GQoS_Session::update_qos (void) +{ + return 0; +} + + + + diff --git a/ace/QoS_Session_Impl.h b/ace/QoS_Session_Impl.h new file mode 100644 index 00000000000..80da7b4e6d4 --- /dev/null +++ b/ace/QoS_Session_Impl.h @@ -0,0 +1,191 @@ +/* -*- C++ -*- */ +// $Id$ + +// =========================================================================== +// +// = LIBRARY +// ace +// +// = FILENAME +// QoS_Session_Impl.h +// +// = AUTHOR +// Vishal Kachroo <vishal@cs.wustl.edu> +// +// =========================================================================== + +#ifndef ACE_QOS_SESSION_IMPL_H +#define ACE_QOS_SESSION_IMPL_H + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/QoS_Session.h" + +#if defined (ACE_HAS_RAPI) +#include "rapi/rapi_lib.h" + +class ACE_Export ACE_RAPI_Session : public ACE_QoS_Session +{ + // = TITLE + // A RAPI QoS session object. + // + // = DESCRIPTION + // This class is a RAPI (RSVP API, an implementation of RSVP on UNIX) + // implementation of the ACE_QoS_Session interface. + +public: + + ~ACE_RAPI_Session (void); + // Default destructor. + + static int rsvp_error; + // Error handling for RSVP callback + + virtual int open (ACE_INET_Addr dest_addr, + ACE_Protocol_ID protocol_id); + // Open a RAPI QoS session [dest IP, dest port, Protocol ID]. + + virtual int close (void); + // Close the RAPI QoS Session. + + virtual ACE_QoS qos (void) const; + // Returns the QoS for this RAPI session. + + virtual int qos (ACE_SOCK *socket, + const ACE_QoS &ace_qos); + // Set QoS for this RAPI session. The socket parameter is used to confirm if + // this QoS session was subscribed to by the socket. + + virtual void qos (const ACE_QoS &ace_qos); + // Sets the QoS for this session object to ace_qos. Does not interfere with the + // QoS in the underlying socket. This call is useful to update the QoS object + // when the underlying socket QoS is being set through a mechanism other than + // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the + // QoS for the socket is set through ACE_OS::join_leaf (). + + virtual int update_qos (void); + // Calls rapi_dispatch () that further triggers the call back function. + // It is a mechanism of updating the QoS for this session asynchronously, as + // RSVP events occur. + + virtual int session_id (void) const; + // Get the RAPI session id. + + virtual void session_id (const int session_id); + // Set the RAPI session id. + + virtual ACE_INET_Addr dest_addr (void) const; + // Get the destination address for this RAPI session. + + virtual void dest_addr (const ACE_INET_Addr &dest_addr); + // Set the destination address for this RAPI session. + + virtual int version (); + // RAPI version. Returned value = 100 * major-version + minor-version. + + friend class ACE_QoS_Session_Factory; + // The factory is a friend so it can create this object through + // the only private constructor. + +private: + + ACE_RAPI_Session (void); + // Default constuctor. Constructor is defined private so that only + // the friend factory can instantiate this class. + + rapi_tspec_t *init_tspec_simplified (const ACE_Flow_Spec &flow_spec); + // Construct a simplified RAPI Sender TSpec object + // from an ACE_Flow_Spec object. Used internally by this class. + + rapi_flowspec_t *init_flowspec_simplified(const ACE_Flow_Spec &flow_spec); + // Construct a simplified RAPI Receiver FlowSpec object + // from an ACE_Flow_Spec object. Used internally by the class. + + int sending_qos (const ACE_QoS &ace_qos); + // Set sending QoS for this RAPI session. + + int receiving_qos (const ACE_QoS &ace_qos); + // Set receiving QoS for this RAPI session. + +}; + +#endif /* ACE_HAS_RAPI */ + +class ACE_Export ACE_GQoS_Session : public ACE_QoS_Session +{ + // = TITLE + // A GQoS session object. + // + // = DESCRIPTION + // This class is a GQoS (Generic QoS, an implementation of RSVP on + // Win2K) implementation of the ACE_QoS_Session interface. + +public: + + ~ACE_GQoS_Session (void); + // Default destructor. + + static int GQoS_session_id; + // This is a session ID generator. It does a lot more than expected + // from an int!. + + virtual int open (ACE_INET_Addr dest_addr, + ACE_Protocol_ID protocol_id); + // Open a GQoS session [dest IP, dest port, Protocol ID]. + + virtual int close (void); + // Close the GQoS Session. + + virtual ACE_QoS qos (void) const; + // Returns the QoS for this GQoS session. + + virtual int qos (ACE_SOCK *socket, + const ACE_QoS &ace_qos); + // Set QoS for this GQoS session. The socket parameter is used to confirm if + // this QoS session was subscribed to by the socket. + + virtual void qos (const ACE_QoS &ace_qos); + // Sets the QoS for this session object to ace_qos. Does not interfere with the + // QoS in the underlying socket. This call is useful to update the QoS object + // when the underlying socket QoS is being set through a mechanism other than + // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the + // QoS for the socket is set through ACE_OS::join_leaf (). + + virtual int update_qos (void); + // Calls the ioctl (ACE_SIO_GET_QOS). It is a mechanism of updating the + // QoS for this session asynchronously, as RSVP events occur. + + virtual ACE_INET_Addr dest_addr (void) const; + // Get the destination address for this GQoS session. + + virtual void dest_addr (const ACE_INET_Addr &dest_addr); + // Set the destination address for this GQoS session. + + virtual int session_id (void) const; + // Get the GQoS session id. + + virtual void session_id (const int session_id); + // Set the GQoS session id. + + virtual int version (); + // GQoS version. + + friend class ACE_QoS_Session_Factory; + // The factory is a friend so it can create this object through + // the only private constructor. + +private: + + ACE_GQoS_Session (void); + // Default constructor. Constructor is defined private so that only + // the friend factory can instantiate this class. + +}; + +#if !defined (ACE_LACKS_INLINE_FUNCTIONS) +#include "ace/QoS_Session_Impl.i" +#endif /* ACE_LACKS_INLINE_FUNCTIONS */ + +#endif /*ACE_QOS_SESSION_IMPL_H */ diff --git a/ace/QoS_Session_Impl.i b/ace/QoS_Session_Impl.i new file mode 100644 index 00000000000..2f696092013 --- /dev/null +++ b/ace/QoS_Session_Impl.i @@ -0,0 +1,120 @@ +/* -*- C++ -*- */ +// $Id$ + +// QoS_Session_Impl.i + +#if defined (ACE_HAS_RAPI) + +ACE_INLINE +ACE_RAPI_Session::~ACE_RAPI_Session (void) +{ + ACE_TRACE ("ACE_RAPI_Session::~ACE_RAPI_Session"); +} + +// Returns the QoS for this RAPI session. +ACE_INLINE ACE_QoS +ACE_RAPI_Session::qos (void) const +{ + return this->qos_; +} + +// Overloaded method to set the QoS for this session object. Does not +// interfere with the underlying socket QoS. +ACE_INLINE void +ACE_RAPI_Session::qos (const ACE_QoS &ace_qos) +{ + this->qos_ = ace_qos; +} + +// Get the RAPI session id. +ACE_INLINE int +ACE_RAPI_Session::session_id (void) const +{ + return this->session_id_; +} + +// Set the RAPI session id. +ACE_INLINE void +ACE_RAPI_Session::session_id (const int session_id) +{ + this->session_id_ = session_id; +} + +// Get the destination address for this RAPI session. +ACE_INLINE ACE_INET_Addr +ACE_RAPI_Session::dest_addr (void) const +{ + return this->dest_addr_; +} + +// Set the destination address for this RAPI session. +ACE_INLINE void +ACE_RAPI_Session::dest_addr (const ACE_INET_Addr &dest_addr) +{ + this->dest_addr_ = dest_addr; +} + +// RAPI version. Returned value = 100 * major-version + minor-version. +ACE_INLINE int +ACE_RAPI_Session::version (void) +{ + return 0; +} + +#endif /* ACE_HAS_RAPI */ + +ACE_INLINE +ACE_GQoS_Session::~ACE_GQoS_Session (void) +{ + ACE_TRACE ("ACE_GQoS_Session::~ACE_GQoS_Session"); +} + +// Returns the QoS for this GQoS session. +ACE_INLINE ACE_QoS +ACE_GQoS_Session::qos (void) const +{ + return this->qos_; +} + +// Overloaded method to set the QoS for this session object. Does not +// interfere with the underlying socket QoS. +ACE_INLINE void +ACE_GQoS_Session::qos (const ACE_QoS &ace_qos) +{ + this->qos_ = ace_qos; +} + +// Get the GQoS session id. +ACE_INLINE int +ACE_GQoS_Session::session_id (void) const +{ + return this->session_id_; +} + +// Set the GQoS session id. +ACE_INLINE void +ACE_GQoS_Session::session_id (const int session_id) +{ + this->session_id_ = session_id; +} + +// Get the destination address for this GQoS session. +ACE_INLINE ACE_INET_Addr +ACE_GQoS_Session::dest_addr (void) const +{ + return this->dest_addr_; +} + +// Set the destination address for this GQoS session. +ACE_INLINE void +ACE_GQoS_Session::dest_addr (const ACE_INET_Addr &dest_addr) +{ + this->dest_addr_ = dest_addr; +} + +// GQoS version. +ACE_INLINE int +ACE_GQoS_Session::version (void) +{ + return 0; +} diff --git a/ace/SOCK.cpp b/ace/SOCK.cpp index 10e4bde796b..cf38eda6887 100644 --- a/ace/SOCK.cpp +++ b/ace/SOCK.cpp @@ -106,6 +106,28 @@ ACE_SOCK::open (int type, return 0; } +// Adds the given session to the list of session objects +// joined by this socket. + +int +ACE_SOCK::join_qos_session (ACE_QoS_Session *qos_session) +{ + + if (this->qos_session_set ().insert (qos_session) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error in adding a new session to the " + "socket session set\n"), + -1); + return 0; +} + +// Returns the QoS session set for this socket. +ACE_Unbounded_Set <ACE_QoS_Session *> +ACE_SOCK::qos_session_set (void) +{ + return this->qos_session_set_; +} + // General purpose constructor for performing server ACE_SOCK // creation. @@ -157,7 +179,7 @@ ACE_SOCK::open (int type, else return 0; } - + ACE_SOCK::ACE_SOCK (int type, int protocol_family, int protocol, @@ -179,3 +201,8 @@ ACE_SOCK::ACE_SOCK (int type, ASYS_TEXT ("ACE_SOCK::ACE_SOCK"))); } +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Unbounded_Set<ACE_QoS_Session *>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Unbounded_Set<ACE_QoS_Session *> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/SOCK.h b/ace/SOCK.h index 576806bc256..9ce912d095c 100644 --- a/ace/SOCK.h +++ b/ace/SOCK.h @@ -25,6 +25,7 @@ #include "ace/Addr.h" #include "ace/IPC_SAP.h" +#include "ace/QoS_Session.h" class ACE_Export ACE_SOCK : public ACE_IPC_SAP { @@ -87,6 +88,16 @@ public: int reuse_addr); // Wrapper around the QoS-enabled <WSASocket> function. + int join_qos_session (ACE_QoS_Session *qos_session); + // Join the given QoS session. A socket can join multiple QoS sessions. + // This call adds the given QoS session to the list of QoS sessions + // that the socket has already joined. + + typedef ACE_Unbounded_Set <ACE_QoS_Session *> ACE_QOS_SESSION_SET; + + ACE_QOS_SESSION_SET qos_session_set (void); + // Get the QoS session set. + protected: ACE_SOCK (int type, int protocol_family, @@ -100,7 +111,7 @@ protected: int protocol, ACE_Protocol_Info *protocolinfo, ACE_SOCK_GROUP g, - u_long flags, + u_long flags, int reuse_addr); // Constructor with arguments to call the QoS-enabled <WSASocket> // function. @@ -108,6 +119,10 @@ protected: ACE_SOCK (void); // Default constructor is private to prevent instances of this class // from being defined. + + ACE_QOS_SESSION_SET qos_session_set_; + // Set of QoS sessions that this socket has joined. + }; #if !defined (ACE_LACKS_INLINE_FUNCTIONS) @@ -115,3 +130,7 @@ protected: #endif /* ACE_LACKS_INLINE_FUNCTIONS */ #endif /* ACE_SOCK_H */ + + + + diff --git a/ace/SOCK_Dgram_Mcast.cpp b/ace/SOCK_Dgram_Mcast.cpp index de0351e1eeb..91be405541f 100644 --- a/ace/SOCK_Dgram_Mcast.cpp +++ b/ace/SOCK_Dgram_Mcast.cpp @@ -379,7 +379,8 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr, int protocol, ACE_Protocol_Info *protocolinfo, ACE_SOCK_GROUP g, - u_long flags) + u_long flags, + ACE_QoS_Session *qos_session) { ACE_TRACE ("ACE_SOCK_Dgram_Mcast::subscribe"); @@ -400,15 +401,30 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr, protocol_family, protocol, reuse_addr, - protocolinfo); + protocolinfo); // Check for the "short-circuit" return value of 1 (for NT). if (result != 0) return result; - + // Tell network device driver to read datagrams with a // <mcast_request_if_> IP interface. else { + // Check if the mcast_addr passed into this method is the + // same as the QoS session address. + if (mcast_addr == qos_session->dest_addr ()) + + // Subscribe to the QoS session. + if (this->join_qos_session (qos_session) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to join QoS Session\n"), + -1); + else + ACE_ERROR_RETURN ((LM_ERROR, + "Dest Addr in the QoS Session does" + " not match the address passed into" + " subscribe\n"), + -1); sockaddr_in mult_addr; @@ -423,7 +439,9 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr, qos_params) == ACE_INVALID_HANDLE) return -1; else - return 0; + qos_session->qos (*(qos_params.socket_qos ())); + + return 0; } } diff --git a/ace/SOCK_Dgram_Mcast.h b/ace/SOCK_Dgram_Mcast.h index 468cbb0493f..632e015ef0d 100644 --- a/ace/SOCK_Dgram_Mcast.h +++ b/ace/SOCK_Dgram_Mcast.h @@ -74,13 +74,17 @@ public: int protocol = 0, ACE_Protocol_Info *protocolinfo = 0, ACE_SOCK_GROUP g = 0, - u_long flags = 0); + u_long flags = 0, + ACE_QoS_Session *qos_session = 0); // This is a QoS-enabled method for joining a multicast group, which // passes <qos_params> via <ACE_OS::join_leaf>. The network // interface device driver is instructed to accept datagrams with // <mcast_addr> multicast addresses. If the socket has already been // opened, <subscribe> closes the socket and opens a new socket - // bound to the <mcast_addr>. + // bound to the <mcast_addr>. The session object specifies the QoS + // session that the socket wants to subscribe to. A socket may + // subscribe to multiple QoS sessions by calling this method multiple + // times with different session objects. // // The <net_if> interface is hardware specific, e.g., use "netstat // -i" to find whether your interface is, such as "le0" or something |