summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueuePolicy.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.h')
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h48
1 files changed, 40 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index 3cdd63784d..d7a71bc3ba 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -43,8 +43,20 @@ class QueuePolicy
uint32_t count;
uint64_t size;
bool policyExceeded;
-
- static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
+
+ /**
+ * Producer flow control: when level is > flowStop*, flow control is ON.
+ * then level is < flowResume*, flow control is OFF. If == 0, flow control
+ * is not used. If both byte and msg count thresholds are set, then
+ * passing _either_ level may turn flow control ON, but _both_ must be
+ * below level before flow control will be turned OFF.
+ */
+ uint32_t flowStopCount;
+ uint32_t flowResumeCount;
+ uint64_t flowStopSize;
+ uint64_t flowResumeSize;
+ bool flowStopped; // true = producers held in flow control
+
protected:
uint64_t getCurrentQueueSize() const { return size; }
@@ -54,10 +66,16 @@ class QueuePolicy
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
+ static QPID_BROKER_EXTERN const std::string flowStopCountKey;
+ static QPID_BROKER_EXTERN const std::string flowResumeCountKey;
+ static QPID_BROKER_EXTERN const std::string flowStopSizeKey;
+ static QPID_BROKER_EXTERN const std::string flowResumeSizeKey;
+
+ // Policy types:
static QPID_BROKER_EXTERN const std::string REJECT;
static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK;
static QPID_BROKER_EXTERN const std::string RING;
- static QPID_BROKER_EXTERN const std::string RING_STRICT;
+ static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
@@ -68,14 +86,22 @@ class QueuePolicy
virtual bool isEnqueued(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
+ uint64_t getMaxSize() const { return maxSize; }
+ uint32_t getFlowStopCount() const { return flowStopCount; }
+ uint32_t getFlowResumeCount() const { return flowResumeCount; }
+ uint64_t getFlowStopSize() const { return flowStopSize; }
+ uint64_t getFlowResumeSize() const { return flowResumeSize; }
+ bool isFlowControlActive() const { return flowStopped; }
+ bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
virtual void getPendingDequeues(Messages& result);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
- static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
+ uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
+ uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static std::string getType(const qpid::framing::FieldTable& settings);
@@ -85,7 +111,9 @@ class QueuePolicy
protected:
const std::string name;
- QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
+ uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
+ uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
@@ -96,14 +124,18 @@ class QueuePolicy
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize,
+ uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
+ uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING,
+ uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
+ uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);