diff options
author | Avital Fine <79420960+AvitalFineRedis@users.noreply.github.com> | 2021-08-05 12:25:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-05 12:25:25 +0300 |
commit | ba30d027a9a55a2ffd44dc8ca01d526b8705ab03 (patch) | |
tree | ac973e76f7f8a857b28b177eafb01e51b2d1faa6 /redis/client.py | |
parent | 9c60670deea5c593e20204bbd5f172ccfcd1d9db (diff) | |
download | redis-py-ba30d027a9a55a2ffd44dc8ca01d526b8705ab03.tar.gz |
xautoclaim (#1529)
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py index 995239e..019ff4f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -316,6 +316,12 @@ def parse_xclaim(response, **options): return parse_stream_list(response) +def parse_xautoclaim(response, **options): + if options.get('parse_justid', False): + return response[1] + return parse_stream_list(response[1]) + + def parse_xinfo_stream(response): data = pairs_to_dict(response, decode_keys=True) first = data['first-entry'] @@ -684,6 +690,7 @@ class Redis: 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), 'XCLAIM': parse_xclaim, + 'XAUTOCLAIM': parse_xautoclaim, 'XGROUP CREATE': bool_ok, 'XGROUP DELCONSUMER': int, 'XGROUP DESTROY': bool, @@ -2601,6 +2608,46 @@ class Redis: pieces.extend(pair) return self.execute_command('XADD', name, *pieces) + def xautoclaim(self, name, groupname, consumername, min_idle_time, + start_id=0, count=None, justid=False): + """ + Transfers ownership of pending stream entries that match the specified + criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM, + but provides a more straightforward way to deal with message delivery + failures via SCAN-like semantics. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds. + start_id: filter messages with equal or greater ID. + count: optional integer, upper limit of the number of entries that the + command attempts to claim. Set to 100 by default. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + try: + if int(min_idle_time) < 0: + raise DataError("XAUTOCLAIM min_idle_time must be a non" + "negative integer") + except TypeError: + pass + + kwargs = {} + pieces = [name, groupname, consumername, min_idle_time, start_id] + + try: + if int(count) < 0: + raise DataError("XPENDING count must be a integer >= 0") + pieces.extend([b'COUNT', count]) + except TypeError: + pass + if justid: + pieces.append(b'JUSTID') + kwargs['parse_justid'] = True + + return self.execute_command('XAUTOCLAIM', *pieces, **kwargs) + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, justid=False): |