summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-11-23 16:01:25 +0000
committerGordon Sim <gsim@apache.org>2011-11-23 16:01:25 +0000
commitc8f2e652489ca0fab6b4ce15416b16afdcd6b556 (patch)
treea434a0aaf73630fafebd5038e112d7e4efd7f5bc
parentceca56258e7d3b8aabd2eadbd00857eb69e825f5 (diff)
downloadqpid-python-c8f2e652489ca0fab6b4ce15416b16afdcd6b556.tar.gz
QPID-3629: Changed management of credit window
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1205467 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Credit.cpp151
-rw-r--r--cpp/src/qpid/broker/Credit.h96
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp61
-rw-r--r--cpp/src/qpid/broker/SemanticState.h11
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp10
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--cpp/xml/cluster.xml2
-rw-r--r--python/qpid/testlib.py1
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py78
14 files changed, 367 insertions, 57 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index deae682647..6b6133c562 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -993,6 +993,7 @@ set (qpidbroker_SOURCES
qpid/amqp_0_10/Connection.h
qpid/amqp_0_10/Connection.cpp
qpid/broker/Broker.cpp
+ qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
qpid/broker/ExpiryPolicy.cpp
qpid/broker/Fairshare.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 91c787502d..cd038a96d5 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -534,6 +534,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionToken.h \
qpid/broker/Consumer.h \
+ qpid/broker/Credit.h \
+ qpid/broker/Credit.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
diff --git a/cpp/src/qpid/broker/Credit.cpp b/cpp/src/qpid/broker/Credit.cpp
new file mode 100644
index 0000000000..c0e0b3b3d3
--- /dev/null
+++ b/cpp/src/qpid/broker/Credit.cpp
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Credit.h"
+
+namespace qpid {
+namespace broker {
+
+const uint32_t CreditBalance::INFINITE_CREDIT(0xFFFFFFFF);
+CreditBalance::CreditBalance() : balance(0) {}
+CreditBalance::~CreditBalance() {}
+void CreditBalance::clear() { balance = 0; }
+void CreditBalance::grant(uint32_t value)
+{
+ if (balance != INFINITE_CREDIT) {
+ if (value == INFINITE_CREDIT) {
+ balance = INFINITE_CREDIT;
+ } else if (INFINITE_CREDIT - balance > value) {
+ balance += value;
+ } else {
+ balance = INFINITE_CREDIT - 1;
+ }
+ }
+}
+void CreditBalance::consume(uint32_t value) { if (!unlimited()) balance -= value; }
+bool CreditBalance::check(uint32_t required) const { return balance >= required; }
+uint32_t CreditBalance::remaining() const { return balance; }
+uint32_t CreditBalance::allocated() const { return balance; }
+bool CreditBalance::unlimited() const { return balance == INFINITE_CREDIT; }
+
+CreditWindow::CreditWindow() : used(0) {}
+bool CreditWindow::check(uint32_t required) const { return CreditBalance::check(used + required); }
+void CreditWindow::consume(uint32_t value) { if (!unlimited()) used += value; }
+void CreditWindow::move(uint32_t value) { if (!unlimited()) used -= value; }
+uint32_t CreditWindow::remaining() const { return allocated() - used; }
+uint32_t CreditWindow::consumed() const { return used; }
+
+Credit::Credit() : windowing(true) {}
+void Credit::setWindowMode(bool b) { windowing = b; }
+bool Credit::isWindowMode() const { return windowing; }
+void Credit::addByteCredit(uint32_t value)
+{
+ bytes().grant(value);
+}
+void Credit::addMessageCredit(uint32_t value)
+{
+ messages().grant(value);
+}
+void Credit::cancel()
+{
+ messages().clear();
+ bytes().clear();
+}
+void Credit::moveWindow(uint32_t m, uint32_t b)
+{
+ if (windowing) {
+ window.messages.move(m);
+ window.bytes.move(b);
+ }
+}
+void Credit::consume(uint32_t m, uint32_t b)
+{
+ messages().consume(m);
+ bytes().consume(b);
+}
+bool Credit::check(uint32_t m, uint32_t b) const
+{
+ return messages().check(m) && bytes().check(b);
+}
+CreditPair<uint32_t> Credit::used() const
+{
+ CreditPair<uint32_t> result;
+ if (windowing) {
+ result.messages = window.messages.consumed();
+ result.bytes = window.bytes.consumed();
+ } else {
+ result.messages = 0;
+ result.bytes = 0;
+ }
+ return result;
+}
+CreditPair<uint32_t> Credit::allocated() const
+{
+ CreditPair<uint32_t> result;
+ result.messages = messages().allocated();
+ result.bytes = bytes().allocated();
+ return result;
+}
+Credit::operator bool() const
+{
+ return check(1,1);
+}
+CreditBalance& Credit::messages()
+{
+ if (windowing) return window.messages;
+ else return balance.messages;
+}
+CreditBalance& Credit::bytes()
+{
+ if (windowing) return window.bytes;
+ else return balance.bytes;
+}
+const CreditBalance& Credit::messages() const
+{
+ if (windowing) return window.messages;
+ else return balance.messages;
+}
+const CreditBalance& Credit::bytes() const
+{
+ if (windowing) return window.bytes;
+ else return balance.bytes;
+}
+std::ostream& operator<<(std::ostream& out, const CreditBalance& b)
+{
+ if (b.unlimited()) return out << "unlimited";
+ else return out << b.balance;
+}
+std::ostream& operator<<(std::ostream& out, const CreditWindow& w)
+{
+ if (w.unlimited()) return out << ((CreditBalance) w);
+ else return out << w.remaining() << " (from window of " << w.allocated() << ")";
+}
+template <class T>
+std::ostream& operator<<(std::ostream& out, const CreditPair<T>& pair)
+{
+ return out << "messages: " << pair.messages << " bytes: " << pair.bytes;
+}
+std::ostream& operator<<(std::ostream& out, const Credit& c)
+{
+ if (c.windowing) return out << c.window;
+ else return out << c.balance;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Credit.h b/cpp/src/qpid/broker/Credit.h
new file mode 100644
index 0000000000..7f98c8d071
--- /dev/null
+++ b/cpp/src/qpid/broker/Credit.h
@@ -0,0 +1,96 @@
+#ifndef QPID_BROKER_CREDIT_H
+#define QPID_BROKER_CREDIT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/IntegerTypes.h"
+#include <memory>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+class CreditBalance {
+ public:
+ CreditBalance();
+ virtual ~CreditBalance();
+ void clear();
+ void grant(uint32_t value);
+ virtual void consume(uint32_t value);
+ virtual bool check(uint32_t required) const;
+ virtual uint32_t remaining() const;
+ uint32_t allocated() const;
+ bool unlimited() const;
+ static const uint32_t INFINITE_CREDIT;
+ friend std::ostream& operator<<(std::ostream&, const CreditBalance&);
+ private:
+ uint32_t balance;
+};
+
+class CreditWindow : public CreditBalance {
+ public:
+ CreditWindow();
+ bool check(uint32_t required) const;
+ void consume(uint32_t value);
+ void move(uint32_t value);
+ uint32_t remaining() const;
+ uint32_t consumed() const;
+ friend std::ostream& operator<<(std::ostream&, const CreditWindow&);
+ private:
+ uint32_t used;
+};
+
+template<class T> struct CreditPair
+{
+ T messages;
+ T bytes;
+};
+
+class Credit {
+ public:
+ Credit();
+ void setWindowMode(bool);
+ bool isWindowMode() const;
+ void addByteCredit(uint32_t);
+ void addMessageCredit(uint32_t);
+ void consume(uint32_t messages, uint32_t bytes);
+ void moveWindow(uint32_t messages, uint32_t bytes);
+ bool check(uint32_t messages, uint32_t bytes) const;
+ void cancel();
+ operator bool() const;
+ CreditPair<uint32_t> allocated() const;
+ CreditPair<uint32_t> used() const;
+ friend std::ostream& operator<<(std::ostream&, const Credit&);
+ private:
+ CreditPair<CreditBalance> balance;
+ CreditPair<CreditWindow> window;
+ bool windowing;
+ CreditBalance& bytes();
+ CreditBalance& messages();
+ const CreditBalance& bytes() const;
+ const CreditBalance& messages() const;
+};
+
+std::ostream& operator<<(std::ostream&, const Credit&);
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CREDIT_H*/
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index fbcb21eab9..aa1face18d 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -285,15 +285,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
ackExpected(ack),
acquire(_acquire),
blocked(true),
- windowing(true),
- windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
- byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
deliveryCount(0),
@@ -338,11 +334,11 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (windowing || ackExpected || !acquire) {
+ if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
@@ -385,28 +381,19 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
assertClusterSafe();
- uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
+ Credit original = credit;
+ credit.consume(1, msg->getRequiredCredit());
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
- << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
- << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+ << ", was " << original << " now " << credit);
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- bool enoughCredit = msgCredit > 0 &&
- (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
- QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
- << ConsumerName(*this)
- << ", have bytes: " << byteCredit << " msgs: " << msgCredit
- << ", need " << msg->getRequiredCredit() << " bytes");
+ bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+ QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
+ << " credit for message of " << msg->getRequiredCredit() << " bytes: "
+ << credit);
return enoughCredit;
}
@@ -539,9 +526,8 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing && windowActive) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ if (credit.isWindowMode()) {
+ credit.moveWindow(1, delivery.getCredit());
}
}
}
@@ -628,7 +614,7 @@ void SemanticState::stop(const std::string& destination)
void SemanticState::ConsumerImpl::setWindowMode()
{
assertClusterSafe();
- windowing = true;
+ credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
}
@@ -637,7 +623,7 @@ void SemanticState::ConsumerImpl::setWindowMode()
void SemanticState::ConsumerImpl::setCreditMode()
{
assertClusterSafe();
- windowing = false;
+ credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
}
@@ -646,26 +632,18 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
- if (byteCredit != 0xFFFFFFFF) {
- if (value == 0xFFFFFFFF) byteCredit = value;
- else byteCredit += value;
- }
+ credit.addByteCredit(value);
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
- if (msgCredit != 0xFFFFFFFF) {
- if (value == 0xFFFFFFFF) msgCredit = value;
- else msgCredit += value;
- }
+ credit.addMessageCredit(value);
}
bool SemanticState::ConsumerImpl::haveCredit()
{
- if (msgCredit && byteCredit) {
+ if (credit) {
return true;
} else {
blocked = true;
@@ -677,16 +655,13 @@ void SemanticState::ConsumerImpl::flush()
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- msgCredit = 0;
- byteCredit = 0;
+ credit.cancel();
}
void SemanticState::ConsumerImpl::stop()
{
assertClusterSafe();
- msgCredit = 0;
- byteCredit = 0;
- windowActive = false;
+ credit.cancel();
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 6d88dd56d9..7e97a5c4cf 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -23,6 +23,7 @@
*/
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/Credit.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/DeliveryRecord.h"
@@ -79,15 +80,12 @@ class SemanticState : private boost::noncopyable {
const bool ackExpected;
const bool acquire;
bool blocked;
- bool windowing;
- bool windowActive;
bool exclusive;
std::string resumeId;
const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
- uint32_t msgCredit;
- uint32_t byteCredit;
+ Credit credit;
bool notifyEnabled;
const int syncFrequency;
int deliveryCount;
@@ -131,12 +129,11 @@ class SemanticState : private boost::noncopyable {
bool doOutput();
+ Credit& getCredit() { return credit; }
+ const Credit& getCredit() const { return credit; }
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
- bool isWindowing() const { return windowing; }
bool isExclusive() const { return exclusive; }
- uint32_t getMsgCredit() const { return msgCredit; }
- uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e6e3de64f2..0241b0946b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1159330;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 394749aad2..17fcf6deb5 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -404,11 +404,13 @@ void Connection::shadowSetUser(const std::string& userId) {
connection->setUserId(userId);
}
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position,
+ uint32_t usedMsgCredit, uint32_t usedByteCredit)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
c->position = position;
c->setBlocked(blocked);
+ if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
updateIn.consumerNumbering.add(c);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index fe66b77238..f656ace45e 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -109,7 +109,8 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position,
+ uint32_t usedMsgCredit, uint32_t usedByteCredit);
// ==== Used in catch-up mode to build initial state.
//
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2446c12f2b..25a3a3327c 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -535,14 +535,16 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getTag(), ci->getCredit().isWindowMode() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getCredit().allocated().messages);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getCredit().allocated().bytes);
ClusterConnectionProxy(shadowSession).consumerState(
ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
- ci->position
+ ci->position,
+ ci->getCredit().used().messages,
+ ci->getCredit().used().bytes
);
consumerNumbering.add(ci.get());
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 30441cd03c..1905219bf2 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -354,6 +354,8 @@ QPID_AUTO_TEST_CASE(testCompleteOnAccept) {
BOOST_CHECK(!q.get(m));
s.accept(accepted);
+ //need to reallocate credit as we have flushed it all out
+ s.setFlowControl(FlowControl::messageWindow(chunk));
fix.session.messageFlush(arg::destination=s.getName());
accepted.clear();
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 899625f5ec..3865916d97 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -176,6 +176,8 @@
<field name="blocked" type="bit"/>
<field name="notifyEnabled" type="bit"/>
<field name="position" type="sequence-no"/>
+ <field name="used-msg-credit" type="uint32"/>
+ <field name="used-byte-credit" type="uint32"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 1439b892ea..1da53b3378 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -187,6 +187,7 @@ class TestBase010(unittest.TestCase):
self.conn = self.connect()
self.session = self.conn.session("test-session", timeout=10)
self.qmf = None
+ self.test_queue_name = self.id()
def startQmf(self, handler=None):
self.qmf = qmf.console.Session(handler)
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index 6c864bcd13..204b6ebd23 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -611,6 +611,84 @@ class MessageTests(TestBase010):
msg = q.get(timeout = 1)
self.assertDataEquals(session, msg, "Message %d" % (i+6))
+ def test_credit_window_after_messagestop(self):
+ """
+ Tests that the broker's credit window size doesnt exceed the requested value when completing
+ previous messageTransfer commands after a message_stop and message_flow.
+ """
+
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)
+
+ #send 11 messages
+ for i in range(1, 12):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))
+
+
+ #subscribe:
+ session.message_subscribe(queue=self.test_queue_name, destination="a")
+ a = session.incoming("a")
+ session.message_set_flow_mode(flow_mode = 1, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ # issue 5 message credits
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+
+ # get 5 messages
+ ids = RangedSet()
+ for i in range(1, 6):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ ids.add(msg.id)
+
+ # now try and read a 6th message. we expect this to fail due to exhausted message credit.
+ try:
+ extra = a.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session.message_stop(destination = "a")
+
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+
+ # complete earlier messages after setting the window to 5 message credits
+ session.channel.session_completed(ids)
+
+ # Now continue to read the next 5 messages
+ for i in range(6, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+ # now try and read the 11th message. we expect this to fail due to exhausted message credit. If we receive an
+ # 11th this indicates the broker is not respecting the client's requested window size.
+ try:
+ extra = a.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ def test_no_credit_wrap(self):
+ """
+ Ensure that adding credit does not result in wrapround, lowering the balance.
+ """
+ session = self.session
+
+ session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)
+ session.message_subscribe(queue=self.test_queue_name, destination="a")
+ a = session.incoming("a")
+ session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFAL, destination = "a")
+ #test wraparound of credit balance does not occur
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ for i in range(1, 50):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))
+ session.message_flush(destination = "a")
+ for i in range(1, 50):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
def test_subscribe_not_acquired(self):
"""