diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 26fd815424..5a83fd0fb3 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -30,6 +30,7 @@ #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/NameGenerator.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/TxBuffer.h" #include "qpid/framing/FrameHandler.h" @@ -74,8 +75,10 @@ class SemanticState : private boost::noncopyable { public boost::enable_shared_from_this<ConsumerImpl>, public management::Manageable { + protected: mutable qpid::sys::Mutex lock; SemanticState* const parent; + private: const boost::shared_ptr<Queue> queue; const bool ackExpected; const bool acquire; @@ -95,17 +98,20 @@ class SemanticState : private boost::noncopyable { void allocateCredit(boost::intrusive_ptr<Message>& msg); bool haveCredit(); + protected: + virtual bool doDispatch(); + size_t unacked() { return parent->unacked.size(); } + public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, - const std::string& tag, const std::string& resumeId, - uint64_t resumeTtl, const framing::FieldTable& arguments); - ~ConsumerImpl(); + const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + virtual ~ConsumerImpl(); OwnershipToken* getSession(); - bool deliver(QueuedMessage& msg); + virtual bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); void cancel() {} @@ -142,7 +148,10 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } - // Manageable entry points + + void acknowledged(const broker::QueuedMessage&) {} + + // manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; |