summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.h')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h116
1 files changed, 88 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 4511a63b64..b2937e94c7 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -21,40 +21,100 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <deque>
#include <iostream>
+#include <memory>
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
- namespace broker {
- class QueuePolicy
- {
- static const std::string maxCountKey;
- static const std::string maxSizeKey;
-
- static uint64_t defaultMaxSize;
+namespace broker {
+
+class QueuePolicy
+{
+ static uint64_t defaultMaxSize;
- const uint32_t maxCount;
- const uint64_t maxSize;
- uint32_t count;
- uint64_t size;
+ uint32_t maxCount;
+ uint64_t maxSize;
+ const std::string type;
+ uint32_t count;
+ uint64_t size;
+ bool policyExceeded;
- static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
-
- public:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize);
- QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(uint64_t size);
- void dequeued(uint64_t size);
- void update(qpid::framing::FieldTable& settings);
- bool limitExceeded();
- uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
-
- static void setDefaultMaxSize(uint64_t);
- friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
- };
- }
-}
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+
+ public:
+ typedef std::deque<QueuedMessage> Messages;
+ static QPID_BROKER_EXTERN const std::string maxCountKey;
+ static QPID_BROKER_EXTERN const std::string maxSizeKey;
+ static QPID_BROKER_EXTERN const std::string typeKey;
+ static QPID_BROKER_EXTERN const std::string REJECT;
+ static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK;
+ static QPID_BROKER_EXTERN const std::string RING;
+ static QPID_BROKER_EXTERN const std::string RING_STRICT;
+
+ virtual ~QueuePolicy() {}
+ QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ virtual void enqueued(const QueuedMessage&);
+ virtual void dequeued(const QueuedMessage&);
+ virtual bool isEnqueued(const QueuedMessage&);
+ QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
+ uint32_t getMaxCount() const { return maxCount; }
+ uint64_t getMaxSize() const { return maxSize; }
+ void encode(framing::Buffer& buffer) const;
+ void decode ( framing::Buffer& buffer );
+ uint32_t encodedSize() const;
+ virtual void getPendingDequeues(Messages& result);
+
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static std::string getType(const qpid::framing::FieldTable& settings);
+ static void setDefaultMaxSize(uint64_t);
+ friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
+ const QueuePolicy&);
+ protected:
+ const std::string name;
+
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void enqueued(uint64_t size);
+ void dequeued(uint64_t size);
+};
+
+
+class FlowToDiskPolicy : public QueuePolicy
+{
+ public:
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
+};
+
+class RingQueuePolicy : public QueuePolicy
+{
+ public:
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ bool isEnqueued(const QueuedMessage&);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
+ private:
+ Messages pendingDequeues;
+ Messages queue;
+ const bool strict;
+
+ bool find(const QueuedMessage&, Messages&, bool remove);
+};
+
+}}
#endif