summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-01-27 10:30:56 -0800
committerGitHub <noreply@github.com>2019-01-27 10:30:56 -0800
commitbfa2e772566e973f8329ca9754d4ef7fbdbce1b0 (patch)
tree6ce3dfb141ae53fbb0457fc51d0801d4aa7a8f49
parente5d53dd7bffc7d0876efafc3445746e534b21855 (diff)
parentf9cb36af086510cab95e84a1e75d7f09d508040e (diff)
downloadredis-py-bfa2e772566e973f8329ca9754d4ef7fbdbce1b0.tar.gz
Merge branch 'master' into xreadgroup_handle_nil_fields
-rwxr-xr-xredis/client.py5
-rw-r--r--tests/test_commands.py9
2 files changed, 13 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py
index 154ddc1..1f96246 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -2201,7 +2201,7 @@ class Redis(object):
return self.execute_command('XREAD', *pieces)
def xreadgroup(self, groupname, consumername, streams, count=None,
- block=None):
+ block=None, noack=False):
"""
Read from a stream via a consumer group.
groupname: name of the consumer group.
@@ -2211,6 +2211,7 @@ class Redis(object):
count: if set, only return this many items, beginning with the
earliest available.
block: number of milliseconds to wait, if nothing already present.
+ noack: do not add messages to the PEL
"""
pieces = [Token.get_token('GROUP'), groupname, consumername]
if count is not None:
@@ -2224,6 +2225,8 @@ class Redis(object):
"integer")
pieces.append(Token.get_token("BLOCK"))
pieces.append(str(block))
+ if noack:
+ pieces.append(Token.get_token("NOACK"))
if not isinstance(streams, dict) or len(streams) == 0:
raise DataError('XREADGROUP streams must be a non empty dict')
pieces.append(Token.get_token('STREAMS'))
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 73a0052..60d1e28 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -2205,6 +2205,15 @@ class TestRedisCommands(object):
# xread starting after the last message returns an empty message list
assert r.xreadgroup(group, consumer, streams={stream: '>'}) == expected
+ # xreadgroup with noack does not have any items in the PEL
+ r.xgroup_destroy(stream, group)
+ r.xgroup_create(stream, group, '0')
+ assert len(r.xreadgroup(group, consumer, streams={stream: '>'},
+ noack=True)[0][1]) == 2
+ # now there should be nothing pending
+ assert len(r.xreadgroup(group, consumer,
+ streams={stream: '0'})[0][1]) == 0
+
r.xgroup_destroy(stream, group)
r.xgroup_create(stream, group, '0')
# delete all the messages in the stream