summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-16 16:47:18 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-16 16:47:18 +0000
commite06516ab7638b0ee1351bf1374818b4e6184e362 (patch)
tree2b57570c3b14254e75140e4fe3bfee5a45506427 /python
parentb7dfc1a379178e746cee2519cd95a67bce0ae2d7 (diff)
downloadqpid-python-e06516ab7638b0ee1351bf1374818b4e6184e362.tar.gz
performance tweaks for receive: added configurable threshold for issuing credit; don't disable byte credit more than necessary; avoided n-squared loop for generating acks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@955296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging/driver.py19
-rw-r--r--python/qpid/messaging/endpoints.py7
2 files changed, 21 insertions, 5 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 8463aead13..16f1b2902e 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -208,6 +208,7 @@ class LinkIn:
_rcv.destination = str(rcv.id)
sst.destinations[_rcv.destination] = _rcv
_rcv.draining = False
+ _rcv.bytes_open = False
_rcv.on_unlink = []
def do_link(self, sst, rcv, _rcv, type, subtype, action):
@@ -762,6 +763,7 @@ class Engine:
sst.write_op(SessionCommandPoint(sst.sent, 0))
sst.outgoing_idx = 0
sst.acked = []
+ sst.acked_idx = 0
if ssn.transactional:
sst.write_cmd(TxSelect())
self._attachments[ssn] = sst
@@ -965,7 +967,8 @@ class Engine:
self.process_receiver(rcv)
if ssn.acked:
- messages = [m for m in ssn.acked if m not in sst.acked]
+ messages = ssn.acked[sst.acked_idx:]
+ delta = len(messages)
if messages:
ids = RangedSet()
@@ -975,6 +978,7 @@ class Engine:
# could we deal this via some message-id based purge?
if m._transfer_id is None:
ssn.acked.remove(m)
+ delta -= 1
continue
ids.add(m._transfer_id)
disp = m._disposition or DEFAULT_DISPOSITION
@@ -992,6 +996,7 @@ class Engine:
def ack_ack():
for m in msgs:
ssn.acked.remove(m)
+ sst.acked_idx -= 1
if not ssn.transactional:
sst.acked.remove(m)
return ack_ack
@@ -1011,7 +1016,9 @@ class Engine:
for m in msgs:
log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+ # XXX: could add messages with _transfer_id of None
sst.acked.extend(messages)
+ sst.acked_idx += delta
if ssn.committing and not sst.committing:
def commit_ok():
@@ -1076,11 +1083,15 @@ class Engine:
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ if not _rcv.bytes_open:
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ _rcv.bytes_open = True
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ if not _rcv.bytes_open:
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ _rcv.bytes_open = True
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
rcv.impending += delta
elif delta < 0 and not rcv.draining:
@@ -1088,6 +1099,7 @@ class Engine:
def do_stop():
rcv.impending = rcv.received
_rcv.draining = False
+ _rcv.bytes_open = False
self.grant(rcv)
sst.write_cmd(MessageStop(_rcv.destination), do_stop)
@@ -1097,6 +1109,7 @@ class Engine:
rcv.impending = rcv.received
rcv.granted = rcv.impending
_rcv.draining = False
+ _rcv.bytes_open = False
rcv.draining = False
sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index f5f957c821..707aee3ed6 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -29,6 +29,7 @@ Areas that still need work:
"""
from logging import getLogger
+from math import ceil
from qpid.codec010 import StringCodec
from qpid.concurrency import synchronized, Waiter, Condition
from qpid.datatypes import Serial, uuid4
@@ -843,6 +844,7 @@ class Receiver(object):
self._lock = self.session._lock
self._capacity = 0
self._set_capacity(options.get("capacity", 0), False)
+ self.threshold = 0.5
@synchronized
def _set_capacity(self, c, wakeup=True):
@@ -931,8 +933,9 @@ class Receiver(object):
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
- self.granted += 1
- self._wakeup()
+ if self.received - self.returned <= int(ceil(self.threshold * self._capacity)):
+ self.granted = self.received + self._capacity
+ self._wakeup()
return msg
def _grant(self):