summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Consumer.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Consumer.h')
-rw-r--r--cpp/src/qpid/broker/Consumer.h29
1 files changed, 13 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 64073621be..64fc4288af 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -21,21 +21,23 @@
#ifndef _Consumer_
#define _Consumer_
-#include "qpid/broker/Message.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/OwnershipToken.h"
+#include <boost/shared_ptr.hpp>
+#include <string>
namespace qpid {
namespace broker {
+class DeliveryRecord;
+class Message;
class Queue;
class QueueListeners;
/**
* Base class for consumers which represent a subscription to a queue.
*/
-class Consumer
-{
+class Consumer : public QueueCursor {
const bool acquires;
// inListeners allows QueueListeners to efficiently track if this
// instance is registered for notifications without having to
@@ -47,22 +49,17 @@ class Consumer
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
- Consumer(const std::string& _name, bool preAcquires = true)
- : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+ Consumer(const std::string& _name, SubscriptionType type)
+ : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {}
virtual ~Consumer(){}
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
- /**@return the position of the last message seen by this consumer */
- virtual framing::SequenceNumber getPosition() const { return position; }
-
- virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
-
- virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual bool deliver(const QueueCursor& cursor, const Message& msg) = 0;
virtual void notify() = 0;
- virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
- virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual bool filter(const Message&) { return true; }
+ virtual bool accept(const Message&) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual void cancel() = 0;
@@ -75,7 +72,7 @@ class Consumer
* Not to be confused with accept() above, which is asking if
* this consumer will consume/browse the message.
*/
- virtual void acknowledged(const QueuedMessage&) = 0;
+ virtual void acknowledged(const DeliveryRecord&) = 0;
/** Called if queue has been deleted, if true suppress the error message.
* Used by HA ReplicatingSubscriptions where such errors are normal.
@@ -83,7 +80,7 @@ class Consumer
virtual bool hideDeletedError() { return false; }
protected:
- framing::SequenceNumber position;
+ //framing::SequenceNumber position;
private:
friend class QueueListeners;