diff options
author | Keith Wall <kwall@apache.org> | 2015-06-03 21:45:58 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-06-03 21:45:58 +0000 |
commit | f016b0e42f1862fb05b65cb6a7e7f03d013b31c3 (patch) | |
tree | bebee59dc4405c7791349e426a32bed62ee059fe | |
parent | 3fb144cc7da60cf0d7a8f1bbb684b89e71349e14 (diff) | |
download | qpid-python-f016b0e42f1862fb05b65cb6a7e7f03d013b31c3.tar.gz |
QPID-6567: [Python Client 0-8..0-91] Support producer side flow control in the Python client
* Like the Qpid Java Client, this implementation does not send channel.flow-ok.
Work by Lorenz Quack <quack.lorenz@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1683432 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/client.py | 8 | ||||
-rw-r--r-- | qpid/python/qpid/peer.py | 35 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 4 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_9/queue.py | 37 |
4 files changed, 76 insertions, 8 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index 572eaaa076..24b238f05e 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -83,7 +83,8 @@ class Client: def start(self, response=None, mechanism=None, locale="en_US", tune_params=None, username=None, password=None, - client_properties=None, connection_options=None, sasl_options = None): + client_properties=None, connection_options=None, sasl_options = None, + channel_options=None): self.mechanism = mechanism self.response = response self.username = username @@ -94,7 +95,7 @@ class Client: self.sasl_options = sasl_options self.socket = connect(self.host, self.port, connection_options) self.conn = Connection(self.socket, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self), Session) + self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options) self.conn.init() self.peer.start() @@ -206,6 +207,9 @@ class ClientDelegate(Delegate): def channel_close(self, ch, msg): ch.closed(msg) + def channel_flow(self, ch, msg): + ch.set_flow_control(not msg.active) + def session_ack(self, ch, msg): pass diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index a49dc3e661..02986dc9a0 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO from time import time -from exceptions import Closed +from exceptions import Closed, Timeout from logging import getLogger log = getLogger("qpid.peer") @@ -55,7 +55,7 @@ class Sequence: class Peer: - def __init__(self, conn, delegate, channel_factory=None): + def __init__(self, conn, delegate, channel_factory=None, channel_options=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) @@ -66,6 +66,9 @@ class Peer: self.channel_factory = channel_factory else: self.channel_factory = Channel + if channel_options is None: + channel_options = {} + self.channel_options = channel_options def channel(self, id): self.lock.acquire() @@ -73,7 +76,7 @@ class Peer: try: ch = self.channels[id] except KeyError: - ch = self.channel_factory(id, self.outgoing, self.conn.spec) + ch = self.channel_factory(id, self.outgoing, self.conn.spec, self.channel_options) self.channels[id] = ch finally: self.lock.release() @@ -205,7 +208,7 @@ class Responder: class Channel: - def __init__(self, id, outgoing, spec): + def __init__(self, id, outgoing, spec, options): self.id = id self.outgoing = outgoing self.spec = spec @@ -228,6 +231,10 @@ class Channel: self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) self.synchronous = True + self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60) + self._flow_control_wc = threading.Condition() + self._flow_control = False + def closed(self, reason): if self._closed: return @@ -339,6 +346,8 @@ class Channel: future = Future() self.futures[cmd_id] = future + if frame.method.klass.name == "basic" and frame.method.name == "publish": + self.check_flow_control() self.write(frame, content) try: @@ -381,6 +390,24 @@ class Channel: else: raise e + # part of flow control for AMQP 0-8, 0-9, and 0-9-1 + def set_flow_control(self, value): + self._flow_control_wc.acquire() + self._flow_control = value + if value == False: + self._flow_control_wc.notify() + self._flow_control_wc.release() + + # part of flow control for AMQP 0-8, 0-9, and 0-9-1 + def check_flow_control(self): + self._flow_control_wc.acquire() + if self._flow_control: + self._flow_control_wc.wait(self._flow_control_wait_failure) + if self._flow_control: + self._flow_control_wc.release() + raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control") + self._flow_control_wc.release() + def __getattr__(self, name): type = self.spec.method(name) if type == None: raise AttributeError(name) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 841dcfbd88..256aa7b5e6 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -73,7 +73,7 @@ class TestBase(unittest.TestCase): self.client.close() - def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None): """Create a new connction, return the Client object""" host = host or self.config.broker.host port = port or self.config.broker.port or 5672 @@ -81,7 +81,7 @@ class TestBase(unittest.TestCase): password = password or self.config.broker.password or "guest" client = qpid.client.Client(host, port) try: - client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties) + client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options) except qpid.client.Closed, e: if isinstance(e.args[0], VersionError): raise Skipped(e.args[0]) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py index de1153307c..249850caf9 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py @@ -16,10 +16,12 @@ # specific language governing permissions and limitations # under the License. # +import time from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import TestBase +from qpid.exceptions import Timeout class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" @@ -109,3 +111,38 @@ class QueueTests(TestBase): self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) + + def test_flow_control(self): + queue_name="flow-controled-queue" + + connection = self.connect(channel_options={"qpid.flow_control_wait_failure" : 1}) + channel = connection.channel(1) + channel.channel_open() + channel.queue_declare(queue=queue_name, arguments={"x-qpid-capacity" : 25, "x-qpid-flow-resume-capacity" : 15}) + + try: + for i in xrange(100): + channel.basic_publish(exchange="", routing_key=queue_name, + content=Content("This is a message with more than 25 bytes. This should trigger flow control.")) + time.sleep(.1) + self.fail("Flow Control did not work") + except Timeout: + # this is expected + pass + + consumer_reply = channel.basic_consume(queue=queue_name, consumer_tag="consumer", no_ack=True) + queue = self.client.queue(consumer_reply.consumer_tag) + while True: + try: + msg = queue.get(timeout=1) + except Empty: + break + channel.basic_cancel(consumer_tag=consumer_reply.consumer_tag) + + try: + channel.basic_publish(exchange="", routing_key=queue_name, + content=Content("This should not block because we have just cleared the queue.")) + except Timeout: + self.fail("Unexpected Timeout. Flow Control should not be in effect.") + + connection.close() |