diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-01-10 11:56:06 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-10 11:56:06 -0800 |
commit | f9cb36af086510cab95e84a1e75d7f09d508040e (patch) | |
tree | f1fb26bb6837d0dc9312018d280b72214653b808 | |
parent | a3cfded93afa2a65908f05ac251b18d77fa84dd2 (diff) | |
parent | e6e52ee50cfc8234fb6775122b2149c7b3a002d6 (diff) | |
download | redis-py-f9cb36af086510cab95e84a1e75d7f09d508040e.tar.gz |
Merge pull request #1120 from johntmyers/add-noack
Added noack option for XREADGROUP
-rwxr-xr-x | redis/client.py | 5 | ||||
-rw-r--r-- | tests/test_commands.py | 10 |
2 files changed, 14 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py index 2645e83..656df26 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2199,7 +2199,7 @@ class Redis(object): return self.execute_command('XREAD', *pieces) def xreadgroup(self, groupname, consumername, streams, count=None, - block=None): + block=None, noack=False): """ Read from a stream via a consumer group. groupname: name of the consumer group. @@ -2209,6 +2209,7 @@ class Redis(object): count: if set, only return this many items, beginning with the earliest available. block: number of milliseconds to wait, if nothing already present. + noack: do not add messages to the PEL """ pieces = [Token.get_token('GROUP'), groupname, consumername] if count is not None: @@ -2222,6 +2223,8 @@ class Redis(object): "integer") pieces.append(Token.get_token("BLOCK")) pieces.append(str(block)) + if noack: + pieces.append(Token.get_token("NOACK")) if not isinstance(streams, dict) or len(streams) == 0: raise DataError('XREADGROUP streams must be a non empty dict') pieces.append(Token.get_token('STREAMS')) diff --git a/tests/test_commands.py b/tests/test_commands.py index 566437d..60f3e51 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2205,6 +2205,16 @@ class TestRedisCommands(object): # xread starting after the last message returns an empty message list assert r.xreadgroup(group, consumer, streams={stream: '>'}) == expected + r.xgroup_destroy(stream, group) + + # xreadgroup with noack does not have any items in the PEL + r.xgroup_create(stream, group, '0') + assert len(r.xreadgroup(group, consumer, streams={stream: '>'}, + noack=True)[0][1]) == 2 + # now there should be nothing pending + assert len(r.xreadgroup(group, consumer, + streams={stream: '0'})[0][1]) == 0 + @skip_if_server_version_lt('5.0.0') def test_xrevrange(self, r): stream = 'stream' |