diff options
author | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
commit | 80d65b38008d9b7f31c825508819f9600d63b63c (patch) | |
tree | 316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/broker/SemanticState.cpp | |
parent | f5e98a6dfb8c4defe22755340f440e6f16c2559a (diff) | |
download | qpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz |
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection
holding only the OutputTasks that are potentially active. Tasks are
droped from the list as soon as they return false, and added back when
they may have output.
Inlined frequently-used SequenceNumber functions.
Replace std::list in QueueListeners with std::vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 58 |
1 files changed, 30 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 8f918ff40f..40c9bf296e 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - outputTasks(ss), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) { @@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag, { ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception - outputTasks.addOutputTask(c.get()); consumers[tag] = c; } @@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); - consumers.erase(i); + consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); @@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, msgCredit(0), byteCredit(0), notifyEnabled(true), - queueHasMessages(1), syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), - deliveryCount(0) {} + deliveryCount(0) +{} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {} void SemanticState::cancel(ConsumerImpl::shared_ptr c) { c->disableNotify(); - outputTasks.removeOutputTask(c.get()); + if (session.isAttached()) + session.getConnection().outputTasks.removeOutputTask(c.get()); Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { } void SemanticState::requestDispatch() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*(i->second)); - } +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) + i->second->requestDispatch(); } -void SemanticState::requestDispatch(ConsumerImpl& c) -{ - if(c.isBlocked()) - outputTasks.activateOutput(); +void SemanticState::ConsumerImpl::requestDispatch() +{ + if (blocked) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + blocked = false; + } } bool SemanticState::complete(DeliveryRecord& delivery) @@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value { ConsumerImpl& c = find(destination); c.addByteCredit(value); - requestDispatch(c); + c.requestDispatch(); } @@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va { ConsumerImpl& c = find(destination); c.addMessageCredit(value); - requestDispatch(c); + c.requestDispatch(); } void SemanticState::flush(const std::string& destination) @@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() { bool SemanticState::ConsumerImpl::doOutput() { - if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0)) - return false; - if (queue->dispatch(shared_from_this())) - queueHasMessages.boolCompareAndSwap(0, 1); - return queueHasMessages.get(); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() @@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const { void SemanticState::ConsumerImpl::notify() { - queueHasMessages.boolCompareAndSwap(0, 1); - - //TODO: alter this, don't want to hold locks across external - //calls; for now its is required to protect the notify() from - //having part of the object chain of the invocation being - //concurrently deleted Mutex::ScopedLock l(lock); - if (notifyEnabled) parent->outputTasks.activateOutput(); + if (notifyEnabled) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + } } @@ -670,13 +669,16 @@ void SemanticState::attached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); + session.getConnection().outputTasks.addOutputTask(i->second.get()); } + session.getConnection().outputTasks.activateOutput(); } void SemanticState::detached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); + session.getConnection().outputTasks.removeOutputTask(i->second.get()); } } |