diff options
author | Nick Farrell <nick.farrell@genesiscare.com.au> | 2017-11-04 12:57:27 +1100 |
---|---|---|
committer | Roey Prat <roey.prat@redislabs.com> | 2018-10-28 12:12:53 +0200 |
commit | 832c296f73bcdcf51b420bb3683ff62d1b43767b (patch) | |
tree | d1588505e6fa19a6446c5dbdb747694968233bcb /redis/client.py | |
parent | 59f279cab86ea8f27b99d170556cd761163783f2 (diff) | |
download | redis-py-832c296f73bcdcf51b420bb3683ff62d1b43767b.tar.gz |
Added support for Streams
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN.
See http://antirez.com/news/114 for more information.
Consumer groups is not yet supported, as its details are still
being finalised upstream.
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 134 |
1 files changed, 133 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py index 79e94d0..95dc63f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,6 +232,33 @@ def int_or_none(response): return int(response) +def stream_key(response): + return response + + +def stream_list(response): + if response is None: + return None + result = [] + for r in response: + kv_pairs = r[1] + kv_dict = dict() + while len(kv_pairs) > 1: + kv_dict[kv_pairs.pop()] = kv_pairs.pop() + result.append((r[0], kv_dict)) + + return result + + +def multi_stream_list(response): + if response is None: + return None + result = dict() + for r in response: + result[r[0].decode('utf-8')] = stream_list(r[1]) + return result + + def float_or_none(response): if response is None: return None @@ -366,9 +393,12 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD', + 'GEOADD XLEN', int ), + string_keys_to_dict('XADD', stream_key), + string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREAD', multi_stream_list), string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1675,6 +1705,108 @@ class StrictRedis(object): args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) + # STREAMS COMMANDS + def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs): + """ + Add to a stream. + _name: name of the stream (not using 'name' as this would + prevent 'name' used in the kwargs + id: Location to insert this record. By default it is appended. + maxlen: truncate old stream members beyond this size + approximate: actual stream length may be slightly more than maxlen + **kwargs: key/value pairs to insert into the stream + + """ + pieces = [] + if maxlen is not None: + if not isinstance(maxlen, int) or maxlen < 1: + raise RedisError("XADD maxlen must be a positive integer") + pieces.append("MAXLEN") + if approximate: + pieces.append("~") + pieces.append(str(maxlen)) + pieces.append(id) + for pair in iteritems(kwargs): + pieces.append(pair[0]) + pieces.append(pair[1]) + return self.execute_command('XADD', _name, *pieces) + + def xrange(self, name, start='-', finish='+', count=None): + """ + Read stream values within an interval. + name: name of the stream. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREVRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) + + def xlen(self, name): + """ + Returns the number of elements in a given stream. + """ + return self.execute_command('XLEN', name) + + def xread(self, count=None, block=None, **streams): + """ + Block and monitor multiple streams for new data. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + **streams: a mapping of stream names to stream IDs, where + IDs indicate the last ID already seen. + """ + pieces = [] + if block is not None: + if not isinstance(block, int) or block < 1: + raise RedisError("XREAD block must be a positive integer") + pieces.append("BLOCK") + pieces.append(str(block)) + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREAD count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + pieces.append("STREAMS") + ids = [] + for partial_stream in iteritems(streams): + pieces.append(partial_stream[0]) + ids.append(partial_stream[1]) + + pieces.extend(ids) + return self.execute_command('XREAD', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ |