diff options
-rw-r--r-- | CHANGES | 2 | ||||
-rwxr-xr-x | redis/client.py | 8 | ||||
-rw-r--r-- | tests/test_commands.py | 25 |
3 files changed, 34 insertions, 1 deletions
@@ -20,6 +20,8 @@ * Remove selectors in favor of nonblocking sockets. Selectors had issues in some environments including eventlet and gevent. This should resolve those issues with no other side effects. + * Fixed an issue with XCLAIM and previously claimed but not removed + messages. Thanks @thomdask. #1192/#1191 * 3.2.1 * Fix SentinelConnectionPool to work in multiprocess/forked environments. * 3.2.0 diff --git a/redis/client.py b/redis/client.py index aa1ca7e..c57af3b 100755 --- a/redis/client.py +++ b/redis/client.py @@ -283,7 +283,13 @@ def nativestr_or_none(response): def parse_stream_list(response): if response is None: return None - return [(r[0], pairs_to_dict(r[1])) for r in response] + data = [] + for r in response: + if r is not None: + data.append((r[0], pairs_to_dict(r[1]))) + else: + data.append((None, None)) + return data def pairs_to_dict_with_nativestr_keys(response): diff --git a/tests/test_commands.py b/tests/test_commands.py index 64cabdf..a41e1a2 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1909,6 +1909,31 @@ class TestRedisCommands(object): justid=True) == [message_id] @skip_if_server_version_lt('5.0.0') + def test_xclaim_trimmed(self, r): + # xclaim should not raise an exception if the item is not there + stream = 'stream' + group = 'group' + + r.xgroup_create(stream, group, id="$", mkstream=True) + + # add a couple of new items + sid1 = r.xadd(stream, {"item": 0}) + sid2 = r.xadd(stream, {"item": 0}) + + # read them from consumer1 + r.xreadgroup(group, 'consumer1', {stream: ">"}) + + # add a 3rd and trim the stream down to 2 items + r.xadd(stream, {"item": 3}, maxlen=2, approximate=False) + + # xclaim them from consumer2 + # the item that is still in the stream should be returned + item = r.xclaim(stream, group, 'consumer2', 0, [sid1, sid2]) + assert len(item) == 2 + assert item[0] == (None, None) + assert item[1][0] == sid2 + + @skip_if_server_version_lt('5.0.0') def test_xdel(self, r): stream = 'stream' |