summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ace/Makefile4
-rw-r--r--ace/QoS_Session.h128
-rw-r--r--ace/QoS_Session_Factory.cpp96
-rw-r--r--ace/QoS_Session_Factory.h88
-rw-r--r--ace/QoS_Session_Impl.cpp510
-rw-r--r--ace/QoS_Session_Impl.h191
-rw-r--r--ace/QoS_Session_Impl.i120
-rw-r--r--ace/SOCK.cpp29
-rw-r--r--ace/SOCK.h21
-rw-r--r--ace/SOCK_Dgram_Mcast.cpp26
-rw-r--r--ace/SOCK_Dgram_Mcast.h8
11 files changed, 1212 insertions, 9 deletions
diff --git a/ace/Makefile b/ace/Makefile
index 5b2dafb47ec..1f7054104ce 100644
--- a/ace/Makefile
+++ b/ace/Makefile
@@ -182,7 +182,9 @@ OTHER_FILES = \
Name_Space \
Naming_Context \
Registry_Name_Space \
- Remote_Name_Space
+ Remote_Name_Space \
+ QoS_Session_Impl \
+ QoS_Session_Factory
TEMPLATE_FILES = \
Acceptor \
diff --git a/ace/QoS_Session.h b/ace/QoS_Session.h
new file mode 100644
index 00000000000..53545809117
--- /dev/null
+++ b/ace/QoS_Session.h
@@ -0,0 +1,128 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ===========================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// QoS_Session.h
+//
+// = AUTHOR
+// Vishal Kachroo <vishal@cs.wustl.edu>
+//
+// ===========================================================================
+
+#ifndef ACE_QOS_SESSION_H
+#define ACE_QOS_SESSION_H
+
+#include "ace/INET_Addr.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_SOCK;
+#include "ace/Containers_T.h"
+
+typedef int ACE_Protocol_ID;
+// IPPROTO_UDP or IPPROTO_TCP.
+
+class ACE_Export ACE_QoS_Session
+{
+ // = TITLE
+ // A QoS Session object.
+ //
+ // = DESCRIPTION
+ // This class defines the interface for a QoS Session. It abstracts the
+ // notion of QoS on different platforms and presents a simple, easy-to-use
+ // API. Current [RAPI,GQoS] and future implementations will conform to this
+ // interface.
+
+public:
+
+ enum ACE_End_Point_Type
+ {
+ ACE_QOS_SENDER,
+ ACE_QOS_RECEIVER,
+ ACE_QOS_BOTH
+ };
+ // A flag to indicate if this endpoint is a sender or a receiver or both.
+
+ virtual ~ACE_QoS_Session (void);
+ // to shutup g++.
+
+ virtual int open (ACE_INET_Addr dest_addr,
+ ACE_Protocol_ID protocol_id) = 0;
+ // Open a QoS session [dest IP, dest port, Protocol ID].
+
+ virtual int close (void) = 0;
+ // Close the QoS Session.
+
+ virtual ACE_QoS qos (void) const = 0;
+ // Returns the QoS in the current session.
+
+ virtual int qos (ACE_SOCK *socket,
+ const ACE_QoS &ace_qos) = 0;
+ // Set QoS for the current session. The socket parameter is used to confirm if
+ // this QoS session was subscribed to by the socket.
+
+ virtual void qos (const ACE_QoS &ace_qos) = 0;
+ // Sets the QoS for this session object to ace_qos. Does not interfere with the
+ // QoS in the underlying socket. This call is useful to update the QoS object
+ // when the underlying socket QoS is being set through a mechanism other than
+ // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the
+ // QoS for the socket is set through ACE_OS::join_leaf ().
+
+ virtual int update_qos (void) = 0;
+ // This is called from handle_qos () method of the the QoS Event Handler.
+ // Invoking this method is an indication of a QoS event occurring, that may have
+ // resulted in a change of QoS for the underlying session. This method updates
+ // the QoS object associated with this session.
+
+ virtual int session_id (void) const = 0;
+ // Get the session id.
+
+ virtual void session_id (const int session_id) = 0;
+ // Set the session id.
+
+ virtual ACE_INET_Addr dest_addr (void) const = 0;
+ // Get the destination address for this session.
+
+ virtual void dest_addr (const ACE_INET_Addr &dest_addr) = 0;
+ // Set the destination address for this session.
+
+ virtual int version (void) = 0;
+ // Returns the version of the underlying RSVP implementation. Is
+ // meaningful only when the underlying implementation has versioning.
+
+protected:
+
+ int session_id_;
+ // session id for the session.
+
+ ACE_INET_Addr dest_addr_;
+ // Destination address for this session.
+
+ ACE_Protocol_ID protocol_id_;
+ // Is this a TCP or a UDP session.
+
+ ACE_QoS qos_;
+ // QoS for this session.
+
+ ACE_End_Point_Type flags_;
+ // Specifies if this is a sending/receiving/both session.
+
+};
+
+#endif /* ACE_QOS_SESSION_H */
+
+
+
+
+
+
+
+
+
diff --git a/ace/QoS_Session_Factory.cpp b/ace/QoS_Session_Factory.cpp
new file mode 100644
index 00000000000..1e0e892bbd0
--- /dev/null
+++ b/ace/QoS_Session_Factory.cpp
@@ -0,0 +1,96 @@
+// QoS_Session_Factory.cpp
+// $Id$
+
+#define ACE_BUILD_DLL
+
+#include "ace/QoS_Session_Factory.h"
+#include "ace/QoS_Session_Impl.h"
+
+ACE_RCSID(ace, QoS_Session_Factory, "$Id$")
+
+ACE_ALLOC_HOOK_DEFINE(ACE_QoS_Session_Factory)
+
+ACE_QoS_Session_Factory::ACE_QoS_Session_Factory (void)
+{
+ ACE_TRACE ("ACE_QoS_Session_Factory::ACE_QoS_Session_Factory");
+}
+
+ACE_QoS_Session_Factory::~ACE_QoS_Session_Factory (void)
+{
+ ACE_TRACE ("ACE_QoS_Session_Factory::~ACE_QoS_Session_Factory");
+}
+
+// Create a QoS session of the given type (RAPI or GQoS).
+ACE_QoS_Session *
+ACE_QoS_Session_Factory::create_session (ACE_QoS_Session_Type qos_session_type)
+{
+
+ ACE_QoS_Session * qos_session = 0;
+
+#if defined (ACE_HAS_RAPI)
+ if (qos_session_type == ACE_RAPI_SESSION)
+ ACE_NEW_RETURN (qos_session,
+ ACE_RAPI_Session,
+ 0);
+#endif /* ACE_HAS_RAPI */
+
+ if (qos_session_type == ACE_GQOS_SESSION)
+ ACE_NEW_RETURN (qos_session,
+ ACE_GQoS_Session,
+ 0);
+
+ if (this->add_session (qos_session) == -1)
+ {
+ delete qos_session;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in adding session\n"),
+ 0);
+ }
+
+ return qos_session;
+}
+
+// Destroy the QoS Session.
+int
+ACE_QoS_Session_Factory::destroy_session (ACE_QoS_Session *qos_session)
+{
+
+ if ((qos_session != 0) && (this->remove_session (qos_session) == -1))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in destroying session\n"),
+ -1);
+
+ return 0;
+}
+
+// Add a session to the set of sessions created by this factory. This is a
+// private method called by the create_session ().
+int
+ACE_QoS_Session_Factory::add_session (ACE_QoS_Session *qos_session)
+{
+ if (this->qos_session_set_.insert (qos_session) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in adding a new session to the session set\n"),
+ -1);
+
+ return 0;
+}
+
+// Remove a session from the set of sessions created by this factory. This is a
+// private method called by the destroy_session ().
+int
+ACE_QoS_Session_Factory::remove_session (ACE_QoS_Session *qos_session)
+{
+ if (this->qos_session_set_.remove (qos_session) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in removing a session from the session set\n"),
+ -1);
+
+ return 0;
+}
+
+
+
+
+
+
diff --git a/ace/QoS_Session_Factory.h b/ace/QoS_Session_Factory.h
new file mode 100644
index 00000000000..6d4f2d20044
--- /dev/null
+++ b/ace/QoS_Session_Factory.h
@@ -0,0 +1,88 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ===========================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// QoS_Session_Factory.h
+//
+// = AUTHOR
+// Vishal Kachroo <vishal@cs.wustl.edu>
+//
+// ===========================================================================
+
+#ifndef ACE_QOS_SESSION_FACTORY_H
+#define ACE_QOS_SESSION_FACTORY_H
+
+#include "ace/QoS_Session.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Containers_T.h"
+
+class ACE_QoS_Session;
+// Forward declare this, so the factory uses only references to this.
+
+class ACE_Export ACE_QoS_Session_Factory
+{
+ // = TITLE
+ // Concrete factory for the QoS Session objects.
+ //
+ // = DESCRIPTION
+ // This class manages the life cycle of QoS Session objects. These
+ // objects are currently either RAPI session objects or GQoS session
+ // objects. It stores the sessions in an unbounded set.
+
+public :
+
+ // = Initialization and termination methods.
+ ACE_QoS_Session_Factory (void);
+ // Default constructor.
+
+ ~ACE_QoS_Session_Factory (void);
+ // Default destructor.
+
+ enum ACE_QoS_Session_Type
+ {
+ ACE_RAPI_SESSION,
+ ACE_GQOS_SESSION
+ };
+ // Types of sessions for this factory to manage.
+
+ ACE_QoS_Session * create_session (ACE_QoS_Session_Type qos_session_type);
+ // Create a QoS session of the given type (RAPI or GQoS).
+
+ int destroy_session (ACE_QoS_Session *qos_session);
+ // Destroy the QoS Session.
+
+private:
+
+ int add_session (ACE_QoS_Session *qos_session);
+ // Used by the create_session () to add new sessions to the
+ // set of sessions created by this factory.
+
+ int remove_session (ACE_QoS_Session *qos_session);
+ // Used by the destroy_session () to remove a session from the set
+ // of sessions created by this factory.
+
+ typedef ACE_Unbounded_Set <ACE_QoS_Session *> QOS_SESSION_SET;
+ QOS_SESSION_SET qos_session_set_;
+ // Unordered set of QoS Sessions.
+
+};
+
+#endif /* ACE_QOS_SESSION_FACTORY_H */
+
+
+
+
+
+
+
+
+
diff --git a/ace/QoS_Session_Impl.cpp b/ace/QoS_Session_Impl.cpp
new file mode 100644
index 00000000000..52286687b60
--- /dev/null
+++ b/ace/QoS_Session_Impl.cpp
@@ -0,0 +1,510 @@
+// QoS_Session_Impl.cpp
+// $Id$
+
+#define ACE_BUILD_DLL
+
+#include "ace/QoS_Session_Impl.h"
+#include "ace/SOCK.h"
+
+#if defined (ACE_LACKS_INLINE_FUNCTIONS)
+#include "ace/QoS_Session_Impl.i"
+#endif
+
+ACE_RCSID(ace, QoS_Session_Impl, "$Id$")
+
+ACE_ALLOC_HOOK_DEFINE(ACE_QoS_Session_Impl)
+
+#if defined (ACE_HAS_RAPI)
+#include "rapi/rapi_err.h"
+
+int ACE_RAPI_Session::rsvp_error = 0;
+
+// Call back function used by RAPI to report RSVP events. This function translates
+// the RAPI QoS parameters into the more generic ACE_QoS parameters for the
+// underlying RAPI session.
+int
+rsvp_callback (rapi_sid_t sid,
+ rapi_eventinfo_t eventype,
+ int style_id,
+ int errcode,
+ int errvalue,
+ sockaddr * errnode,
+ u_char errflags,
+ int filter_spec_no,
+ rapi_filter_t *filter_spec_list,
+ int flow_spec_no,
+ rapi_flowspec_t *flow_spec_list,
+ int ad_spec_no,
+ rapi_adspec_t *ad_spec_list,
+ void *args
+ )
+{
+
+ ACE_QoS_Session * qos_session = (ACE_QoS_Session *) args;
+
+ // Extended Legacy format.
+ qos_flowspecx_t *csxp = &flow_spec_list->specbody_qosx;
+
+ ACE_Flow_Spec sending_fs (csxp->xspec_r,
+ csxp->xspec_b,
+ csxp->xspec_p,
+ 0,
+ 0,
+ 0,
+ csxp->xspec_M,
+ csxp->xspec_m,
+ 25,
+ 0);
+
+ switch(eventype)
+ {
+ case RAPI_PATH_EVENT:
+ ACE_DEBUG ((LM_DEBUG,
+ "RSVP PATH Event received\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "No. of TSpecs received : %d\n",
+ flow_spec_no));
+
+ // Set the sending flowspec QoS of the given session.
+ qos_session->qos ().sending_flowspec (sending_fs);
+
+ break;
+
+ case RAPI_RESV_EVENT:
+ ACE_DEBUG ((LM_DEBUG,
+ "RSVP RESV Event received\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "No. of FlowSpecs received : %d\n",
+ flow_spec_no));
+
+ // Choose based on the service type : [QOS_GUARANTEEDX/QOS_CNTR_LOAD].
+ switch (csxp->spec_type)
+ {
+ case QOS_GUARANTEEDX:
+ // Slack term in MICROSECONDS
+ qos_session->qos ().receiving_flowspec ().delay_variation (csxp->xspec_S);
+
+ // @@How does the guaranteed rate parameter map to the ACE_Flow_Spec.
+ // Note there is no break !!
+
+ case QOS_CNTR_LOAD:
+
+ // qos_service_type.
+ qos_session->qos ().receiving_flowspec ().service_type (csxp->spec_type);
+ // Token Bucket Average Rate (B/s)
+ qos_session->qos ().receiving_flowspec ().token_rate (csxp->xspec_r);
+ // Token Bucket Rate (B)
+ qos_session->qos ().receiving_flowspec ().token_bucket_size (csxp->xspec_b);
+ // Peak Data Rate (B/s)
+ qos_session->qos ().receiving_flowspec ().peak_bandwidth (csxp->xspec_p);
+ // Minimum Policed Unit (B)
+ qos_session->qos ().receiving_flowspec ().minimum_policed_size (csxp->xspec_m);
+ // Max Packet Size (B)
+ qos_session->qos ().receiving_flowspec ().max_sdu_size (csxp->xspec_M);
+
+ break;
+
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unknown flowspec type.\n"),
+ 0);
+ };
+
+ break;
+
+ case RAPI_PATH_ERROR:
+ ACE_DEBUG ((LM_DEBUG,
+ "PATH ERROR Event received\n"
+ "Code=%d Val=%d Node= %s\n",
+ errcode,
+ errvalue,
+ ACE_OS::inet_ntoa(((sockaddr_in *)errnode)->sin_addr)));
+
+ break;
+
+ case RAPI_RESV_ERROR:
+ ACE_DEBUG ((LM_DEBUG,
+ "RESV ERROR Event received\n"
+ "Code=%d Val=%d Node= %s\n",
+ errcode,
+ errvalue,
+ ACE_OS::inet_ntoa(((sockaddr_in *)errnode)->sin_addr)));
+ break;
+
+ case RAPI_RESV_CONFIRM:
+ ACE_DEBUG ((LM_DEBUG,
+ "RESV CONFIRM Event received\n"));
+ break;
+
+ }
+
+}
+
+// Constructor.
+ACE_RAPI_Session::ACE_RAPI_Session (void)
+{
+ ACE_TRACE ("ACE_RAPI_Session::ACE_RAPI_Session");
+}
+
+// Open a RAPI QoS session [dest IP, dest port, Protocol ID].
+int
+ACE_RAPI_Session::open (ACE_INET_Addr dest_addr,
+ ACE_Protocol_ID protocol_id)
+{
+ this->dest_addr_ = dest_addr;
+ this->protocol_id_ = protocol_id;
+
+ // Open a RAPI session. Note "this" is being passed as an argument to
+ // the callback function. The callback function uses this argument to
+ // update the QoS of this session based on the RSVP event it receives.
+ if (this->session_id_ = rapi_session((sockaddr *) dest_addr.get_addr (),
+ protocol_id,
+ 0,
+ rsvp_callback,
+ (void *) this,
+ &rsvp_error) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "rapi_session () call fails. Error\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "rapi_session () call succeeds\n"));
+
+ return 0;
+}
+
+// Close the RAPI QoS Session.
+int
+ACE_RAPI_Session::close (void)
+{
+ if (rsvp_error = rapi_release(this->session_id_))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Can't release RSVP session:\n\t%s\n",
+ rapi_errlist[rsvp_error]),
+ -1);
+ return 0;
+}
+
+int
+ACE_RAPI_Session::qos (ACE_SOCK *socket,
+ const ACE_QoS &ace_qos)
+{
+ ACE_UNUSED_ARG (socket);
+
+ // If sender : call sending_qos ()
+ // If receiver : call receiving_qos ()
+ // If both : call sending_qos () and receiving_qos ()
+
+ if (this->flags_ != ACE_QOS_RECEIVER)
+ return this->sending_qos (ace_qos);
+
+ if (this->flags_ != ACE_QOS_SENDER)
+ return this->receiving_qos (ace_qos);
+
+ return 0;
+}
+
+// Set sending QoS for this RAPI session.
+int
+ACE_RAPI_Session::sending_qos (const ACE_QoS &ace_qos)
+{
+
+ ACE_Flow_Spec sending_flowspec = ace_qos.sending_flowspec ();
+ rapi_tspec_t *t_spec = init_tspec_simplified (sending_flowspec);
+
+ if (t_spec == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in translating from ACE Flow Spec to"
+ " RAPI TSpec\n"),
+ -1);
+
+ char buffer[BUFSIZ];
+
+ // This formats the t_spec in a visually intuitive char * that can
+ // be printed.
+
+ (void) rapi_fmt_tspec(t_spec, buffer, sizeof(buffer));
+ ACE_DEBUG ((LM_DEBUG,
+ "\nSender TSpec : %s\n",
+ buffer));
+
+ // Print out all the fields separately.
+ ACE_DEBUG ((LM_DEBUG,
+ "\nTSpec :\n"
+ "\t Spec Type = %d\n"
+ "\t Rate = %f\n"
+ "\t Bucket = %f\n"
+ "\t Peak = %f\n"
+ "\t MPU = %d\n"
+ "\t MDU = %d\n"
+ "\t\t TTL = %d\n",
+ t_spec->tspecbody_qosx.spec_type,
+ t_spec->tspecbody_qosx.xtspec_r,
+ t_spec->tspecbody_qosx.xtspec_b,
+ t_spec->tspecbody_qosx.xtspec_p,
+ t_spec->tspecbody_qosx.xtspec_m,
+ t_spec->tspecbody_qosx.xtspec_M,
+ sending_flowspec.ttl ()));
+
+ // @@Hardcoded port. This should be changed later.
+ ACE_INET_Addr sender_addr (8001);
+
+ // Set the Sender TSpec for this QoS session.
+ if(rapi_sender(this->session_id_,
+ 0,
+ (sockaddr *) sender_addr.get_addr (),
+ NULL,
+ t_spec,
+ NULL,
+ NULL,
+ sending_flowspec.ttl ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "rapi_sender error:\n\tPATH Generation can't be started\n"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "rapi_sender () call succeeds ! \n"));
+ return 0;
+}
+
+// Set receiving QoS for this RAPI session.
+int
+ACE_RAPI_Session::receiving_qos (const ACE_QoS &ace_qos)
+{
+
+ ACE_Flow_Spec receiving_flowspec = ace_qos.receiving_flowspec ();
+ rapi_flowspec_t *flow_spec = init_flowspec_simplified (receiving_flowspec);
+
+ if (flow_spec == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in translating from ACE Flow Spec to"
+ " RAPI FlowSpec\n"),
+ -1);
+
+ char buffer[BUFSIZ];
+
+ // This formats the flow_spec in a visually intuitive char * that can
+ // be printed.
+ (void)rapi_fmt_flowspec(flow_spec, buffer, sizeof(buffer));
+ ACE_DEBUG ((LM_DEBUG,
+ "\nReceiver FlowSpec : %s\n",
+ buffer));
+
+ // Print out all the fields separately.
+ ACE_DEBUG ((LM_DEBUG,
+ "\nFlowSpec :\n"
+ "\t Spec Type = %d\n"
+ "\t Rate = %f\n"
+ "\t Bucket = %f\n"
+ "\t Peak = %f\n"
+ "\t MPU = %d\n"
+ "\t MDU = %d\n",
+ flow_spec->specbody_qosx.spec_type,
+ flow_spec->specbody_qosx.xspec_r,
+ flow_spec->specbody_qosx.xspec_b,
+ flow_spec->specbody_qosx.xspec_p,
+ flow_spec->specbody_qosx.xspec_m,
+ flow_spec->specbody_qosx.xspec_M));
+
+ // @@Hardcoded port. This should be changed later.
+ ACE_INET_Addr receiver_addr (8002);
+
+ // Set the Receiver FlowSpec for this QoS session.
+ // @@The filter style is hardcoded to WildCard. This can be changed later.
+ if (rapi_reserve(this->session_id_,
+ RAPI_REQ_CONFIRM,
+ // Setting the RAPI_REQ_CONFIRM flag requests confirmation
+ // of the resevation, by means of a confirmation upcall of
+ // type RAPI_RESV_CONFIRM.
+ (sockaddr *)receiver_addr.get_addr (),
+ RAPI_RSTYLE_WILDCARD,
+ // This applies the flowspec to all the senders. Given this,
+ // @@I am passing the filter_spec to be null, hoping this will work.
+ NULL,
+ NULL,
+ 0,
+ NULL,
+ // The filter spec is NULL. This should work since the RSTYLE is
+ // WILDCARD.
+ 1,
+ flow_spec) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "rapi_reserve () error:\n\tRESV Generation can't be started\n"),
+ -1);
+
+ return 0;
+}
+
+int
+ACE_RAPI_Session::update_qos (void)
+{
+ if ((rsvp_error = rapi_dispatch ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in rapi_dispatch () : %s\n",
+ rapi_errlist[rsvp_error]),
+ -1);
+ return 0;
+}
+
+// Construct a simplified RAPI Sender TSpec object
+// from an ACE_Flow_Spec. Note the form of the TSpec is
+// simplified as against the full bodied IntServ version.
+
+rapi_tspec_t *
+ACE_RAPI_Session::init_tspec_simplified (const ACE_Flow_Spec &flow_spec)
+{
+ rapi_tspec_t *t_spec;
+
+ ACE_NEW_RETURN (t_spec,
+ rapi_tspec_t,
+ 0);
+
+ qos_tspecx_t *ctxp = &(t_spec->tspecbody_qosx);
+
+ // There may be some type incompatibility here.
+ // Note the types of the LHS are float32_t, uint32_t etc.
+
+ ctxp->spec_type = QOS_TSPEC;
+ ctxp->xtspec_r = flow_spec.token_rate (); // Token Rate (B/s)
+ ctxp->xtspec_b = flow_spec.token_bucket_size (); // Token Bucket Depth (B)
+ ctxp->xtspec_p = flow_spec.peak_bandwidth (); // Peak Data Rate (B/s)
+ ctxp->xtspec_m = flow_spec.minimum_policed_size (); // Minimum policed unit.
+
+ // @@Hardcoded for the time being.
+ ctxp->xtspec_M = 65535; // Maximum SDU size.
+
+ t_spec->len = sizeof(rapi_hdr_t) + sizeof(qos_tspecx_t);
+ t_spec->form = RAPI_TSPECTYPE_Simplified;
+
+ return (t_spec);
+}
+
+
+// Construct a simplified RAPI flowspec object from ACE_Flow_Spec.
+// Note the form of the FlowSpec is simplified as against the
+// full bodied IntServ version.
+
+rapi_flowspec_t *
+ACE_RAPI_Session::init_flowspec_simplified(const ACE_Flow_Spec &flow_spec)
+{
+ rapi_flowspec_t *flowsp;
+ ACE_NEW_RETURN (flowsp,
+ rapi_flowspec_t,
+ 0);
+
+ // Extended Legacy format.
+ qos_flowspecx_t *csxp = &flowsp->specbody_qosx;
+
+ // Choose based on the service type : [QOS_GUARANTEEDX/QOS_CNTR_LOAD].
+ switch (flow_spec.service_type ())
+ {
+ case QOS_GUARANTEEDX:
+ csxp->xspec_R = 0 ; // Guaranteed Rate B/s. @@How does this map to the
+ // ACE Flow Spec Parameters.
+
+ csxp->xspec_S = flow_spec.delay_variation () ; // Slack term in MICROSECONDS
+
+ // Note there is no break !!
+
+ case QOS_CNTR_LOAD:
+ csxp->spec_type = flow_spec.service_type (); // qos_service_type
+ csxp->xspec_r = flow_spec.token_rate (); // Token Bucket Average Rate (B/s)
+ csxp->xspec_b = flow_spec.token_bucket_size (); // Token Bucket Rate (B)
+ csxp->xspec_p = flow_spec.peak_bandwidth (); // Peak Data Rate (B/s)
+ csxp->xspec_m = flow_spec.minimum_policed_size (); // Minimum Policed Unit (B)
+
+ // @@Hardcoded Max. Pkt. size.
+ csxp->xspec_M = 65535; // Max Packet Size (B)
+
+ flowsp->form = RAPI_FLOWSTYPE_Simplified;
+ break;
+
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unknown flowspec type.\n"),
+ 0);
+ }
+
+ flowsp->len = sizeof(rapi_flowspec_t);
+ return flowsp;
+}
+
+#endif /* ACE_HAS_RAPI */
+
+// This is a GQoS session ID generator.
+int ACE_GQoS_Session::GQoS_session_id = 0;
+
+// Constructor.
+ACE_GQoS_Session::ACE_GQoS_Session (void)
+{
+ ACE_TRACE ("ACE_GQoS_Session::ACE_GQoS_Session");
+}
+
+// Open a GQoS session [dest IP, dest port, Protocol ID].
+int
+ACE_GQoS_Session::open (ACE_INET_Addr dest_addr,
+ ACE_Protocol_ID protocol_id)
+{
+ this->dest_addr_ = dest_addr;
+ this->protocol_id_ = protocol_id;
+
+ this->session_id_ = GQoS_session_id++;
+
+ return 0;
+}
+
+// Close the GQoS Session.
+int
+ACE_GQoS_Session::close (void)
+{
+ // TBD.
+ return 0;
+}
+
+// Set the QoS for this GQoS session.
+int
+ACE_GQoS_Session::qos (ACE_SOCK *socket,
+ const ACE_QoS &ace_qos)
+{
+
+ // Confirm if the current session is one of the QoS sessions subscribed
+ // to by the given socket.
+
+ if (socket->qos_session_set ().find (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "This QoS session was not subscribed to"
+ " by the socket\n"),
+ -1);
+
+ // Set the QOS according to the supplied ACE_QoS. The I/O control
+ // code used under the hood is SIO_SET_QOS.
+
+ u_long ret_bytes = 0;
+
+ ACE_QoS qos = ace_qos;
+ if (ACE_OS::ioctl (socket->get_handle (),
+ ACE_SIO_SET_QOS,
+ qos,
+ &ret_bytes) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in Qos set ACE_OS::ioctl() %d\n",
+ ret_bytes),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Setting QoS with ACE_OS::ioctl () succeeds \n"));
+
+ return 0;
+}
+
+int
+ACE_GQoS_Session::update_qos (void)
+{
+ return 0;
+}
+
+
+
+
diff --git a/ace/QoS_Session_Impl.h b/ace/QoS_Session_Impl.h
new file mode 100644
index 00000000000..80da7b4e6d4
--- /dev/null
+++ b/ace/QoS_Session_Impl.h
@@ -0,0 +1,191 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ===========================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// QoS_Session_Impl.h
+//
+// = AUTHOR
+// Vishal Kachroo <vishal@cs.wustl.edu>
+//
+// ===========================================================================
+
+#ifndef ACE_QOS_SESSION_IMPL_H
+#define ACE_QOS_SESSION_IMPL_H
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/QoS_Session.h"
+
+#if defined (ACE_HAS_RAPI)
+#include "rapi/rapi_lib.h"
+
+class ACE_Export ACE_RAPI_Session : public ACE_QoS_Session
+{
+ // = TITLE
+ // A RAPI QoS session object.
+ //
+ // = DESCRIPTION
+ // This class is a RAPI (RSVP API, an implementation of RSVP on UNIX)
+ // implementation of the ACE_QoS_Session interface.
+
+public:
+
+ ~ACE_RAPI_Session (void);
+ // Default destructor.
+
+ static int rsvp_error;
+ // Error handling for RSVP callback
+
+ virtual int open (ACE_INET_Addr dest_addr,
+ ACE_Protocol_ID protocol_id);
+ // Open a RAPI QoS session [dest IP, dest port, Protocol ID].
+
+ virtual int close (void);
+ // Close the RAPI QoS Session.
+
+ virtual ACE_QoS qos (void) const;
+ // Returns the QoS for this RAPI session.
+
+ virtual int qos (ACE_SOCK *socket,
+ const ACE_QoS &ace_qos);
+ // Set QoS for this RAPI session. The socket parameter is used to confirm if
+ // this QoS session was subscribed to by the socket.
+
+ virtual void qos (const ACE_QoS &ace_qos);
+ // Sets the QoS for this session object to ace_qos. Does not interfere with the
+ // QoS in the underlying socket. This call is useful to update the QoS object
+ // when the underlying socket QoS is being set through a mechanism other than
+ // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the
+ // QoS for the socket is set through ACE_OS::join_leaf ().
+
+ virtual int update_qos (void);
+ // Calls rapi_dispatch () that further triggers the call back function.
+ // It is a mechanism of updating the QoS for this session asynchronously, as
+ // RSVP events occur.
+
+ virtual int session_id (void) const;
+ // Get the RAPI session id.
+
+ virtual void session_id (const int session_id);
+ // Set the RAPI session id.
+
+ virtual ACE_INET_Addr dest_addr (void) const;
+ // Get the destination address for this RAPI session.
+
+ virtual void dest_addr (const ACE_INET_Addr &dest_addr);
+ // Set the destination address for this RAPI session.
+
+ virtual int version ();
+ // RAPI version. Returned value = 100 * major-version + minor-version.
+
+ friend class ACE_QoS_Session_Factory;
+ // The factory is a friend so it can create this object through
+ // the only private constructor.
+
+private:
+
+ ACE_RAPI_Session (void);
+ // Default constuctor. Constructor is defined private so that only
+ // the friend factory can instantiate this class.
+
+ rapi_tspec_t *init_tspec_simplified (const ACE_Flow_Spec &flow_spec);
+ // Construct a simplified RAPI Sender TSpec object
+ // from an ACE_Flow_Spec object. Used internally by this class.
+
+ rapi_flowspec_t *init_flowspec_simplified(const ACE_Flow_Spec &flow_spec);
+ // Construct a simplified RAPI Receiver FlowSpec object
+ // from an ACE_Flow_Spec object. Used internally by the class.
+
+ int sending_qos (const ACE_QoS &ace_qos);
+ // Set sending QoS for this RAPI session.
+
+ int receiving_qos (const ACE_QoS &ace_qos);
+ // Set receiving QoS for this RAPI session.
+
+};
+
+#endif /* ACE_HAS_RAPI */
+
+class ACE_Export ACE_GQoS_Session : public ACE_QoS_Session
+{
+ // = TITLE
+ // A GQoS session object.
+ //
+ // = DESCRIPTION
+ // This class is a GQoS (Generic QoS, an implementation of RSVP on
+ // Win2K) implementation of the ACE_QoS_Session interface.
+
+public:
+
+ ~ACE_GQoS_Session (void);
+ // Default destructor.
+
+ static int GQoS_session_id;
+ // This is a session ID generator. It does a lot more than expected
+ // from an int!.
+
+ virtual int open (ACE_INET_Addr dest_addr,
+ ACE_Protocol_ID protocol_id);
+ // Open a GQoS session [dest IP, dest port, Protocol ID].
+
+ virtual int close (void);
+ // Close the GQoS Session.
+
+ virtual ACE_QoS qos (void) const;
+ // Returns the QoS for this GQoS session.
+
+ virtual int qos (ACE_SOCK *socket,
+ const ACE_QoS &ace_qos);
+ // Set QoS for this GQoS session. The socket parameter is used to confirm if
+ // this QoS session was subscribed to by the socket.
+
+ virtual void qos (const ACE_QoS &ace_qos);
+ // Sets the QoS for this session object to ace_qos. Does not interfere with the
+ // QoS in the underlying socket. This call is useful to update the QoS object
+ // when the underlying socket QoS is being set through a mechanism other than
+ // the previous qos () method e.g. inside the dgram_mcast.subscribe () where the
+ // QoS for the socket is set through ACE_OS::join_leaf ().
+
+ virtual int update_qos (void);
+ // Calls the ioctl (ACE_SIO_GET_QOS). It is a mechanism of updating the
+ // QoS for this session asynchronously, as RSVP events occur.
+
+ virtual ACE_INET_Addr dest_addr (void) const;
+ // Get the destination address for this GQoS session.
+
+ virtual void dest_addr (const ACE_INET_Addr &dest_addr);
+ // Set the destination address for this GQoS session.
+
+ virtual int session_id (void) const;
+ // Get the GQoS session id.
+
+ virtual void session_id (const int session_id);
+ // Set the GQoS session id.
+
+ virtual int version ();
+ // GQoS version.
+
+ friend class ACE_QoS_Session_Factory;
+ // The factory is a friend so it can create this object through
+ // the only private constructor.
+
+private:
+
+ ACE_GQoS_Session (void);
+ // Default constructor. Constructor is defined private so that only
+ // the friend factory can instantiate this class.
+
+};
+
+#if !defined (ACE_LACKS_INLINE_FUNCTIONS)
+#include "ace/QoS_Session_Impl.i"
+#endif /* ACE_LACKS_INLINE_FUNCTIONS */
+
+#endif /*ACE_QOS_SESSION_IMPL_H */
diff --git a/ace/QoS_Session_Impl.i b/ace/QoS_Session_Impl.i
new file mode 100644
index 00000000000..2f696092013
--- /dev/null
+++ b/ace/QoS_Session_Impl.i
@@ -0,0 +1,120 @@
+/* -*- C++ -*- */
+// $Id$
+
+// QoS_Session_Impl.i
+
+#if defined (ACE_HAS_RAPI)
+
+ACE_INLINE
+ACE_RAPI_Session::~ACE_RAPI_Session (void)
+{
+ ACE_TRACE ("ACE_RAPI_Session::~ACE_RAPI_Session");
+}
+
+// Returns the QoS for this RAPI session.
+ACE_INLINE ACE_QoS
+ACE_RAPI_Session::qos (void) const
+{
+ return this->qos_;
+}
+
+// Overloaded method to set the QoS for this session object. Does not
+// interfere with the underlying socket QoS.
+ACE_INLINE void
+ACE_RAPI_Session::qos (const ACE_QoS &ace_qos)
+{
+ this->qos_ = ace_qos;
+}
+
+// Get the RAPI session id.
+ACE_INLINE int
+ACE_RAPI_Session::session_id (void) const
+{
+ return this->session_id_;
+}
+
+// Set the RAPI session id.
+ACE_INLINE void
+ACE_RAPI_Session::session_id (const int session_id)
+{
+ this->session_id_ = session_id;
+}
+
+// Get the destination address for this RAPI session.
+ACE_INLINE ACE_INET_Addr
+ACE_RAPI_Session::dest_addr (void) const
+{
+ return this->dest_addr_;
+}
+
+// Set the destination address for this RAPI session.
+ACE_INLINE void
+ACE_RAPI_Session::dest_addr (const ACE_INET_Addr &dest_addr)
+{
+ this->dest_addr_ = dest_addr;
+}
+
+// RAPI version. Returned value = 100 * major-version + minor-version.
+ACE_INLINE int
+ACE_RAPI_Session::version (void)
+{
+ return 0;
+}
+
+#endif /* ACE_HAS_RAPI */
+
+ACE_INLINE
+ACE_GQoS_Session::~ACE_GQoS_Session (void)
+{
+ ACE_TRACE ("ACE_GQoS_Session::~ACE_GQoS_Session");
+}
+
+// Returns the QoS for this GQoS session.
+ACE_INLINE ACE_QoS
+ACE_GQoS_Session::qos (void) const
+{
+ return this->qos_;
+}
+
+// Overloaded method to set the QoS for this session object. Does not
+// interfere with the underlying socket QoS.
+ACE_INLINE void
+ACE_GQoS_Session::qos (const ACE_QoS &ace_qos)
+{
+ this->qos_ = ace_qos;
+}
+
+// Get the GQoS session id.
+ACE_INLINE int
+ACE_GQoS_Session::session_id (void) const
+{
+ return this->session_id_;
+}
+
+// Set the GQoS session id.
+ACE_INLINE void
+ACE_GQoS_Session::session_id (const int session_id)
+{
+ this->session_id_ = session_id;
+}
+
+// Get the destination address for this GQoS session.
+ACE_INLINE ACE_INET_Addr
+ACE_GQoS_Session::dest_addr (void) const
+{
+ return this->dest_addr_;
+}
+
+// Set the destination address for this GQoS session.
+ACE_INLINE void
+ACE_GQoS_Session::dest_addr (const ACE_INET_Addr &dest_addr)
+{
+ this->dest_addr_ = dest_addr;
+}
+
+// GQoS version.
+ACE_INLINE int
+ACE_GQoS_Session::version (void)
+{
+ return 0;
+}
diff --git a/ace/SOCK.cpp b/ace/SOCK.cpp
index 10e4bde796b..cf38eda6887 100644
--- a/ace/SOCK.cpp
+++ b/ace/SOCK.cpp
@@ -106,6 +106,28 @@ ACE_SOCK::open (int type,
return 0;
}
+// Adds the given session to the list of session objects
+// joined by this socket.
+
+int
+ACE_SOCK::join_qos_session (ACE_QoS_Session *qos_session)
+{
+
+ if (this->qos_session_set ().insert (qos_session) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error in adding a new session to the "
+ "socket session set\n"),
+ -1);
+ return 0;
+}
+
+// Returns the QoS session set for this socket.
+ACE_Unbounded_Set <ACE_QoS_Session *>
+ACE_SOCK::qos_session_set (void)
+{
+ return this->qos_session_set_;
+}
+
// General purpose constructor for performing server ACE_SOCK
// creation.
@@ -157,7 +179,7 @@ ACE_SOCK::open (int type,
else
return 0;
}
-
+
ACE_SOCK::ACE_SOCK (int type,
int protocol_family,
int protocol,
@@ -179,3 +201,8 @@ ACE_SOCK::ACE_SOCK (int type,
ASYS_TEXT ("ACE_SOCK::ACE_SOCK")));
}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Unbounded_Set<ACE_QoS_Session *>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Unbounded_Set<ACE_QoS_Session *>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/SOCK.h b/ace/SOCK.h
index 576806bc256..9ce912d095c 100644
--- a/ace/SOCK.h
+++ b/ace/SOCK.h
@@ -25,6 +25,7 @@
#include "ace/Addr.h"
#include "ace/IPC_SAP.h"
+#include "ace/QoS_Session.h"
class ACE_Export ACE_SOCK : public ACE_IPC_SAP
{
@@ -87,6 +88,16 @@ public:
int reuse_addr);
// Wrapper around the QoS-enabled <WSASocket> function.
+ int join_qos_session (ACE_QoS_Session *qos_session);
+ // Join the given QoS session. A socket can join multiple QoS sessions.
+ // This call adds the given QoS session to the list of QoS sessions
+ // that the socket has already joined.
+
+ typedef ACE_Unbounded_Set <ACE_QoS_Session *> ACE_QOS_SESSION_SET;
+
+ ACE_QOS_SESSION_SET qos_session_set (void);
+ // Get the QoS session set.
+
protected:
ACE_SOCK (int type,
int protocol_family,
@@ -100,7 +111,7 @@ protected:
int protocol,
ACE_Protocol_Info *protocolinfo,
ACE_SOCK_GROUP g,
- u_long flags,
+ u_long flags,
int reuse_addr);
// Constructor with arguments to call the QoS-enabled <WSASocket>
// function.
@@ -108,6 +119,10 @@ protected:
ACE_SOCK (void);
// Default constructor is private to prevent instances of this class
// from being defined.
+
+ ACE_QOS_SESSION_SET qos_session_set_;
+ // Set of QoS sessions that this socket has joined.
+
};
#if !defined (ACE_LACKS_INLINE_FUNCTIONS)
@@ -115,3 +130,7 @@ protected:
#endif /* ACE_LACKS_INLINE_FUNCTIONS */
#endif /* ACE_SOCK_H */
+
+
+
+
diff --git a/ace/SOCK_Dgram_Mcast.cpp b/ace/SOCK_Dgram_Mcast.cpp
index de0351e1eeb..91be405541f 100644
--- a/ace/SOCK_Dgram_Mcast.cpp
+++ b/ace/SOCK_Dgram_Mcast.cpp
@@ -379,7 +379,8 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr,
int protocol,
ACE_Protocol_Info *protocolinfo,
ACE_SOCK_GROUP g,
- u_long flags)
+ u_long flags,
+ ACE_QoS_Session *qos_session)
{
ACE_TRACE ("ACE_SOCK_Dgram_Mcast::subscribe");
@@ -400,15 +401,30 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr,
protocol_family,
protocol,
reuse_addr,
- protocolinfo);
+ protocolinfo);
// Check for the "short-circuit" return value of 1 (for NT).
if (result != 0)
return result;
-
+
// Tell network device driver to read datagrams with a
// <mcast_request_if_> IP interface.
else
{
+ // Check if the mcast_addr passed into this method is the
+ // same as the QoS session address.
+ if (mcast_addr == qos_session->dest_addr ())
+
+ // Subscribe to the QoS session.
+ if (this->join_qos_session (qos_session) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to join QoS Session\n"),
+ -1);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Dest Addr in the QoS Session does"
+ " not match the address passed into"
+ " subscribe\n"),
+ -1);
sockaddr_in mult_addr;
@@ -423,7 +439,9 @@ ACE_SOCK_Dgram_Mcast::subscribe (const ACE_INET_Addr &mcast_addr,
qos_params) == ACE_INVALID_HANDLE)
return -1;
else
- return 0;
+ qos_session->qos (*(qos_params.socket_qos ()));
+
+ return 0;
}
}
diff --git a/ace/SOCK_Dgram_Mcast.h b/ace/SOCK_Dgram_Mcast.h
index 468cbb0493f..632e015ef0d 100644
--- a/ace/SOCK_Dgram_Mcast.h
+++ b/ace/SOCK_Dgram_Mcast.h
@@ -74,13 +74,17 @@ public:
int protocol = 0,
ACE_Protocol_Info *protocolinfo = 0,
ACE_SOCK_GROUP g = 0,
- u_long flags = 0);
+ u_long flags = 0,
+ ACE_QoS_Session *qos_session = 0);
// This is a QoS-enabled method for joining a multicast group, which
// passes <qos_params> via <ACE_OS::join_leaf>. The network
// interface device driver is instructed to accept datagrams with
// <mcast_addr> multicast addresses. If the socket has already been
// opened, <subscribe> closes the socket and opens a new socket
- // bound to the <mcast_addr>.
+ // bound to the <mcast_addr>. The session object specifies the QoS
+ // session that the socket wants to subscribe to. A socket may
+ // subscribe to multiple QoS sessions by calling this method multiple
+ // times with different session objects.
//
// The <net_if> interface is hardware specific, e.g., use "netstat
// -i" to find whether your interface is, such as "le0" or something