diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 161 |
1 files changed, 72 insertions, 89 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index e790e087f0..76775d03d5 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -62,7 +62,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), - flowActive(true) + flowActive(true), + outputTasks(ss) { outstanding.reset(); } @@ -70,7 +71,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) SemanticState::~SemanticState() { //cancel all consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); + cancel(*i); } if (dtxBuffer.get()) { @@ -89,19 +90,19 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); - queue->consume(c, exclusive);//may throw exception - consumers[tagInOut] = c; + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); + queue->consume(*c, exclusive);//may throw exception + outputTasks.addOutputTask(c.get()); + consumers.insert(tagInOut, c.release()); } void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { - cancel(i->second); + cancel(*i); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery - Mutex::ScopedLock locker(deliveryLock); for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag)); } @@ -232,7 +233,6 @@ void SemanticState::record(const DeliveryRecord& delivery) bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg) { - Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -254,37 +254,27 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ackExpected(ack), nolocal(_nolocal), acquire(_acquire), - blocked(false), + blocked(true), windowing(true), msgCredit(0), byteCredit(0) {} bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (!parent->getSession().isAttached()) { - return false; - } - - if (nolocal && - &parent->getSession().getConnection() == msg.payload->getPublisher()) { - return false; - } else { - if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { - blocked = true; - } else { - blocked = false; - Mutex::ScopedLock locker(parent->deliveryLock); - - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } + if (parent->getSession().isAttached() && accept(msg.payload)) { + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } - return !blocked; + return true; + } else { + QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); + return false; } } @@ -294,35 +284,48 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) &parent->getSession().getConnection() == msg->getPublisher()); } +bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) +{ + //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed): + blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg))); + return !blocked; +} + +void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) +{ + uint32_t originalMsgCredit = msgCredit; + uint32_t originalByteCredit = byteCredit; + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); + } + QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit + << " now bytes: " << byteCredit << " msgs: " << msgCredit); + +} + bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - Mutex::ScopedLock l(lock); if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { - uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; - - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit - << " now bytes: " << byteCredit << " msgs: " << msgCredit); + << " bytes: " << byteCredit << " msgs: " << msgCredit); return true; } } SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::cancel(ConsumerImpl& c) { - Queue::shared_ptr queue = c->getQueue(); + outputTasks.removeOutputTask(&c); + Queue::shared_ptr queue = c.getQueue(); if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { @@ -374,8 +377,6 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last) void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) { { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - ack_iterator start = cumulative ? unacked.begin() : find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); ack_iterator end = start; @@ -417,14 +418,14 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(i->second); + requestDispatch(*i); } } -void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c) +void SemanticState::requestDispatch(ConsumerImpl& c) { - if(c->isBlocked()) { - c->getQueue()->requestDispatch(c); + if(c.isBlocked()) { + c.doOutput(); } } @@ -433,14 +434,13 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - i->second->acknowledged(delivery); + i->acknowledged(delivery); } } void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) { if (windowing) { - Mutex::ScopedLock l(lock); if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); } @@ -448,8 +448,6 @@ void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) void SemanticState::recover(bool requeue) { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - if(requeue){ outstanding.reset(); //take copy and clear unacked as requeue may result in redelivery to this session @@ -470,7 +468,6 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue { QueuedMessage msg = queue->dequeue(); if(msg.payload){ - Mutex::ScopedLock locker(deliveryLock); DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); @@ -483,13 +480,11 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - Mutex::ScopedLock locker(deliveryLock); return deliveryAdapter.deliver(msg, token); } void SemanticState::flow(bool active) { - Mutex::ScopedLock locker(deliveryLock); bool requestDelivery(!flowActive && active); flowActive = active; if (requestDelivery) { @@ -499,50 +494,50 @@ void SemanticState::flow(bool active) } -SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) +SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) { ConsumerImplMap::iterator i = consumers.find(destination); if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return i->second; + return *i; } } void SemanticState::setWindowMode(const std::string& destination) { - find(destination)->setWindowMode(); + find(destination).setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination)->setCreditMode(); + find(destination).setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl::shared_ptr c = find(destination); - c->addByteCredit(value); + ConsumerImpl& c = find(destination); + c.addByteCredit(value); requestDispatch(c); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl::shared_ptr c = find(destination); - c->addMessageCredit(value); + ConsumerImpl& c = find(destination); + c.addMessageCredit(value); requestDispatch(c); } void SemanticState::flush(const std::string& destination) { - find(destination)->flush(); + find(destination).flush(); } void SemanticState::stop(const std::string& destination) { - find(destination)->stop(); + find(destination).stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -557,7 +552,6 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { - Mutex::ScopedLock l(lock); if (byteCredit != 0xFFFFFFFF) { byteCredit += value; } @@ -565,7 +559,6 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { - Mutex::ScopedLock l(lock); if (msgCredit != 0xFFFFFFFF) { msgCredit += value; } @@ -573,16 +566,12 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { - //need to prevent delivery after requestDispatch returns but - //before credit is reduced to zero - FlushCompletion completion(*this); - queue->flush(completion); - completion.wait(); + while(queue->dispatch(*this)); + stop(); } void SemanticState::ConsumerImpl::stop() { - Mutex::ScopedLock l(lock); msgCredit = 0; byteCredit = 0; } @@ -618,14 +607,12 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, AcquireFunctor(acquired)); } void SemanticState::release(DeliveryId first, DeliveryId last) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); //release results in the message being added to the head so want //to release in reverse order to keep the original transfer order @@ -636,26 +623,22 @@ void SemanticState::release(DeliveryId first, DeliveryId last) void SemanticState::reject(DeliveryId first, DeliveryId last) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); //need to remove the delivery records as well unacked.erase(range.start, range.end); } - -void SemanticState::FlushCompletion::wait() +bool SemanticState::ConsumerImpl::doOutput() { - Monitor::ScopedLock locker(lock); - while (!complete) lock.wait(); + //TODO: think through properly + return queue->dispatch(*this); } -void SemanticState::FlushCompletion::completed() +void SemanticState::ConsumerImpl::notify() { - Monitor::ScopedLock locker(lock); - consumer.stop(); - complete = true; - lock.notifyAll(); + //TODO: think through properly + parent->outputTasks.activateOutput(); } }} // namespace qpid::broker |