summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.h')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h19
1 files changed, 14 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 26fd815424..5a83fd0fb3 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -30,6 +30,7 @@
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/NameGenerator.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
@@ -74,8 +75,10 @@ class SemanticState : private boost::noncopyable {
public boost::enable_shared_from_this<ConsumerImpl>,
public management::Manageable
{
+ protected:
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
+ private:
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
@@ -95,17 +98,20 @@ class SemanticState : private boost::noncopyable {
void allocateCredit(boost::intrusive_ptr<Message>& msg);
bool haveCredit();
+ protected:
+ virtual bool doDispatch();
+ size_t unacked() { return parent->unacked.size(); }
+
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId,
- uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ConsumerImpl();
+ const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ virtual ~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
+ virtual bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
void cancel() {}
@@ -142,7 +148,10 @@ class SemanticState : private boost::noncopyable {
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
- // Manageable entry points
+
+ void acknowledged(const broker::QueuedMessage&) {}
+
+ // manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};