summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp93
1 files changed, 49 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d826fef22c..d605e92b72 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -69,7 +69,11 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
}
SemanticState::~SemanticState() {
- consumers.clear();
+ //cancel all consumers
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ cancel(i->second);
+ }
+
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
@@ -86,16 +90,15 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(c.get(), exclusive);//may throw exception
- consumers.insert(tagInOut, c.release());
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tagInOut] = c;
}
void SemanticState::cancel(const string& tag){
- // consumers is a ptr_map so erase will delete the consumer
- // which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
+ cancel(i->second);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
@@ -287,28 +290,19 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
}
}
-SemanticState::ConsumerImpl::~ConsumerImpl() {
- cancel();
-}
+SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::ConsumerImpl::cancel()
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
+ Queue::shared_ptr queue = c->getQueue();
if(queue) {
- queue->cancel(this);
+ queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- parent->getSession().getBroker().getQueues().destroyIf(
- queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ Queue::tryAutoDelete(getSession().getBroker(), queue);
}
}
}
-void SemanticState::ConsumerImpl::requestDispatch()
-{
- if(blocked)
- queue->requestDispatch(this);
-}
-
void SemanticState::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
@@ -389,7 +383,21 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
//if the prefetch limit had previously been reached, or credit
//had expired in windowing mode there may be messages that can
//be now be delivered
- for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ requestDispatch();
+}
+
+void SemanticState::requestDispatch()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ requestDispatch(i->second);
+ }
+}
+
+void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+{
+ if(c->isBlocked()) {
+ c->getQueue()->requestDispatch(c);
+ }
}
void SemanticState::acknowledged(const DeliveryRecord& delivery)
@@ -397,7 +405,7 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery)
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- i->acknowledged(delivery);
+ i->second->acknowledged(delivery);
}
}
@@ -458,52 +466,55 @@ void SemanticState::flow(bool active)
flowActive = active;
if (requestDelivery) {
//there may be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ requestDispatch();
}
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return *i;
+ return i->second;
}
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- find(destination).addByteCredit(value);
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ requestDispatch(c);
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- find(destination).addMessageCredit(value);
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ requestDispatch(c);
}
void SemanticState::flush(const std::string& destination)
{
- ConsumerImpl& c = find(destination);
- c.flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -518,24 +529,18 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- {
- Mutex::ScopedLock l(lock);
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit += value;
- }
+ Mutex::ScopedLock l(lock);
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit += value;
}
- requestDispatch();
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- {
- Mutex::ScopedLock l(lock);
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit += value;
- }
+ Mutex::ScopedLock l(lock);
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit += value;
}
- requestDispatch();
}
void SemanticState::ConsumerImpl::flush()