diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.h')
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.h | 116 |
1 files changed, 88 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 4511a63b64..b2937e94c7 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -21,40 +21,100 @@ #ifndef _QueuePolicy_ #define _QueuePolicy_ +#include <deque> #include <iostream> +#include <memory> +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/broker/QueuedMessage.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" namespace qpid { - namespace broker { - class QueuePolicy - { - static const std::string maxCountKey; - static const std::string maxSizeKey; - - static uint64_t defaultMaxSize; +namespace broker { + +class QueuePolicy +{ + static uint64_t defaultMaxSize; - const uint32_t maxCount; - const uint64_t maxSize; - uint32_t count; - uint64_t size; + uint32_t maxCount; + uint64_t maxSize; + const std::string type; + uint32_t count; + uint64_t size; + bool policyExceeded; - static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - - public: - QueuePolicy(uint32_t maxCount, uint64_t maxSize); - QueuePolicy(const qpid::framing::FieldTable& settings); - void enqueued(uint64_t size); - void dequeued(uint64_t size); - void update(qpid::framing::FieldTable& settings); - bool limitExceeded(); - uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } - - static void setDefaultMaxSize(uint64_t); - friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); - }; - } -} + static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); + + public: + typedef std::deque<QueuedMessage> Messages; + static QPID_BROKER_EXTERN const std::string maxCountKey; + static QPID_BROKER_EXTERN const std::string maxSizeKey; + static QPID_BROKER_EXTERN const std::string typeKey; + static QPID_BROKER_EXTERN const std::string REJECT; + static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK; + static QPID_BROKER_EXTERN const std::string RING; + static QPID_BROKER_EXTERN const std::string RING_STRICT; + + virtual ~QueuePolicy() {} + QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg); + virtual void enqueued(const QueuedMessage&); + virtual void dequeued(const QueuedMessage&); + virtual bool isEnqueued(const QueuedMessage&); + QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); + uint32_t getMaxCount() const { return maxCount; } + uint64_t getMaxSize() const { return maxSize; } + void encode(framing::Buffer& buffer) const; + void decode ( framing::Buffer& buffer ); + uint32_t encodedSize() const; + virtual void getPendingDequeues(Messages& result); + + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static std::string getType(const qpid::framing::FieldTable& settings); + static void setDefaultMaxSize(uint64_t); + friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, + const QueuePolicy&); + protected: + const std::string name; + + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual bool checkLimit(boost::intrusive_ptr<Message> msg); + void enqueued(uint64_t size); + void dequeued(uint64_t size); +}; + + +class FlowToDiskPolicy : public QueuePolicy +{ + public: + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + bool checkLimit(boost::intrusive_ptr<Message> msg); +}; + +class RingQueuePolicy : public QueuePolicy +{ + public: + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + bool isEnqueued(const QueuedMessage&); + bool checkLimit(boost::intrusive_ptr<Message> msg); + void getPendingDequeues(Messages& result); + private: + Messages pendingDequeues; + Messages queue; + const bool strict; + + bool find(const QueuedMessage&, Messages&, bool remove); +}; + +}} #endif |