diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 48 |
1 files changed, 40 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 3cdd63784d..d7a71bc3ba 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -43,8 +43,20 @@ class QueuePolicy uint32_t count; uint64_t size; bool policyExceeded; - - static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue); + + /** + * Producer flow control: when level is > flowStop*, flow control is ON. + * then level is < flowResume*, flow control is OFF. If == 0, flow control + * is not used. If both byte and msg count thresholds are set, then + * passing _either_ level may turn flow control ON, but _both_ must be + * below level before flow control will be turned OFF. + */ + uint32_t flowStopCount; + uint32_t flowResumeCount; + uint64_t flowStopSize; + uint64_t flowResumeSize; + bool flowStopped; // true = producers held in flow control + protected: uint64_t getCurrentQueueSize() const { return size; } @@ -54,10 +66,16 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; + static QPID_BROKER_EXTERN const std::string flowStopCountKey; + static QPID_BROKER_EXTERN const std::string flowResumeCountKey; + static QPID_BROKER_EXTERN const std::string flowStopSizeKey; + static QPID_BROKER_EXTERN const std::string flowResumeSizeKey; + + // Policy types: static QPID_BROKER_EXTERN const std::string REJECT; static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK; static QPID_BROKER_EXTERN const std::string RING; - static QPID_BROKER_EXTERN const std::string RING_STRICT; + static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); @@ -68,14 +86,22 @@ class QueuePolicy virtual bool isEnqueued(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } + uint64_t getMaxSize() const { return maxSize; } + uint32_t getFlowStopCount() const { return flowStopCount; } + uint32_t getFlowResumeCount() const { return flowResumeCount; } + uint64_t getFlowStopSize() const { return flowStopSize; } + uint64_t getFlowResumeSize() const { return flowResumeSize; } + bool isFlowControlActive() const { return flowStopped; } + bool monitorFlowControl() const { return flowStopCount || flowStopSize; } void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; virtual void getPendingDequeues(Messages& result); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); - static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static std::string getType(const qpid::framing::FieldTable& settings); @@ -85,7 +111,9 @@ class QueuePolicy protected: const std::string name; - QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); @@ -96,14 +124,18 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); |