From ba30d027a9a55a2ffd44dc8ca01d526b8705ab03 Mon Sep 17 00:00:00 2001 From: Avital Fine <79420960+AvitalFineRedis@users.noreply.github.com> Date: Thu, 5 Aug 2021 12:25:25 +0300 Subject: xautoclaim (#1529) --- redis/client.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ tests/test_commands.py | 48 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 995239e..019ff4f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -316,6 +316,12 @@ def parse_xclaim(response, **options): return parse_stream_list(response) +def parse_xautoclaim(response, **options): + if options.get('parse_justid', False): + return response[1] + return parse_stream_list(response[1]) + + def parse_xinfo_stream(response): data = pairs_to_dict(response, decode_keys=True) first = data['first-entry'] @@ -684,6 +690,7 @@ class Redis: 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), 'XCLAIM': parse_xclaim, + 'XAUTOCLAIM': parse_xautoclaim, 'XGROUP CREATE': bool_ok, 'XGROUP DELCONSUMER': int, 'XGROUP DESTROY': bool, @@ -2601,6 +2608,46 @@ class Redis: pieces.extend(pair) return self.execute_command('XADD', name, *pieces) + def xautoclaim(self, name, groupname, consumername, min_idle_time, + start_id=0, count=None, justid=False): + """ + Transfers ownership of pending stream entries that match the specified + criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM, + but provides a more straightforward way to deal with message delivery + failures via SCAN-like semantics. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds. + start_id: filter messages with equal or greater ID. + count: optional integer, upper limit of the number of entries that the + command attempts to claim. Set to 100 by default. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + try: + if int(min_idle_time) < 0: + raise DataError("XAUTOCLAIM min_idle_time must be a non" + "negative integer") + except TypeError: + pass + + kwargs = {} + pieces = [name, groupname, consumername, min_idle_time, start_id] + + try: + if int(count) < 0: + raise DataError("XPENDING count must be a integer >= 0") + pieces.extend([b'COUNT', count]) + except TypeError: + pass + if justid: + pieces.append(b'JUSTID') + kwargs['parse_justid'] = True + + return self.execute_command('XAUTOCLAIM', *pieces, **kwargs) + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, justid=False): diff --git a/tests/test_commands.py b/tests/test_commands.py index e92be62..60e667d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2374,13 +2374,59 @@ class TestRedisCommands: r.xadd(stream, {'some': 'other'}, nomkstream=True) assert r.xlen(stream) == 3 + @skip_if_server_version_lt('6.2.0') + def test_xautoclaim(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + + message_id1 = r.xadd(stream, {'john': 'wick'}) + message_id2 = r.xadd(stream, {'johny': 'deff'}) + message = get_stream_message(r, stream, message_id1) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + response = r.xautoclaim(stream, group, consumer2, min_idle_time=0) + assert response == [] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: '>'}) + + # claim one message as consumer2 + response = r.xautoclaim(stream, group, consumer2, + min_idle_time=0, count=1) + assert response == [message] + + # reclaim the messages as consumer1, but use the justid argument + # which only returns message ids + assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, + start_id=0, justid=True) == \ + [message_id1, message_id2] + assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, + start_id=message_id2, justid=True) == \ + [message_id2] + + @skip_if_server_version_lt('6.2.0') + def test_xautoclaim_negative(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + with pytest.raises(redis.DataError): + r.xautoclaim(stream, group, consumer, min_idle_time=-1) + with pytest.raises(ValueError): + r.xautoclaim(stream, group, consumer, min_idle_time="wrong") + with pytest.raises(redis.DataError): + r.xautoclaim(stream, group, consumer, min_idle_time=0, + count=-1) + @skip_if_server_version_lt('5.0.0') def test_xclaim(self, r): stream = 'stream' group = 'group' consumer1 = 'consumer1' consumer2 = 'consumer2' - message_id = r.xadd(stream, {'john': 'wick'}) message = get_stream_message(r, stream, message_id) r.xgroup_create(stream, group, 0) -- cgit v1.2.1