summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis/commands.py12
-rw-r--r--tests/test_commands.py16
2 files changed, 28 insertions, 0 deletions
diff --git a/redis/commands.py b/redis/commands.py
index a9b90f0..29877db 100644
--- a/redis/commands.py
+++ b/redis/commands.py
@@ -1870,6 +1870,18 @@ class Commands:
"""
return self.execute_command('XGROUP DESTROY', name, groupname)
+ def xgroup_createconsumer(self, name, groupname, consumername):
+ """
+ Consumers in a consumer group are auto-created every time a new
+ consumer name is mentioned by some command.
+ They can be explicitly created by using this command.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of consumer to create.
+ """
+ return self.execute_command('XGROUP CREATECONSUMER', name, groupname,
+ consumername)
+
def xgroup_setid(self, name, groupname, id):
"""
Set the consumer group last delivered ID to something else.
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 0e71fcf..0c9ca1e 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -2731,6 +2731,22 @@ class TestRedisCommands:
# deleting the consumer should return 2 pending messages
assert r.xgroup_delconsumer(stream, group, consumer) == 2
+ @skip_if_server_version_lt('6.2.0')
+ def test_xgroup_createconsumer(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer = 'consumer'
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
+ r.xgroup_create(stream, group, 0)
+ assert r.xgroup_createconsumer(stream, group, consumer) == 1
+
+ # read all messages from the group
+ r.xreadgroup(group, consumer, streams={stream: '>'})
+
+ # deleting the consumer should return 2 pending messages
+ assert r.xgroup_delconsumer(stream, group, consumer) == 2
+
@skip_if_server_version_lt('5.0.0')
def test_xgroup_destroy(self, r):
stream = 'stream'