diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Consumer.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 29 |
1 files changed, 13 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 64073621be..64fc4288af 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -21,21 +21,23 @@ #ifndef _Consumer_ #define _Consumer_ -#include "qpid/broker/Message.h" -#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/broker/OwnershipToken.h" +#include <boost/shared_ptr.hpp> +#include <string> namespace qpid { namespace broker { +class DeliveryRecord; +class Message; class Queue; class QueueListeners; /** * Base class for consumers which represent a subscription to a queue. */ -class Consumer -{ +class Consumer : public QueueCursor { const bool acquires; // inListeners allows QueueListeners to efficiently track if this // instance is registered for notifications without having to @@ -47,22 +49,17 @@ class Consumer public: typedef boost::shared_ptr<Consumer> shared_ptr; - Consumer(const std::string& _name, bool preAcquires = true) - : acquires(preAcquires), inListeners(false), name(_name), position(0) {} + Consumer(const std::string& _name, SubscriptionType type) + : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {} virtual ~Consumer(){} bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } - /**@return the position of the last message seen by this consumer */ - virtual framing::SequenceNumber getPosition() const { return position; } - - virtual void setPosition(framing::SequenceNumber pos) { position = pos; } - - virtual bool deliver(QueuedMessage& msg) = 0; + virtual bool deliver(const QueueCursor& cursor, const Message& msg) = 0; virtual void notify() = 0; - virtual bool filter(boost::intrusive_ptr<Message>) { return true; } - virtual bool accept(boost::intrusive_ptr<Message>) { return true; } + virtual bool filter(const Message&) { return true; } + virtual bool accept(const Message&) { return true; } virtual OwnershipToken* getSession() = 0; virtual void cancel() = 0; @@ -75,7 +72,7 @@ class Consumer * Not to be confused with accept() above, which is asking if * this consumer will consume/browse the message. */ - virtual void acknowledged(const QueuedMessage&) = 0; + virtual void acknowledged(const DeliveryRecord&) = 0; /** Called if queue has been deleted, if true suppress the error message. * Used by HA ReplicatingSubscriptions where such errors are normal. @@ -83,7 +80,7 @@ class Consumer virtual bool hideDeletedError() { return false; } protected: - framing::SequenceNumber position; + //framing::SequenceNumber position; private: friend class QueueListeners; |