summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAvital Fine <79420960+AvitalFineRedis@users.noreply.github.com>2021-08-05 12:25:25 +0300
committerGitHub <noreply@github.com>2021-08-05 12:25:25 +0300
commitba30d027a9a55a2ffd44dc8ca01d526b8705ab03 (patch)
treeac973e76f7f8a857b28b177eafb01e51b2d1faa6 /redis/client.py
parent9c60670deea5c593e20204bbd5f172ccfcd1d9db (diff)
downloadredis-py-ba30d027a9a55a2ffd44dc8ca01d526b8705ab03.tar.gz
xautoclaim (#1529)
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py
index 995239e..019ff4f 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -316,6 +316,12 @@ def parse_xclaim(response, **options):
return parse_stream_list(response)
+def parse_xautoclaim(response, **options):
+ if options.get('parse_justid', False):
+ return response[1]
+ return parse_stream_list(response[1])
+
+
def parse_xinfo_stream(response):
data = pairs_to_dict(response, decode_keys=True)
first = data['first-entry']
@@ -684,6 +690,7 @@ class Redis:
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'XCLAIM': parse_xclaim,
+ 'XAUTOCLAIM': parse_xautoclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
'XGROUP DESTROY': bool,
@@ -2601,6 +2608,46 @@ class Redis:
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)
+ def xautoclaim(self, name, groupname, consumername, min_idle_time,
+ start_id=0, count=None, justid=False):
+ """
+ Transfers ownership of pending stream entries that match the specified
+ criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM,
+ but provides a more straightforward way to deal with message delivery
+ failures via SCAN-like semantics.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of a consumer that claims the message.
+ min_idle_time: filter messages that were idle less than this amount of
+ milliseconds.
+ start_id: filter messages with equal or greater ID.
+ count: optional integer, upper limit of the number of entries that the
+ command attempts to claim. Set to 100 by default.
+ justid: optional boolean, false by default. Return just an array of IDs
+ of messages successfully claimed, without returning the actual message
+ """
+ try:
+ if int(min_idle_time) < 0:
+ raise DataError("XAUTOCLAIM min_idle_time must be a non"
+ "negative integer")
+ except TypeError:
+ pass
+
+ kwargs = {}
+ pieces = [name, groupname, consumername, min_idle_time, start_id]
+
+ try:
+ if int(count) < 0:
+ raise DataError("XPENDING count must be a integer >= 0")
+ pieces.extend([b'COUNT', count])
+ except TypeError:
+ pass
+ if justid:
+ pieces.append(b'JUSTID')
+ kwargs['parse_justid'] = True
+
+ return self.execute_command('XAUTOCLAIM', *pieces, **kwargs)
+
def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):