diff options
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/qpid/client.py | 8 | ||||
-rw-r--r-- | qpid/python/qpid/peer.py | 35 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 4 |
3 files changed, 39 insertions, 8 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index 572eaaa076..24b238f05e 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -83,7 +83,8 @@ class Client: def start(self, response=None, mechanism=None, locale="en_US", tune_params=None, username=None, password=None, - client_properties=None, connection_options=None, sasl_options = None): + client_properties=None, connection_options=None, sasl_options = None, + channel_options=None): self.mechanism = mechanism self.response = response self.username = username @@ -94,7 +95,7 @@ class Client: self.sasl_options = sasl_options self.socket = connect(self.host, self.port, connection_options) self.conn = Connection(self.socket, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self), Session) + self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options) self.conn.init() self.peer.start() @@ -206,6 +207,9 @@ class ClientDelegate(Delegate): def channel_close(self, ch, msg): ch.closed(msg) + def channel_flow(self, ch, msg): + ch.set_flow_control(not msg.active) + def session_ack(self, ch, msg): pass diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index a49dc3e661..02986dc9a0 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO from time import time -from exceptions import Closed +from exceptions import Closed, Timeout from logging import getLogger log = getLogger("qpid.peer") @@ -55,7 +55,7 @@ class Sequence: class Peer: - def __init__(self, conn, delegate, channel_factory=None): + def __init__(self, conn, delegate, channel_factory=None, channel_options=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) @@ -66,6 +66,9 @@ class Peer: self.channel_factory = channel_factory else: self.channel_factory = Channel + if channel_options is None: + channel_options = {} + self.channel_options = channel_options def channel(self, id): self.lock.acquire() @@ -73,7 +76,7 @@ class Peer: try: ch = self.channels[id] except KeyError: - ch = self.channel_factory(id, self.outgoing, self.conn.spec) + ch = self.channel_factory(id, self.outgoing, self.conn.spec, self.channel_options) self.channels[id] = ch finally: self.lock.release() @@ -205,7 +208,7 @@ class Responder: class Channel: - def __init__(self, id, outgoing, spec): + def __init__(self, id, outgoing, spec, options): self.id = id self.outgoing = outgoing self.spec = spec @@ -228,6 +231,10 @@ class Channel: self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) self.synchronous = True + self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60) + self._flow_control_wc = threading.Condition() + self._flow_control = False + def closed(self, reason): if self._closed: return @@ -339,6 +346,8 @@ class Channel: future = Future() self.futures[cmd_id] = future + if frame.method.klass.name == "basic" and frame.method.name == "publish": + self.check_flow_control() self.write(frame, content) try: @@ -381,6 +390,24 @@ class Channel: else: raise e + # part of flow control for AMQP 0-8, 0-9, and 0-9-1 + def set_flow_control(self, value): + self._flow_control_wc.acquire() + self._flow_control = value + if value == False: + self._flow_control_wc.notify() + self._flow_control_wc.release() + + # part of flow control for AMQP 0-8, 0-9, and 0-9-1 + def check_flow_control(self): + self._flow_control_wc.acquire() + if self._flow_control: + self._flow_control_wc.wait(self._flow_control_wait_failure) + if self._flow_control: + self._flow_control_wc.release() + raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control") + self._flow_control_wc.release() + def __getattr__(self, name): type = self.spec.method(name) if type == None: raise AttributeError(name) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 841dcfbd88..256aa7b5e6 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -73,7 +73,7 @@ class TestBase(unittest.TestCase): self.client.close() - def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None): """Create a new connction, return the Client object""" host = host or self.config.broker.host port = port or self.config.broker.port or 5672 @@ -81,7 +81,7 @@ class TestBase(unittest.TestCase): password = password or self.config.broker.password or "guest" client = qpid.client.Client(host, port) try: - client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties) + client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options) except qpid.client.Closed, e: if isinstance(e.args[0], VersionError): raise Skipped(e.args[0]) |