summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp11
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp15
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp10
3 files changed, 22 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9586f6b994..3065041424 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -155,12 +155,14 @@ Consumer* Queue::allocate()
bool Queue::dispatch(QueuedMessage& msg)
{
Consumer* c = allocate();
- int start = next;
+ Consumer* first = c;
while(c){
if(c->deliver(msg)) {
return true;
+ } else {
+ c = allocate();
+ if (c == first) c = 0;
}
- c = next == start ? 0 : allocate();
}
return false;
}
@@ -170,7 +172,10 @@ void Queue::dispatch(){
while(true){
{
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) break;
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ break;
+ }
msg = messages.front();
}
if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index cc03ebe48f..d826fef22c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -267,7 +267,8 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
{
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " << byteCredit << " msgs: " << msgCredit);
+ QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
+ << ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
uint32_t originalMsgCredit = msgCredit;
@@ -279,8 +280,8 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit available for '" << name
- << "', was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+ QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
+ << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
return true;
}
@@ -519,7 +520,9 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
{
Mutex::ScopedLock l(lock);
- byteCredit += value;
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit += value;
+ }
}
requestDispatch();
}
@@ -528,7 +531,9 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
{
Mutex::ScopedLock l(lock);
- msgCredit += value;
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit += value;
+ }
}
requestDispatch();
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index ffb7c867e4..c8babaf421 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -292,12 +292,10 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
void AsynchIO::disconnected(DispatchHandle& h) {
- // If we've already queued close do it before callback
- if (queuedClose) {
- close(h);
- }
-
- if (disCallback) {
+ // If we've already queued close do it instead of disconnected callback
+ if (queuedClose) {
+ close(h);
+ } else if (disCallback) {
disCallback(*this);
h.unwatch();
}