From 7f6de586a1ca52230d39debf9e42ddd4416fd2ca Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 16 Oct 2018 13:24:21 +0300 Subject: Implements XCLAIM --- redis/client.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/redis/client.py b/redis/client.py index 9600bc7..a66470d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -326,7 +326,6 @@ def parse_zscan(response, **options): it = iter(r) return long(cursor), list(izip(it, imap(score_cast_func, it))) - def parse_slowlog_get(response, **options): return [{ 'id': item[0], @@ -2008,17 +2007,17 @@ class StrictRedis(object): if streams is None: streams = {} pieces = ['GROUP', groupname, consumername] - if block is not None: - if not isinstance(block, int) or block < 0: - raise RedisError("XREAD block must be a non-negative integer") - pieces.append("BLOCK") - pieces.append(str(block)) if count is not None: if not isinstance(count, int) or count < 1: - raise RedisError("XREAD count must be a positive integer") + raise RedisError("XREADGROUP count must be a positive integer") pieces.append("COUNT") pieces.append(str(count)) - + if block is not None: + if not isinstance(block, int) or block < 0: + raise RedisError("XREADGROUP block must be a non-negative " + "integer") + pieces.append("BLOCK") + pieces.append(str(block)) pieces.append("STREAMS") ids = [] for partial_stream in iteritems(streams): @@ -2051,6 +2050,58 @@ class StrictRedis(object): pieces.append(consumername) return self.execute_command('XPENDING', *pieces) + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): + """ + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + if not isinstance(min_idle_time, int) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") + + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) + + optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} + for param, param_name in optional_ints.items(): + if param is not None: + if not isinstance(param, int): + raise RedisError("XCLAIM {} must be an integer" + .format(param_name)) + pieces.append(str(param)) + + optional_bools = {force: 'force', justid: 'justid'} + for param, param_name in optional_bools.items(): + if param: + if not isinstance(param, bool): + raise RedisError("XCLAIM {} must be a boolean" + .format(param_name)) + pieces.append(param_name.upper()) + return self.execute_command('XCLAIM', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ -- cgit v1.2.1