summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2015-06-05 13:37:53 +0000
committerAlex Rudyy <orudyy@apache.org>2015-06-05 13:37:53 +0000
commite494a245b2174915f13eaadc45566e0467c96bb9 (patch)
treeb05deac1a7fb0499c6c764790b494c0634b2d1cb
parentdf9eb5f63c04baa2f5c7ecec61635537f64d9013 (diff)
downloadqpid-python-e494a245b2174915f13eaadc45566e0467c96bb9.tar.gz
QPID-6567: [Python Client 0-8..0-91] ensure client won't send messages after/before sending flow-ok on suspending/resuming respectively. work by Lorenz Quack <quack.lorenz@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1683751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/client.py4
-rw-r--r--qpid/python/qpid/peer.py20
2 files changed, 13 insertions, 11 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 3bbc097d25..5fedaa2cb1 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -208,12 +208,12 @@ class ClientDelegate(Delegate):
ch.closed(msg)
def channel_flow(self, ch, msg):
- # On resuming we want to minimize the possibility of sending a message before flow-ok has been sent.
+ # On resuming we don't want to send a message before flow-ok has been sent.
# Therefore, we send flow-ok before we set the flow_control flag.
if msg.active:
msg.flow_ok()
ch.set_flow_control(not msg.active)
- # On pausing we want to minimize the possibility of sending a message after flow-ok has been sent.
+ # On suspending we don't want to send a message after flow-ok has been sent.
# Therefore, we send flow-ok after we set the flow_control flag.
if not msg.active:
msg.flow_ok()
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 02986dc9a0..7b1faff190 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -232,7 +232,7 @@ class Channel:
self.synchronous = True
self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
- self._flow_control_wc = threading.Condition()
+ self._flow_control_wait_condition = threading.Condition()
self._flow_control = False
def closed(self, reason):
@@ -347,8 +347,12 @@ class Channel:
self.futures[cmd_id] = future
if frame.method.klass.name == "basic" and frame.method.name == "publish":
+ self._flow_control_wait_condition.acquire()
self.check_flow_control()
- self.write(frame, content)
+ self.write(frame, content)
+ self._flow_control_wait_condition.release()
+ else:
+ self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
@@ -392,21 +396,19 @@ class Channel:
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def set_flow_control(self, value):
- self._flow_control_wc.acquire()
+ self._flow_control_wait_condition.acquire()
self._flow_control = value
if value == False:
- self._flow_control_wc.notify()
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.notify()
+ self._flow_control_wait_condition.release()
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def check_flow_control(self):
- self._flow_control_wc.acquire()
if self._flow_control:
- self._flow_control_wc.wait(self._flow_control_wait_failure)
+ self._flow_control_wait_condition.wait(self._flow_control_wait_failure)
if self._flow_control:
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.release()
raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
- self._flow_control_wc.release()
def __getattr__(self, name):
type = self.spec.method(name)