summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAvital Fine <79420960+AvitalFineRedis@users.noreply.github.com>2021-08-05 12:25:25 +0300
committerGitHub <noreply@github.com>2021-08-05 12:25:25 +0300
commitba30d027a9a55a2ffd44dc8ca01d526b8705ab03 (patch)
treeac973e76f7f8a857b28b177eafb01e51b2d1faa6
parent9c60670deea5c593e20204bbd5f172ccfcd1d9db (diff)
downloadredis-py-ba30d027a9a55a2ffd44dc8ca01d526b8705ab03.tar.gz
xautoclaim (#1529)
-rwxr-xr-xredis/client.py47
-rw-r--r--tests/test_commands.py48
2 files changed, 94 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py
index 995239e..019ff4f 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -316,6 +316,12 @@ def parse_xclaim(response, **options):
return parse_stream_list(response)
+def parse_xautoclaim(response, **options):
+ if options.get('parse_justid', False):
+ return response[1]
+ return parse_stream_list(response[1])
+
+
def parse_xinfo_stream(response):
data = pairs_to_dict(response, decode_keys=True)
first = data['first-entry']
@@ -684,6 +690,7 @@ class Redis:
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'XCLAIM': parse_xclaim,
+ 'XAUTOCLAIM': parse_xautoclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
'XGROUP DESTROY': bool,
@@ -2601,6 +2608,46 @@ class Redis:
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)
+ def xautoclaim(self, name, groupname, consumername, min_idle_time,
+ start_id=0, count=None, justid=False):
+ """
+ Transfers ownership of pending stream entries that match the specified
+ criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM,
+ but provides a more straightforward way to deal with message delivery
+ failures via SCAN-like semantics.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of a consumer that claims the message.
+ min_idle_time: filter messages that were idle less than this amount of
+ milliseconds.
+ start_id: filter messages with equal or greater ID.
+ count: optional integer, upper limit of the number of entries that the
+ command attempts to claim. Set to 100 by default.
+ justid: optional boolean, false by default. Return just an array of IDs
+ of messages successfully claimed, without returning the actual message
+ """
+ try:
+ if int(min_idle_time) < 0:
+ raise DataError("XAUTOCLAIM min_idle_time must be a non"
+ "negative integer")
+ except TypeError:
+ pass
+
+ kwargs = {}
+ pieces = [name, groupname, consumername, min_idle_time, start_id]
+
+ try:
+ if int(count) < 0:
+ raise DataError("XPENDING count must be a integer >= 0")
+ pieces.extend([b'COUNT', count])
+ except TypeError:
+ pass
+ if justid:
+ pieces.append(b'JUSTID')
+ kwargs['parse_justid'] = True
+
+ return self.execute_command('XAUTOCLAIM', *pieces, **kwargs)
+
def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):
diff --git a/tests/test_commands.py b/tests/test_commands.py
index e92be62..60e667d 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -2374,13 +2374,59 @@ class TestRedisCommands:
r.xadd(stream, {'some': 'other'}, nomkstream=True)
assert r.xlen(stream) == 3
+ @skip_if_server_version_lt('6.2.0')
+ def test_xautoclaim(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer1 = 'consumer1'
+ consumer2 = 'consumer2'
+
+ message_id1 = r.xadd(stream, {'john': 'wick'})
+ message_id2 = r.xadd(stream, {'johny': 'deff'})
+ message = get_stream_message(r, stream, message_id1)
+ r.xgroup_create(stream, group, 0)
+
+ # trying to claim a message that isn't already pending doesn't
+ # do anything
+ response = r.xautoclaim(stream, group, consumer2, min_idle_time=0)
+ assert response == []
+
+ # read the group as consumer1 to initially claim the messages
+ r.xreadgroup(group, consumer1, streams={stream: '>'})
+
+ # claim one message as consumer2
+ response = r.xautoclaim(stream, group, consumer2,
+ min_idle_time=0, count=1)
+ assert response == [message]
+
+ # reclaim the messages as consumer1, but use the justid argument
+ # which only returns message ids
+ assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
+ start_id=0, justid=True) == \
+ [message_id1, message_id2]
+ assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
+ start_id=message_id2, justid=True) == \
+ [message_id2]
+
+ @skip_if_server_version_lt('6.2.0')
+ def test_xautoclaim_negative(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer = 'consumer'
+ with pytest.raises(redis.DataError):
+ r.xautoclaim(stream, group, consumer, min_idle_time=-1)
+ with pytest.raises(ValueError):
+ r.xautoclaim(stream, group, consumer, min_idle_time="wrong")
+ with pytest.raises(redis.DataError):
+ r.xautoclaim(stream, group, consumer, min_idle_time=0,
+ count=-1)
+
@skip_if_server_version_lt('5.0.0')
def test_xclaim(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'
-
message_id = r.xadd(stream, {'john': 'wick'})
message = get_stream_message(r, stream, message_id)
r.xgroup_create(stream, group, 0)