diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d826fef22c..d605e92b72 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -69,7 +69,11 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) } SemanticState::~SemanticState() { - consumers.clear(); + //cancel all consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + cancel(i->second); + } + if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -86,16 +90,15 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); + queue->consume(c, exclusive);//may throw exception + consumers[tagInOut] = c; } void SemanticState::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { + cancel(i->second); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery @@ -287,28 +290,19 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) } } -SemanticState::ConsumerImpl::~ConsumerImpl() { - cancel(); -} +SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::ConsumerImpl::cancel() +void SemanticState::cancel(ConsumerImpl::shared_ptr c) { + Queue::shared_ptr queue = c->getQueue(); if(queue) { - queue->cancel(this); + queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - parent->getSession().getBroker().getQueues().destroyIf( - queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); + Queue::tryAutoDelete(getSession().getBroker(), queue); } } } -void SemanticState::ConsumerImpl::requestDispatch() -{ - if(blocked) - queue->requestDispatch(this); -} - void SemanticState::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); @@ -389,7 +383,21 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) //if the prefetch limit had previously been reached, or credit //had expired in windowing mode there may be messages that can //be now be delivered - for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + requestDispatch(); +} + +void SemanticState::requestDispatch() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + requestDispatch(i->second); + } +} + +void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c) +{ + if(c->isBlocked()) { + c->getQueue()->requestDispatch(c); + } } void SemanticState::acknowledged(const DeliveryRecord& delivery) @@ -397,7 +405,7 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - i->acknowledged(delivery); + i->second->acknowledged(delivery); } } @@ -458,52 +466,55 @@ void SemanticState::flow(bool active) flowActive = active; if (requestDelivery) { //there may be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + requestDispatch(); } } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) { ConsumerImplMap::iterator i = consumers.find(destination); if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return *i; + return i->second; } } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - find(destination).addByteCredit(value); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + requestDispatch(c); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - find(destination).addMessageCredit(value); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + requestDispatch(c); } void SemanticState::flush(const std::string& destination) { - ConsumerImpl& c = find(destination); - c.flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -518,24 +529,18 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { - { - Mutex::ScopedLock l(lock); - if (byteCredit != 0xFFFFFFFF) { - byteCredit += value; - } + Mutex::ScopedLock l(lock); + if (byteCredit != 0xFFFFFFFF) { + byteCredit += value; } - requestDispatch(); } void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { - { - Mutex::ScopedLock l(lock); - if (msgCredit != 0xFFFFFFFF) { - msgCredit += value; - } + Mutex::ScopedLock l(lock); + if (msgCredit != 0xFFFFFFFF) { + msgCredit += value; } - requestDispatch(); } void SemanticState::ConsumerImpl::flush() |