summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/python/qpid/client.py4
-rw-r--r--qpid/python/qpid/peer.py20
2 files changed, 13 insertions, 11 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 3bbc097d25..5fedaa2cb1 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -208,12 +208,12 @@ class ClientDelegate(Delegate):
ch.closed(msg)
def channel_flow(self, ch, msg):
- # On resuming we want to minimize the possibility of sending a message before flow-ok has been sent.
+ # On resuming we don't want to send a message before flow-ok has been sent.
# Therefore, we send flow-ok before we set the flow_control flag.
if msg.active:
msg.flow_ok()
ch.set_flow_control(not msg.active)
- # On pausing we want to minimize the possibility of sending a message after flow-ok has been sent.
+ # On suspending we don't want to send a message after flow-ok has been sent.
# Therefore, we send flow-ok after we set the flow_control flag.
if not msg.active:
msg.flow_ok()
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 02986dc9a0..7b1faff190 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -232,7 +232,7 @@ class Channel:
self.synchronous = True
self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
- self._flow_control_wc = threading.Condition()
+ self._flow_control_wait_condition = threading.Condition()
self._flow_control = False
def closed(self, reason):
@@ -347,8 +347,12 @@ class Channel:
self.futures[cmd_id] = future
if frame.method.klass.name == "basic" and frame.method.name == "publish":
+ self._flow_control_wait_condition.acquire()
self.check_flow_control()
- self.write(frame, content)
+ self.write(frame, content)
+ self._flow_control_wait_condition.release()
+ else:
+ self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
@@ -392,21 +396,19 @@ class Channel:
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def set_flow_control(self, value):
- self._flow_control_wc.acquire()
+ self._flow_control_wait_condition.acquire()
self._flow_control = value
if value == False:
- self._flow_control_wc.notify()
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.notify()
+ self._flow_control_wait_condition.release()
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def check_flow_control(self):
- self._flow_control_wc.acquire()
if self._flow_control:
- self._flow_control_wc.wait(self._flow_control_wait_failure)
+ self._flow_control_wait_condition.wait(self._flow_control_wait_failure)
if self._flow_control:
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.release()
raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
- self._flow_control_wc.release()
def __getattr__(self, name):
type = self.spec.method(name)