diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 15928ce599..67cfe808d0 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -26,7 +26,6 @@ #include "qpid/broker/Consumer.h" #include "qpid/broker/Credit.h" #include "qpid/broker/Deliverable.h" -#include "qpid/broker/DeliveryAdapter.h" #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/DtxBuffer.h" #include "qpid/broker/DtxManager.h" @@ -34,12 +33,15 @@ #include "qpid/broker/QueueObserver.h" #include "qpid/broker/TxBuffer.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicValue.h" +#include "qpid/broker/AclModule.h" +#include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Subscription.h" #include <list> @@ -47,13 +49,15 @@ #include <vector> #include <boost/enable_shared_from_this.hpp> -#include <boost/intrusive_ptr.hpp> #include <boost/cast.hpp> namespace qpid { namespace broker { +class Exchange; +class MessageStore; class SessionContext; +class SessionState; /** * @@ -94,28 +98,28 @@ class SemanticState : private boost::noncopyable { int deliveryCount; qmf::org::apache::qpid::broker::Subscription* mgmtObject; - bool checkCredit(boost::intrusive_ptr<Message>& msg); - void allocateCredit(boost::intrusive_ptr<Message>& msg); + bool checkCredit(const Message& msg); + void allocateCredit(const Message& msg); bool haveCredit(); protected: QPID_BROKER_EXTERN virtual bool doDispatch(); size_t unacked() { return parent->unacked.size(); } + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>); public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - QPID_BROKER_EXTERN ConsumerImpl( - SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, bool acquire, bool exclusive, - const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, - const framing::FieldTable& arguments); - QPID_BROKER_EXTERN virtual ~ConsumerImpl(); + QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, SubscriptionType type, bool exclusive, + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); + QPID_BROKER_EXTERN ~ConsumerImpl(); QPID_BROKER_EXTERN OwnershipToken* getSession(); - QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg); - QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg); - QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&); + QPID_BROKER_EXTERN bool filter(const Message&); + QPID_BROKER_EXTERN bool accept(const Message&); QPID_BROKER_EXTERN void cancel() {} QPID_BROKER_EXTERN void disableNotify(); @@ -153,7 +157,7 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } - void acknowledged(const broker::QueuedMessage&) {} + void acknowledged(const DeliveryRecord&) {} // manageable entry points QPID_BROKER_EXTERN management::ManagementObject* @@ -168,8 +172,7 @@ class SemanticState : private boost::noncopyable { private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; - SessionContext& session; - DeliveryAdapter& deliveryAdapter; + SessionState& session; ConsumerImplMap consumers; NameGenerator tagGenerator; DeliveryRecords unacked; @@ -185,7 +188,6 @@ class SemanticState : private boost::noncopyable { //needed for queue delete events in auto-delete: const std::string connectionId; - void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); bool complete(DeliveryRecord&); @@ -196,11 +198,11 @@ class SemanticState : private boost::noncopyable { public: - SemanticState(DeliveryAdapter&, SessionContext&); + SemanticState(SessionState&); ~SemanticState(); - SessionContext& getSession() { return session; } - const SessionContext& getSession() const { return session; } + SessionContext& getSession(); + const SessionContext& getSession() const; const ConsumerImpl::shared_ptr find(const std::string& destination) const; bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; @@ -239,12 +241,12 @@ class SemanticState : private boost::noncopyable { void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void recover(bool requeue); - void deliver(DeliveryRecord& message, bool sync); + TxBuffer* getTxBuffer(); + void requeue(); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); - void handle(boost::intrusive_ptr<Message> msg); + void route(Message& msg, Deliverable& strategy); void completed(const framing::SequenceSet& commands); void accepted(const framing::SequenceSet& commands); |