summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-01-10 11:56:06 -0800
committerGitHub <noreply@github.com>2019-01-10 11:56:06 -0800
commitf9cb36af086510cab95e84a1e75d7f09d508040e (patch)
treef1fb26bb6837d0dc9312018d280b72214653b808
parenta3cfded93afa2a65908f05ac251b18d77fa84dd2 (diff)
parente6e52ee50cfc8234fb6775122b2149c7b3a002d6 (diff)
downloadredis-py-f9cb36af086510cab95e84a1e75d7f09d508040e.tar.gz
Merge pull request #1120 from johntmyers/add-noack
Added noack option for XREADGROUP
-rwxr-xr-xredis/client.py5
-rw-r--r--tests/test_commands.py10
2 files changed, 14 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py
index 2645e83..656df26 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -2199,7 +2199,7 @@ class Redis(object):
return self.execute_command('XREAD', *pieces)
def xreadgroup(self, groupname, consumername, streams, count=None,
- block=None):
+ block=None, noack=False):
"""
Read from a stream via a consumer group.
groupname: name of the consumer group.
@@ -2209,6 +2209,7 @@ class Redis(object):
count: if set, only return this many items, beginning with the
earliest available.
block: number of milliseconds to wait, if nothing already present.
+ noack: do not add messages to the PEL
"""
pieces = [Token.get_token('GROUP'), groupname, consumername]
if count is not None:
@@ -2222,6 +2223,8 @@ class Redis(object):
"integer")
pieces.append(Token.get_token("BLOCK"))
pieces.append(str(block))
+ if noack:
+ pieces.append(Token.get_token("NOACK"))
if not isinstance(streams, dict) or len(streams) == 0:
raise DataError('XREADGROUP streams must be a non empty dict')
pieces.append(Token.get_token('STREAMS'))
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 566437d..60f3e51 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -2205,6 +2205,16 @@ class TestRedisCommands(object):
# xread starting after the last message returns an empty message list
assert r.xreadgroup(group, consumer, streams={stream: '>'}) == expected
+ r.xgroup_destroy(stream, group)
+
+ # xreadgroup with noack does not have any items in the PEL
+ r.xgroup_create(stream, group, '0')
+ assert len(r.xreadgroup(group, consumer, streams={stream: '>'},
+ noack=True)[0][1]) == 2
+ # now there should be nothing pending
+ assert len(r.xreadgroup(group, consumer,
+ streams={stream: '0'})[0][1]) == 0
+
@skip_if_server_version_lt('5.0.0')
def test_xrevrange(self, r):
stream = 'stream'