summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xredis/client.py7
-rw-r--r--tests/test_commands.py22
2 files changed, 27 insertions, 2 deletions
diff --git a/redis/client.py b/redis/client.py
index 0cb6fca..e095455 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1848,14 +1848,17 @@ class StrictRedis(object):
"""
return self.execute_command('XDEL', name, *ids)
- def xgroup_create(self, name, groupname, id):
+ def xgroup_create(self, name, groupname, id='$', mkstream=False):
"""
Create a new consumer group associated with a stream.
name: name of the stream.
groupname: name of the consumer group.
id: ID of the last item in the stream to consider already delivered.
"""
- return self.execute_command('XGROUP CREATE', name, groupname, id)
+ pieces = ['XGROUP CREATE', name, groupname, id]
+ if mkstream:
+ pieces.append('MKSTREAM')
+ return self.execute_command(*pieces)
def xgroup_delconsumer(self, name, groupname, consumername):
"""
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 1d86e93..6394a7a 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1750,6 +1750,28 @@ class TestRedisCommands(object):
assert r.xinfo_groups(stream) == expected
@skip_if_server_version_lt('5.0.0')
+ def test_xgroup_create_mkstream(self, r):
+ # tests xgroup_create and xinfo_groups
+ stream = 'stream'
+ group = 'group'
+
+ # an error is raised if a group is created on a stream that
+ # doesn't already exist
+ with pytest.raises(exceptions.ResponseError):
+ r.xgroup_create(stream, group, 0)
+
+ # however, with mkstream=True, the underlying stream is created
+ # automatically
+ assert r.xgroup_create(stream, group, 0, mkstream=True)
+ expected = [{
+ 'name': b(group),
+ 'consumers': 0,
+ 'pending': 0,
+ 'last-delivered-id': b('0-0')
+ }]
+ assert r.xinfo_groups(stream) == expected
+
+ @skip_if_server_version_lt('5.0.0')
def test_xgroup_delconsumer(self, r):
stream = 'stream'
group = 'group'