diff options
-rw-r--r-- | redis/commands.py | 12 | ||||
-rw-r--r-- | tests/test_commands.py | 16 |
2 files changed, 28 insertions, 0 deletions
diff --git a/redis/commands.py b/redis/commands.py index a9b90f0..29877db 100644 --- a/redis/commands.py +++ b/redis/commands.py @@ -1870,6 +1870,18 @@ class Commands: """ return self.execute_command('XGROUP DESTROY', name, groupname) + def xgroup_createconsumer(self, name, groupname, consumername): + """ + Consumers in a consumer group are auto-created every time a new + consumer name is mentioned by some command. + They can be explicitly created by using this command. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to create. + """ + return self.execute_command('XGROUP CREATECONSUMER', name, groupname, + consumername) + def xgroup_setid(self, name, groupname, id): """ Set the consumer group last delivered ID to something else. diff --git a/tests/test_commands.py b/tests/test_commands.py index 0e71fcf..0c9ca1e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2731,6 +2731,22 @@ class TestRedisCommands: # deleting the consumer should return 2 pending messages assert r.xgroup_delconsumer(stream, group, consumer) == 2 + @skip_if_server_version_lt('6.2.0') + def test_xgroup_createconsumer(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + assert r.xgroup_createconsumer(stream, group, consumer) == 1 + + # read all messages from the group + r.xreadgroup(group, consumer, streams={stream: '>'}) + + # deleting the consumer should return 2 pending messages + assert r.xgroup_delconsumer(stream, group, consumer) == 2 + @skip_if_server_version_lt('5.0.0') def test_xgroup_destroy(self, r): stream = 'stream' |