summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueuePolicy.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.h')
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h34
1 files changed, 20 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index 54745876d5..b2937e94c7 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -40,14 +40,14 @@ class QueuePolicy
uint32_t maxCount;
uint64_t maxSize;
const std::string type;
- qpid::sys::AtomicValue<uint32_t> count;
- qpid::sys::AtomicValue<uint64_t> size;
+ uint32_t count;
+ uint64_t size;
bool policyExceeded;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- static std::string getType(const qpid::framing::FieldTable& settings);
public:
+ typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
@@ -57,27 +57,34 @@ class QueuePolicy
static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
- QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&);
+ QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ virtual void enqueued(const QueuedMessage&);
virtual void dequeued(const QueuedMessage&);
virtual bool isEnqueued(const QueuedMessage&);
- virtual bool checkLimit(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
uint64_t getMaxSize() const { return maxSize; }
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
+ virtual void getPendingDequeues(Messages& result);
-
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static std::string getType(const qpid::framing::FieldTable& settings);
static void setDefaultMaxSize(uint64_t);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
const QueuePolicy&);
protected:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ const std::string name;
- virtual void enqueued(const QueuedMessage&);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
void dequeued(uint64_t size);
};
@@ -86,21 +93,20 @@ class QueuePolicy
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
- bool checkLimit(const QueuedMessage&);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
- bool checkLimit(const QueuedMessage&);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
private:
- typedef std::deque<QueuedMessage> Messages;
- qpid::sys::Mutex lock;
Messages pendingDequeues;
Messages queue;
const bool strict;