summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/client.py8
-rw-r--r--qpid/python/qpid/peer.py35
-rw-r--r--qpid/python/qpid/testlib.py4
3 files changed, 39 insertions, 8 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 572eaaa076..24b238f05e 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -83,7 +83,8 @@ class Client:
def start(self, response=None, mechanism=None, locale="en_US", tune_params=None,
username=None, password=None,
- client_properties=None, connection_options=None, sasl_options = None):
+ client_properties=None, connection_options=None, sasl_options = None,
+ channel_options=None):
self.mechanism = mechanism
self.response = response
self.username = username
@@ -94,7 +95,7 @@ class Client:
self.sasl_options = sasl_options
self.socket = connect(self.host, self.port, connection_options)
self.conn = Connection(self.socket, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self), Session)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options)
self.conn.init()
self.peer.start()
@@ -206,6 +207,9 @@ class ClientDelegate(Delegate):
def channel_close(self, ch, msg):
ch.closed(msg)
+ def channel_flow(self, ch, msg):
+ ch.set_flow_control(not msg.active)
+
def session_ack(self, ch, msg):
pass
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index a49dc3e661..02986dc9a0 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
from time import time
-from exceptions import Closed
+from exceptions import Closed, Timeout
from logging import getLogger
log = getLogger("qpid.peer")
@@ -55,7 +55,7 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate, channel_factory=None):
+ def __init__(self, conn, delegate, channel_factory=None, channel_options=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
@@ -66,6 +66,9 @@ class Peer:
self.channel_factory = channel_factory
else:
self.channel_factory = Channel
+ if channel_options is None:
+ channel_options = {}
+ self.channel_options = channel_options
def channel(self, id):
self.lock.acquire()
@@ -73,7 +76,7 @@ class Peer:
try:
ch = self.channels[id]
except KeyError:
- ch = self.channel_factory(id, self.outgoing, self.conn.spec)
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec, self.channel_options)
self.channels[id] = ch
finally:
self.lock.release()
@@ -205,7 +208,7 @@ class Responder:
class Channel:
- def __init__(self, id, outgoing, spec):
+ def __init__(self, id, outgoing, spec, options):
self.id = id
self.outgoing = outgoing
self.spec = spec
@@ -228,6 +231,10 @@ class Channel:
self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
self.synchronous = True
+ self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
+ self._flow_control_wc = threading.Condition()
+ self._flow_control = False
+
def closed(self, reason):
if self._closed:
return
@@ -339,6 +346,8 @@ class Channel:
future = Future()
self.futures[cmd_id] = future
+ if frame.method.klass.name == "basic" and frame.method.name == "publish":
+ self.check_flow_control()
self.write(frame, content)
try:
@@ -381,6 +390,24 @@ class Channel:
else:
raise e
+ # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+ def set_flow_control(self, value):
+ self._flow_control_wc.acquire()
+ self._flow_control = value
+ if value == False:
+ self._flow_control_wc.notify()
+ self._flow_control_wc.release()
+
+ # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+ def check_flow_control(self):
+ self._flow_control_wc.acquire()
+ if self._flow_control:
+ self._flow_control_wc.wait(self._flow_control_wait_failure)
+ if self._flow_control:
+ self._flow_control_wc.release()
+ raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
+ self._flow_control_wc.release()
+
def __getattr__(self, name):
type = self.spec.method(name)
if type == None: raise AttributeError(name)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 841dcfbd88..256aa7b5e6 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
self.client.close()
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
@@ -81,7 +81,7 @@ class TestBase(unittest.TestCase):
password = password or self.config.broker.password or "guest"
client = qpid.client.Client(host, port)
try:
- client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties)
+ client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options)
except qpid.client.Closed, e:
if isinstance(e.args[0], VersionError):
raise Skipped(e.args[0])