diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-16 09:11:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-16 09:11:48 +0000 |
commit | 0ae648d78f3970eb7fc96f000a5ba4f6444e4b6e (patch) | |
tree | cec83f3aa78dfdebc8c1828af30d0f42d6a2d64a /cpp/src | |
parent | 83e8536cab73c5d5e176d31abee62fafa3ff251e (diff) | |
download | qpid-python-0ae648d78f3970eb7fc96f000a5ba4f6444e4b6e.tar.gz |
* Revised allocation algorithm to ensure all consumers are given the opportunity to consume a message
* If already have infinit credit, don't try to add to it
* If get disconnected while processing close, just finish off the close and don't signal the disconnection
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@585085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 10 |
3 files changed, 22 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9586f6b994..3065041424 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -155,12 +155,14 @@ Consumer* Queue::allocate() bool Queue::dispatch(QueuedMessage& msg) { Consumer* c = allocate(); - int start = next; + Consumer* first = c; while(c){ if(c->deliver(msg)) { return true; + } else { + c = allocate(); + if (c == first) c = 0; } - c = next == start ? 0 : allocate(); } return false; } @@ -170,7 +172,10 @@ void Queue::dispatch(){ while(true){ { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) break; + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + break; + } msg = messages.front(); } if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index cc03ebe48f..d826fef22c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -267,7 +267,8 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) { Mutex::ScopedLock l(lock); if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " << byteCredit << " msgs: " << msgCredit); + QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent + << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { uint32_t originalMsgCredit = msgCredit; @@ -279,8 +280,8 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) if (byteCredit != 0xFFFFFFFF) { byteCredit -= msg->getRequiredCredit(); } - QPID_LOG(debug, "Credit available for '" << name - << "', was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit + QPID_LOG(debug, "Credit available for '" << name << "' on " << parent + << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); return true; } @@ -519,7 +520,9 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { { Mutex::ScopedLock l(lock); - byteCredit += value; + if (byteCredit != 0xFFFFFFFF) { + byteCredit += value; + } } requestDispatch(); } @@ -528,7 +531,9 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { { Mutex::ScopedLock l(lock); - msgCredit += value; + if (msgCredit != 0xFFFFFFFF) { + msgCredit += value; + } } requestDispatch(); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index ffb7c867e4..c8babaf421 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -292,12 +292,10 @@ void AsynchIO::writeable(DispatchHandle& h) { } void AsynchIO::disconnected(DispatchHandle& h) { - // If we've already queued close do it before callback - if (queuedClose) { - close(h); - } - - if (disCallback) { + // If we've already queued close do it instead of disconnected callback + if (queuedClose) { + close(h); + } else if (disCallback) { disCallback(*this); h.unwatch(); } |