summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Daskalakis <td@axiros.com>2019-07-15 14:20:27 +0200
committerAndy McCurdy <andy@andymccurdy.com>2019-07-17 15:49:02 -0700
commit5faed95b3b248d7d076298afd357a042cf916757 (patch)
tree7c1f1c2fa8ec5fe476ef31ead77ad6ccac5741a3
parentacac4db0c064d2618d75f27a58384c97c16458fd (diff)
downloadredis-py-5faed95b3b248d7d076298afd357a042cf916757.tar.gz
Handle removed claimed messages without an exception
Fixes #1191
-rw-r--r--CHANGES2
-rwxr-xr-xredis/client.py8
-rw-r--r--tests/test_commands.py25
3 files changed, 34 insertions, 1 deletions
diff --git a/CHANGES b/CHANGES
index e2d06c1..33309eb 100644
--- a/CHANGES
+++ b/CHANGES
@@ -20,6 +20,8 @@
* Remove selectors in favor of nonblocking sockets. Selectors had
issues in some environments including eventlet and gevent. This should
resolve those issues with no other side effects.
+ * Fixed an issue with XCLAIM and previously claimed but not removed
+ messages. Thanks @thomdask. #1192/#1191
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
diff --git a/redis/client.py b/redis/client.py
index aa1ca7e..c57af3b 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -283,7 +283,13 @@ def nativestr_or_none(response):
def parse_stream_list(response):
if response is None:
return None
- return [(r[0], pairs_to_dict(r[1])) for r in response]
+ data = []
+ for r in response:
+ if r is not None:
+ data.append((r[0], pairs_to_dict(r[1])))
+ else:
+ data.append((None, None))
+ return data
def pairs_to_dict_with_nativestr_keys(response):
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 64cabdf..a41e1a2 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1909,6 +1909,31 @@ class TestRedisCommands(object):
justid=True) == [message_id]
@skip_if_server_version_lt('5.0.0')
+ def test_xclaim_trimmed(self, r):
+ # xclaim should not raise an exception if the item is not there
+ stream = 'stream'
+ group = 'group'
+
+ r.xgroup_create(stream, group, id="$", mkstream=True)
+
+ # add a couple of new items
+ sid1 = r.xadd(stream, {"item": 0})
+ sid2 = r.xadd(stream, {"item": 0})
+
+ # read them from consumer1
+ r.xreadgroup(group, 'consumer1', {stream: ">"})
+
+ # add a 3rd and trim the stream down to 2 items
+ r.xadd(stream, {"item": 3}, maxlen=2, approximate=False)
+
+ # xclaim them from consumer2
+ # the item that is still in the stream should be returned
+ item = r.xclaim(stream, group, 'consumer2', 0, [sid1, sid2])
+ assert len(item) == 2
+ assert item[0] == (None, None)
+ assert item[1][0] == sid2
+
+ @skip_if_server_version_lt('5.0.0')
def test_xdel(self, r):
stream = 'stream'