summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoey Prat <roey.prat@redislabs.com>2018-10-16 13:24:21 +0300
committerRoey Prat <roey.prat@redislabs.com>2018-10-20 10:33:10 +0300
commit7f6de586a1ca52230d39debf9e42ddd4416fd2ca (patch)
tree78358916c59a7f155bcd7d40fd4c0701b1e35287
parent0eee00ac355f97493d670041cd96975e890d8f19 (diff)
downloadredis-py-7f6de586a1ca52230d39debf9e42ddd4416fd2ca.tar.gz
Implements XCLAIM
-rwxr-xr-xredis/client.py67
1 files changed, 59 insertions, 8 deletions
diff --git a/redis/client.py b/redis/client.py
index 9600bc7..a66470d 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -326,7 +326,6 @@ def parse_zscan(response, **options):
it = iter(r)
return long(cursor), list(izip(it, imap(score_cast_func, it)))
-
def parse_slowlog_get(response, **options):
return [{
'id': item[0],
@@ -2008,17 +2007,17 @@ class StrictRedis(object):
if streams is None:
streams = {}
pieces = ['GROUP', groupname, consumername]
- if block is not None:
- if not isinstance(block, int) or block < 0:
- raise RedisError("XREAD block must be a non-negative integer")
- pieces.append("BLOCK")
- pieces.append(str(block))
if count is not None:
if not isinstance(count, int) or count < 1:
- raise RedisError("XREAD count must be a positive integer")
+ raise RedisError("XREADGROUP count must be a positive integer")
pieces.append("COUNT")
pieces.append(str(count))
-
+ if block is not None:
+ if not isinstance(block, int) or block < 0:
+ raise RedisError("XREADGROUP block must be a non-negative "
+ "integer")
+ pieces.append("BLOCK")
+ pieces.append(str(block))
pieces.append("STREAMS")
ids = []
for partial_stream in iteritems(streams):
@@ -2051,6 +2050,58 @@ class StrictRedis(object):
pieces.append(consumername)
return self.execute_command('XPENDING', *pieces)
+ def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
+ idle=None, time=None, retrycount=None, force=False,
+ justid=False):
+ """
+ Changes the ownership of a pending message.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of a consumer that claims the message.
+ min_idle_time: filter messages that were idle less than this amount of
+ milliseconds
+ message_ids: non-empty list or tuple of message IDs to claim
+ idle: optional. Set the idle time (last time it was delivered) of the
+ message in ms
+ time: optional integer. This is the same as idle but instead of a
+ relative amount of milliseconds, it sets the idle time to a specific
+ Unix time (in milliseconds).
+ retrycount: optional integer. set the retry counter to the specified
+ value. This counter is incremented every time a message is delivered
+ again.
+ force: optional boolean, false by default. Creates the pending message
+ entry in the PEL even if certain specified IDs are not already in the
+ PEL assigned to a different client.
+ justid: optional boolean, false by default. Return just an array of IDs
+ of messages successfully claimed, without returning the actual message
+ """
+ if not isinstance(min_idle_time, int) or min_idle_time < 0:
+ raise RedisError("XCLAIM min_idle_time must be a non negative "
+ "integer")
+ if not isinstance(message_ids, (list, tuple)) or not message_ids:
+ raise RedisError("XCLAIM message_ids must be a non empty list or "
+ "tuple of message IDs to claim")
+
+ pieces = [name, groupname, consumername, str(min_idle_time)]
+ pieces.extend(list(message_ids))
+
+ optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'}
+ for param, param_name in optional_ints.items():
+ if param is not None:
+ if not isinstance(param, int):
+ raise RedisError("XCLAIM {} must be an integer"
+ .format(param_name))
+ pieces.append(str(param))
+
+ optional_bools = {force: 'force', justid: 'justid'}
+ for param, param_name in optional_bools.items():
+ if param:
+ if not isinstance(param, bool):
+ raise RedisError("XCLAIM {} must be a boolean"
+ .format(param_name))
+ pieces.append(param_name.upper())
+ return self.execute_command('XCLAIM', *pieces)
+
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""