summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-06-03 21:45:58 +0000
committerKeith Wall <kwall@apache.org>2015-06-03 21:45:58 +0000
commitf016b0e42f1862fb05b65cb6a7e7f03d013b31c3 (patch)
treebebee59dc4405c7791349e426a32bed62ee059fe
parent3fb144cc7da60cf0d7a8f1bbb684b89e71349e14 (diff)
downloadqpid-python-f016b0e42f1862fb05b65cb6a7e7f03d013b31c3.tar.gz
QPID-6567: [Python Client 0-8..0-91] Support producer side flow control in the Python client
* Like the Qpid Java Client, this implementation does not send channel.flow-ok. Work by Lorenz Quack <quack.lorenz@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1683432 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/client.py8
-rw-r--r--qpid/python/qpid/peer.py35
-rw-r--r--qpid/python/qpid/testlib.py4
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_9/queue.py37
4 files changed, 76 insertions, 8 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 572eaaa076..24b238f05e 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -83,7 +83,8 @@ class Client:
def start(self, response=None, mechanism=None, locale="en_US", tune_params=None,
username=None, password=None,
- client_properties=None, connection_options=None, sasl_options = None):
+ client_properties=None, connection_options=None, sasl_options = None,
+ channel_options=None):
self.mechanism = mechanism
self.response = response
self.username = username
@@ -94,7 +95,7 @@ class Client:
self.sasl_options = sasl_options
self.socket = connect(self.host, self.port, connection_options)
self.conn = Connection(self.socket, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self), Session)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options)
self.conn.init()
self.peer.start()
@@ -206,6 +207,9 @@ class ClientDelegate(Delegate):
def channel_close(self, ch, msg):
ch.closed(msg)
+ def channel_flow(self, ch, msg):
+ ch.set_flow_control(not msg.active)
+
def session_ack(self, ch, msg):
pass
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index a49dc3e661..02986dc9a0 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
from time import time
-from exceptions import Closed
+from exceptions import Closed, Timeout
from logging import getLogger
log = getLogger("qpid.peer")
@@ -55,7 +55,7 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate, channel_factory=None):
+ def __init__(self, conn, delegate, channel_factory=None, channel_options=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
@@ -66,6 +66,9 @@ class Peer:
self.channel_factory = channel_factory
else:
self.channel_factory = Channel
+ if channel_options is None:
+ channel_options = {}
+ self.channel_options = channel_options
def channel(self, id):
self.lock.acquire()
@@ -73,7 +76,7 @@ class Peer:
try:
ch = self.channels[id]
except KeyError:
- ch = self.channel_factory(id, self.outgoing, self.conn.spec)
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec, self.channel_options)
self.channels[id] = ch
finally:
self.lock.release()
@@ -205,7 +208,7 @@ class Responder:
class Channel:
- def __init__(self, id, outgoing, spec):
+ def __init__(self, id, outgoing, spec, options):
self.id = id
self.outgoing = outgoing
self.spec = spec
@@ -228,6 +231,10 @@ class Channel:
self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
self.synchronous = True
+ self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
+ self._flow_control_wc = threading.Condition()
+ self._flow_control = False
+
def closed(self, reason):
if self._closed:
return
@@ -339,6 +346,8 @@ class Channel:
future = Future()
self.futures[cmd_id] = future
+ if frame.method.klass.name == "basic" and frame.method.name == "publish":
+ self.check_flow_control()
self.write(frame, content)
try:
@@ -381,6 +390,24 @@ class Channel:
else:
raise e
+ # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+ def set_flow_control(self, value):
+ self._flow_control_wc.acquire()
+ self._flow_control = value
+ if value == False:
+ self._flow_control_wc.notify()
+ self._flow_control_wc.release()
+
+ # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+ def check_flow_control(self):
+ self._flow_control_wc.acquire()
+ if self._flow_control:
+ self._flow_control_wc.wait(self._flow_control_wait_failure)
+ if self._flow_control:
+ self._flow_control_wc.release()
+ raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
+ self._flow_control_wc.release()
+
def __getattr__(self, name):
type = self.spec.method(name)
if type == None: raise AttributeError(name)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 841dcfbd88..256aa7b5e6 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
self.client.close()
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
@@ -81,7 +81,7 @@ class TestBase(unittest.TestCase):
password = password or self.config.broker.password or "guest"
client = qpid.client.Client(host, port)
try:
- client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties)
+ client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options)
except qpid.client.Closed, e:
if isinstance(e.args[0], VersionError):
raise Skipped(e.args[0])
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
index de1153307c..249850caf9 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
@@ -16,10 +16,12 @@
# specific language governing permissions and limitations
# under the License.
#
+import time
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import TestBase
+from qpid.exceptions import Timeout
class QueueTests(TestBase):
"""Tests for 'methods' on the amqp queue 'class'"""
@@ -109,3 +111,38 @@ class QueueTests(TestBase):
self.fail("Expected queue to have been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
+
+ def test_flow_control(self):
+ queue_name="flow-controled-queue"
+
+ connection = self.connect(channel_options={"qpid.flow_control_wait_failure" : 1})
+ channel = connection.channel(1)
+ channel.channel_open()
+ channel.queue_declare(queue=queue_name, arguments={"x-qpid-capacity" : 25, "x-qpid-flow-resume-capacity" : 15})
+
+ try:
+ for i in xrange(100):
+ channel.basic_publish(exchange="", routing_key=queue_name,
+ content=Content("This is a message with more than 25 bytes. This should trigger flow control."))
+ time.sleep(.1)
+ self.fail("Flow Control did not work")
+ except Timeout:
+ # this is expected
+ pass
+
+ consumer_reply = channel.basic_consume(queue=queue_name, consumer_tag="consumer", no_ack=True)
+ queue = self.client.queue(consumer_reply.consumer_tag)
+ while True:
+ try:
+ msg = queue.get(timeout=1)
+ except Empty:
+ break
+ channel.basic_cancel(consumer_tag=consumer_reply.consumer_tag)
+
+ try:
+ channel.basic_publish(exchange="", routing_key=queue_name,
+ content=Content("This should not block because we have just cleared the queue."))
+ except Timeout:
+ self.fail("Unexpected Timeout. Flow Control should not be in effect.")
+
+ connection.close()