diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-01-27 10:30:56 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-27 10:30:56 -0800 |
commit | bfa2e772566e973f8329ca9754d4ef7fbdbce1b0 (patch) | |
tree | 6ce3dfb141ae53fbb0457fc51d0801d4aa7a8f49 | |
parent | e5d53dd7bffc7d0876efafc3445746e534b21855 (diff) | |
parent | f9cb36af086510cab95e84a1e75d7f09d508040e (diff) | |
download | redis-py-bfa2e772566e973f8329ca9754d4ef7fbdbce1b0.tar.gz |
Merge branch 'master' into xreadgroup_handle_nil_fields
-rwxr-xr-x | redis/client.py | 5 | ||||
-rw-r--r-- | tests/test_commands.py | 9 |
2 files changed, 13 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py index 154ddc1..1f96246 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2201,7 +2201,7 @@ class Redis(object): return self.execute_command('XREAD', *pieces) def xreadgroup(self, groupname, consumername, streams, count=None, - block=None): + block=None, noack=False): """ Read from a stream via a consumer group. groupname: name of the consumer group. @@ -2211,6 +2211,7 @@ class Redis(object): count: if set, only return this many items, beginning with the earliest available. block: number of milliseconds to wait, if nothing already present. + noack: do not add messages to the PEL """ pieces = [Token.get_token('GROUP'), groupname, consumername] if count is not None: @@ -2224,6 +2225,8 @@ class Redis(object): "integer") pieces.append(Token.get_token("BLOCK")) pieces.append(str(block)) + if noack: + pieces.append(Token.get_token("NOACK")) if not isinstance(streams, dict) or len(streams) == 0: raise DataError('XREADGROUP streams must be a non empty dict') pieces.append(Token.get_token('STREAMS')) diff --git a/tests/test_commands.py b/tests/test_commands.py index 73a0052..60d1e28 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2205,6 +2205,15 @@ class TestRedisCommands(object): # xread starting after the last message returns an empty message list assert r.xreadgroup(group, consumer, streams={stream: '>'}) == expected + # xreadgroup with noack does not have any items in the PEL + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, '0') + assert len(r.xreadgroup(group, consumer, streams={stream: '>'}, + noack=True)[0][1]) == 2 + # now there should be nothing pending + assert len(r.xreadgroup(group, consumer, + streams={stream: '0'})[0][1]) == 0 + r.xgroup_destroy(stream, group) r.xgroup_create(stream, group, '0') # delete all the messages in the stream |