summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp19
1 files changed, 16 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 8ad63e325f..173fcba552 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -188,6 +188,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ wakeupDriver();
}
return true;
} else {
@@ -195,12 +196,24 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
- QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+ while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait();
}
+ if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+ pn_link_flow(lnk->receiver, lnk->capacity);
+ }
+ }
+ if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ return false;
}
- return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
}
}