diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 58 |
1 files changed, 30 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 8f918ff40f..40c9bf296e 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - outputTasks(ss), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) { @@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag, { ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception - outputTasks.addOutputTask(c.get()); consumers[tag] = c; } @@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); - consumers.erase(i); + consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); @@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0), notifyEnabled(true), - queueHasMessages(1), syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), - deliveryCount(0) {} + deliveryCount(0) +{} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {} void SemanticState::cancel(ConsumerImpl::shared_ptr c) { c->disableNotify(); - outputTasks.removeOutputTask(c.get()); + if (session.isAttached()) + session.getConnection().outputTasks.removeOutputTask(c.get()); Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { } void SemanticState::requestDispatch() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*(i->second)); - } +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) + i->second->requestDispatch(); } -void SemanticState::requestDispatch(ConsumerImpl& c) -{ - if(c.isBlocked()) - outputTasks.activateOutput(); +void SemanticState::ConsumerImpl::requestDispatch() +{ + if (blocked) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + blocked = false; + } } bool SemanticState::complete(DeliveryRecord& delivery) @@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value { ConsumerImpl& c = find(destination); c.addByteCredit(value); - requestDispatch(c); + c.requestDispatch(); } @@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va { ConsumerImpl& c = find(destination); c.addMessageCredit(value); - requestDispatch(c); + c.requestDispatch(); } void SemanticState::flush(const std::string& destination) @@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() { bool SemanticState::ConsumerImpl::doOutput() { - if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0)) - return false; - if (queue->dispatch(shared_from_this())) - queueHasMessages.boolCompareAndSwap(0, 1); - return queueHasMessages.get(); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() @@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const { void SemanticState::ConsumerImpl::notify() { - queueHasMessages.boolCompareAndSwap(0, 1); - - //TODO: alter this, don't want to hold locks across external - //calls; for now its is required to protect the notify() from - //having part of the object chain of the invocation being - //concurrently deleted Mutex::ScopedLock l(lock); - if (notifyEnabled) parent->outputTasks.activateOutput(); + if (notifyEnabled) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + } } @@ -670,13 +669,16 @@ void SemanticState::attached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); + session.getConnection().outputTasks.addOutputTask(i->second.get()); } + session.getConnection().outputTasks.activateOutput(); } void SemanticState::detached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); + session.getConnection().outputTasks.removeOutputTask(i->second.get()); } } |