summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Farrell <nick.farrell@genesiscare.com.au>2017-11-04 12:57:27 +1100
committerRoey Prat <roey.prat@redislabs.com>2018-10-28 12:12:53 +0200
commit832c296f73bcdcf51b420bb3683ff62d1b43767b (patch)
treed1588505e6fa19a6446c5dbdb747694968233bcb
parent59f279cab86ea8f27b99d170556cd761163783f2 (diff)
downloadredis-py-832c296f73bcdcf51b420bb3683ff62d1b43767b.tar.gz
Added support for Streams
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN. See http://antirez.com/news/114 for more information. Consumer groups is not yet supported, as its details are still being finalised upstream.
-rwxr-xr-xredis/client.py134
-rw-r--r--tests/test_commands.py62
2 files changed, 195 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py
index 79e94d0..95dc63f 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -232,6 +232,33 @@ def int_or_none(response):
return int(response)
+def stream_key(response):
+ return response
+
+
+def stream_list(response):
+ if response is None:
+ return None
+ result = []
+ for r in response:
+ kv_pairs = r[1]
+ kv_dict = dict()
+ while len(kv_pairs) > 1:
+ kv_dict[kv_pairs.pop()] = kv_pairs.pop()
+ result.append((r[0], kv_dict))
+
+ return result
+
+
+def multi_stream_list(response):
+ if response is None:
+ return None
+ result = dict()
+ for r in response:
+ result[r[0].decode('utf-8')] = stream_list(r[1])
+ return result
+
+
def float_or_none(response):
if response is None:
return None
@@ -366,9 +393,12 @@ class StrictRedis(object):
'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE '
'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD '
'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE '
- 'GEOADD',
+ 'GEOADD XLEN',
int
),
+ string_keys_to_dict('XADD', stream_key),
+ string_keys_to_dict('XREVRANGE XRANGE', stream_list),
+ string_keys_to_dict('XREAD', multi_stream_list),
string_keys_to_dict(
'INCRBYFLOAT HINCRBYFLOAT GEODIST',
float
@@ -1675,6 +1705,108 @@ class StrictRedis(object):
args = list_or_args(keys, args)
return self.execute_command('SUNIONSTORE', dest, *args)
+ # STREAMS COMMANDS
+ def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs):
+ """
+ Add to a stream.
+ _name: name of the stream (not using 'name' as this would
+ prevent 'name' used in the kwargs
+ id: Location to insert this record. By default it is appended.
+ maxlen: truncate old stream members beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+ **kwargs: key/value pairs to insert into the stream
+
+ """
+ pieces = []
+ if maxlen is not None:
+ if not isinstance(maxlen, int) or maxlen < 1:
+ raise RedisError("XADD maxlen must be a positive integer")
+ pieces.append("MAXLEN")
+ if approximate:
+ pieces.append("~")
+ pieces.append(str(maxlen))
+ pieces.append(id)
+ for pair in iteritems(kwargs):
+ pieces.append(pair[0])
+ pieces.append(pair[1])
+ return self.execute_command('XADD', _name, *pieces)
+
+ def xrange(self, name, start='-', finish='+', count=None):
+ """
+ Read stream values within an interval.
+ name: name of the stream.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XRANGE count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ return self.execute_command('XRANGE', name, *pieces)
+
+ def xrevrange(self, name, start='+', finish='-', count=None):
+ """
+ Read stream values within an interval, in reverse order.
+ name: name of the stream
+ start: first stream ID. defaults to '+',
+ meaning the latest available.
+ finish: last stream ID. defaults to '-',
+ meaning the earliest available.
+ count: if set, only return this many items, beginning with the
+ latest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XREVRANGE count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ return self.execute_command('XREVRANGE', name, *pieces)
+
+ def xlen(self, name):
+ """
+ Returns the number of elements in a given stream.
+ """
+ return self.execute_command('XLEN', name)
+
+ def xread(self, count=None, block=None, **streams):
+ """
+ Block and monitor multiple streams for new data.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ block: number of milliseconds to wait, if nothing already present.
+ **streams: a mapping of stream names to stream IDs, where
+ IDs indicate the last ID already seen.
+ """
+ pieces = []
+ if block is not None:
+ if not isinstance(block, int) or block < 1:
+ raise RedisError("XREAD block must be a positive integer")
+ pieces.append("BLOCK")
+ pieces.append(str(block))
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XREAD count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ pieces.append("STREAMS")
+ ids = []
+ for partial_stream in iteritems(streams):
+ pieces.append(partial_stream[0])
+ ids.append(partial_stream[1])
+
+ pieces.extend(ids)
+ return self.execute_command('XREAD', *pieces)
+
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""
diff --git a/tests/test_commands.py b/tests/test_commands.py
index b9b9b66..17b34c7 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1603,6 +1603,68 @@ class TestRedisCommands(object):
class TestStrictCommands(object):
+ @skip_if_server_version_lt('6.0.0')
+ def test_strict_xrange(self, sr):
+ varname = 'xrange_test'
+ sr.delete(varname)
+ assert sr.xlen(varname) == 0
+ stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4)
+ assert sr.xlen(varname) == 1
+ stamp2 = sr.xadd(varname, name="baz", other="zab")
+ assert sr.xlen(varname) == 2
+ assert stamp1 != stamp2
+
+ milli, offset = stamp2.decode('utf-8').split('-')
+ new_id = ("%s-0" % (milli + 10000)).encode('utf-8')
+ stamp3 = sr.xadd(varname, id=new_id, foo="bar")
+ assert sr.xlen(varname) == 3
+ assert stamp3 == new_id
+ stamp4 = sr.xadd(varname, foo="baz")
+ assert sr.xlen(varname) == 4
+
+ def get_ids(results):
+ return [result[0] for result in results]
+
+ results = sr.xrange(varname, start=stamp1)
+ assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4]
+
+ results = sr.xrange(varname, start=stamp2, finish=stamp3)
+ assert get_ids(results) == [stamp2, stamp3]
+
+ results = sr.xrange(varname, finish=stamp3)
+ assert get_ids(results) == [stamp1, stamp2, stamp3]
+
+ results = sr.xrange(varname, finish=stamp2, count=1)
+ assert get_ids(results) == [stamp1]
+
+ results = sr.xrevrange(varname, start=stamp1)
+ assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1]
+
+ results = sr.xrevrange(varname, start=stamp3, finish=stamp2)
+ assert get_ids(results) == [stamp3, stamp2]
+
+ results = sr.xrevrange(varname, finish=stamp3)
+ assert get_ids(results) == [stamp4, stamp3]
+
+ results = sr.xrevrange(varname, finish=stamp2, count=1)
+ assert get_ids(results) == [stamp4]
+
+ assert sr.xlen(varname) == 4
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xread(self, sr):
+ varname = 'xread_test'
+ sr.delete(varname)
+ stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4)
+ stamp2 = sr.xadd(varname, name="baz", other="zab")
+ assert stamp1 != stamp2
+
+ results = sr.xread(varname='$', count=10, block=10)
+ assert results is None
+
+ results = sr.xread(count=3, block=1000, **{varname: stamp1})
+ assert results[varname][0][0] == stamp2
+
def test_strict_zadd(self, sr):
sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0)
assert sr.zrange('a', 0, -1, withscores=True) == \