summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-28 14:13:38 +0000
committerGordon Sim <gsim@apache.org>2012-11-28 14:13:38 +0000
commitfc6c37f46a02d5a85743c3432b11358a10e01c4a (patch)
treed1782a3be75adce9cf0da6391cbaab1b1a4af9c9
parent8b8226a06e72cdf414a7ddb14421ea7111d4c0a5 (diff)
downloadqpid-python-fc6c37f46a02d5a85743c3432b11358a10e01c4a.tar.gz
QPID-4460: Replenish credit to cover specified prefetch if it is drained
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414708 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp19
2 files changed, 18 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 665bf2def4..9605cacac1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -145,6 +145,7 @@ void Outgoing::detached()
bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
+ if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
pn_delivery(link, r.tag);
@@ -161,7 +162,7 @@ void Outgoing::notify()
bool Outgoing::accept(const qpid::broker::Message&)
{
- return canDeliver();
+ return true;
}
void Outgoing::setSubjectFilter(const std::string& f)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 8ad63e325f..173fcba552 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -188,6 +188,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ wakeupDriver();
}
return true;
} else {
@@ -195,12 +196,24 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
- QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+ while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait();
}
+ if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+ pn_link_flow(lnk->receiver, lnk->capacity);
+ }
+ }
+ if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ return false;
}
- return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
}
}