diff options
author | Nick Farrell <nick.farrell@genesiscare.com.au> | 2017-11-04 12:57:27 +1100 |
---|---|---|
committer | Roey Prat <roey.prat@redislabs.com> | 2018-10-28 12:12:53 +0200 |
commit | 832c296f73bcdcf51b420bb3683ff62d1b43767b (patch) | |
tree | d1588505e6fa19a6446c5dbdb747694968233bcb | |
parent | 59f279cab86ea8f27b99d170556cd761163783f2 (diff) | |
download | redis-py-832c296f73bcdcf51b420bb3683ff62d1b43767b.tar.gz |
Added support for Streams
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN.
See http://antirez.com/news/114 for more information.
Consumer groups is not yet supported, as its details are still
being finalised upstream.
-rwxr-xr-x | redis/client.py | 134 | ||||
-rw-r--r-- | tests/test_commands.py | 62 |
2 files changed, 195 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py index 79e94d0..95dc63f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,6 +232,33 @@ def int_or_none(response): return int(response) +def stream_key(response): + return response + + +def stream_list(response): + if response is None: + return None + result = [] + for r in response: + kv_pairs = r[1] + kv_dict = dict() + while len(kv_pairs) > 1: + kv_dict[kv_pairs.pop()] = kv_pairs.pop() + result.append((r[0], kv_dict)) + + return result + + +def multi_stream_list(response): + if response is None: + return None + result = dict() + for r in response: + result[r[0].decode('utf-8')] = stream_list(r[1]) + return result + + def float_or_none(response): if response is None: return None @@ -366,9 +393,12 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD', + 'GEOADD XLEN', int ), + string_keys_to_dict('XADD', stream_key), + string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREAD', multi_stream_list), string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1675,6 +1705,108 @@ class StrictRedis(object): args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) + # STREAMS COMMANDS + def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs): + """ + Add to a stream. + _name: name of the stream (not using 'name' as this would + prevent 'name' used in the kwargs + id: Location to insert this record. By default it is appended. + maxlen: truncate old stream members beyond this size + approximate: actual stream length may be slightly more than maxlen + **kwargs: key/value pairs to insert into the stream + + """ + pieces = [] + if maxlen is not None: + if not isinstance(maxlen, int) or maxlen < 1: + raise RedisError("XADD maxlen must be a positive integer") + pieces.append("MAXLEN") + if approximate: + pieces.append("~") + pieces.append(str(maxlen)) + pieces.append(id) + for pair in iteritems(kwargs): + pieces.append(pair[0]) + pieces.append(pair[1]) + return self.execute_command('XADD', _name, *pieces) + + def xrange(self, name, start='-', finish='+', count=None): + """ + Read stream values within an interval. + name: name of the stream. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREVRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) + + def xlen(self, name): + """ + Returns the number of elements in a given stream. + """ + return self.execute_command('XLEN', name) + + def xread(self, count=None, block=None, **streams): + """ + Block and monitor multiple streams for new data. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + **streams: a mapping of stream names to stream IDs, where + IDs indicate the last ID already seen. + """ + pieces = [] + if block is not None: + if not isinstance(block, int) or block < 1: + raise RedisError("XREAD block must be a positive integer") + pieces.append("BLOCK") + pieces.append(str(block)) + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREAD count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + pieces.append("STREAMS") + ids = [] + for partial_stream in iteritems(streams): + pieces.append(partial_stream[0]) + ids.append(partial_stream[1]) + + pieces.extend(ids) + return self.execute_command('XREAD', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index b9b9b66..17b34c7 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1603,6 +1603,68 @@ class TestRedisCommands(object): class TestStrictCommands(object): + @skip_if_server_version_lt('6.0.0') + def test_strict_xrange(self, sr): + varname = 'xrange_test' + sr.delete(varname) + assert sr.xlen(varname) == 0 + stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) + assert sr.xlen(varname) == 1 + stamp2 = sr.xadd(varname, name="baz", other="zab") + assert sr.xlen(varname) == 2 + assert stamp1 != stamp2 + + milli, offset = stamp2.decode('utf-8').split('-') + new_id = ("%s-0" % (milli + 10000)).encode('utf-8') + stamp3 = sr.xadd(varname, id=new_id, foo="bar") + assert sr.xlen(varname) == 3 + assert stamp3 == new_id + stamp4 = sr.xadd(varname, foo="baz") + assert sr.xlen(varname) == 4 + + def get_ids(results): + return [result[0] for result in results] + + results = sr.xrange(varname, start=stamp1) + assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4] + + results = sr.xrange(varname, start=stamp2, finish=stamp3) + assert get_ids(results) == [stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp3) + assert get_ids(results) == [stamp1, stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp1] + + results = sr.xrevrange(varname, start=stamp1) + assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] + + results = sr.xrevrange(varname, start=stamp3, finish=stamp2) + assert get_ids(results) == [stamp3, stamp2] + + results = sr.xrevrange(varname, finish=stamp3) + assert get_ids(results) == [stamp4, stamp3] + + results = sr.xrevrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp4] + + assert sr.xlen(varname) == 4 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xread(self, sr): + varname = 'xread_test' + sr.delete(varname) + stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) + stamp2 = sr.xadd(varname, name="baz", other="zab") + assert stamp1 != stamp2 + + results = sr.xread(varname='$', count=10, block=10) + assert results is None + + results = sr.xread(count=3, block=1000, **{varname: stamp1}) + assert results[varname][0][0] == stamp2 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ |