diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 1 |
3 files changed, 37 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index b8981b4877..c91cfba2f8 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())) + isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), + closeComplete(false) { acl = getSession().getBroker().getAcl(); } SemanticState::~SemanticState() { - //cancel all consumers - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); - } + closed(); +} - if (dtxBuffer.get()) { - dtxBuffer->fail(); +void SemanticState::closed() { + if (!closeComplete) { + //prevent requeued messages being redelivered to consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + disable(i->second); + } + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); + + //now unsubscribe, which may trigger queue deletion and thus + //needs to occur after the requeueing of unacked messages + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + unsubscribe(i->second); + } + closeComplete = true; } - recover(true); } bool SemanticState::exists(const string& consumerTag){ @@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl() mgmtObject->resourceDestroy (); } -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::disable(ConsumerImpl::shared_ptr c) { c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); +} + +void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) +{ Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) } } +void SemanticState::cancel(ConsumerImpl::shared_ptr c) +{ + disable(c); + unsubscribe(c); +} + void SemanticState::handle(intrusive_ptr<Message> msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index cae852732d..2b314920e6 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -157,6 +157,7 @@ class SemanticState : private boost::noncopyable { const string userID; const string userName; const bool isDefaultRealm; + bool closeComplete; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); @@ -165,6 +166,8 @@ class SemanticState : private boost::noncopyable { AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); + void unsubscribe(ConsumerImpl::shared_ptr); + void disable(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -220,6 +223,7 @@ class SemanticState : private boost::noncopyable { void attached(); void detached(); + void closed(); // Used by cluster to re-create sessions template <class F> void eachConsumer(F f) { diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ddf68cad2f..be4f8c7b40 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -88,6 +88,7 @@ SessionState::SessionState( } SessionState::~SessionState() { + semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); |