summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
committerAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
commit80d65b38008d9b7f31c825508819f9600d63b63c (patch)
tree316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/broker/SemanticState.cpp
parentf5e98a6dfb8c4defe22755340f440e6f16c2559a (diff)
downloadqpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are droped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp58
1 files changed, 30 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 8f918ff40f..40c9bf296e 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- outputTasks(ss),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
{
@@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag,
{
ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
- outputTasks.addOutputTask(c.get());
consumers[tag] = c;
}
@@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
- consumers.erase(i);
+ consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
@@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
- queueHasMessages(1),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
- deliveryCount(0) {}
+ deliveryCount(0)
+{}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // checkCredit fails because the message is to big, we should
+ // remain on queue's listener list for possible smaller messages
+ // in future.
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
- outputTasks.removeOutputTask(c.get());
+ if (session.isAttached())
+ session.getConnection().outputTasks.removeOutputTask(c.get());
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
void SemanticState::requestDispatch()
-{
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*(i->second));
- }
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++)
+ i->second->requestDispatch();
}
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{
- if(c.isBlocked())
- outputTasks.activateOutput();
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+ if (blocked) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ blocked = false;
+ }
}
bool SemanticState::complete(DeliveryRecord& delivery)
@@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value
{
ConsumerImpl& c = find(destination);
c.addByteCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
@@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va
{
ConsumerImpl& c = find(destination);
c.addMessageCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
void SemanticState::flush(const std::string& destination)
@@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0))
- return false;
- if (queue->dispatch(shared_from_this()))
- queueHasMessages.boolCompareAndSwap(0, 1);
- return queueHasMessages.get();
+ return haveCredit() && queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
- queueHasMessages.boolCompareAndSwap(0, 1);
-
- //TODO: alter this, don't want to hold locks across external
- //calls; for now its is required to protect the notify() from
- //having part of the object chain of the invocation being
- //concurrently deleted
Mutex::ScopedLock l(lock);
- if (notifyEnabled) parent->outputTasks.activateOutput();
+ if (notifyEnabled) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ }
}
@@ -670,13 +669,16 @@ void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
+ session.getConnection().outputTasks.addOutputTask(i->second.get());
}
+ session.getConnection().outputTasks.activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
+ session.getConnection().outputTasks.removeOutputTask(i->second.get());
}
}