diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 41 |
1 files changed, 32 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index b8981b4877..c91cfba2f8 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())) + isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), + closeComplete(false) { acl = getSession().getBroker().getAcl(); } SemanticState::~SemanticState() { - //cancel all consumers - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); - } + closed(); +} - if (dtxBuffer.get()) { - dtxBuffer->fail(); +void SemanticState::closed() { + if (!closeComplete) { + //prevent requeued messages being redelivered to consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + disable(i->second); + } + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); + + //now unsubscribe, which may trigger queue deletion and thus + //needs to occur after the requeueing of unacked messages + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + unsubscribe(i->second); + } + closeComplete = true; } - recover(true); } bool SemanticState::exists(const string& consumerTag){ @@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl() mgmtObject->resourceDestroy (); } -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::disable(ConsumerImpl::shared_ptr c) { c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); +} + +void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) +{ Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) } } +void SemanticState::cancel(ConsumerImpl::shared_ptr c) +{ + disable(c); + unsubscribe(c); +} + void SemanticState::handle(intrusive_ptr<Message> msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); |