summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorNick Farrell <nick.farrell@genesiscare.com.au>2017-11-04 12:57:27 +1100
committerRoey Prat <roey.prat@redislabs.com>2018-10-28 12:12:53 +0200
commit832c296f73bcdcf51b420bb3683ff62d1b43767b (patch)
treed1588505e6fa19a6446c5dbdb747694968233bcb /redis/client.py
parent59f279cab86ea8f27b99d170556cd761163783f2 (diff)
downloadredis-py-832c296f73bcdcf51b420bb3683ff62d1b43767b.tar.gz
Added support for Streams
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN. See http://antirez.com/news/114 for more information. Consumer groups is not yet supported, as its details are still being finalised upstream.
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py134
1 files changed, 133 insertions, 1 deletions
diff --git a/redis/client.py b/redis/client.py
index 79e94d0..95dc63f 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -232,6 +232,33 @@ def int_or_none(response):
return int(response)
+def stream_key(response):
+ return response
+
+
+def stream_list(response):
+ if response is None:
+ return None
+ result = []
+ for r in response:
+ kv_pairs = r[1]
+ kv_dict = dict()
+ while len(kv_pairs) > 1:
+ kv_dict[kv_pairs.pop()] = kv_pairs.pop()
+ result.append((r[0], kv_dict))
+
+ return result
+
+
+def multi_stream_list(response):
+ if response is None:
+ return None
+ result = dict()
+ for r in response:
+ result[r[0].decode('utf-8')] = stream_list(r[1])
+ return result
+
+
def float_or_none(response):
if response is None:
return None
@@ -366,9 +393,12 @@ class StrictRedis(object):
'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE '
'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD '
'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE '
- 'GEOADD',
+ 'GEOADD XLEN',
int
),
+ string_keys_to_dict('XADD', stream_key),
+ string_keys_to_dict('XREVRANGE XRANGE', stream_list),
+ string_keys_to_dict('XREAD', multi_stream_list),
string_keys_to_dict(
'INCRBYFLOAT HINCRBYFLOAT GEODIST',
float
@@ -1675,6 +1705,108 @@ class StrictRedis(object):
args = list_or_args(keys, args)
return self.execute_command('SUNIONSTORE', dest, *args)
+ # STREAMS COMMANDS
+ def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs):
+ """
+ Add to a stream.
+ _name: name of the stream (not using 'name' as this would
+ prevent 'name' used in the kwargs
+ id: Location to insert this record. By default it is appended.
+ maxlen: truncate old stream members beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+ **kwargs: key/value pairs to insert into the stream
+
+ """
+ pieces = []
+ if maxlen is not None:
+ if not isinstance(maxlen, int) or maxlen < 1:
+ raise RedisError("XADD maxlen must be a positive integer")
+ pieces.append("MAXLEN")
+ if approximate:
+ pieces.append("~")
+ pieces.append(str(maxlen))
+ pieces.append(id)
+ for pair in iteritems(kwargs):
+ pieces.append(pair[0])
+ pieces.append(pair[1])
+ return self.execute_command('XADD', _name, *pieces)
+
+ def xrange(self, name, start='-', finish='+', count=None):
+ """
+ Read stream values within an interval.
+ name: name of the stream.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XRANGE count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ return self.execute_command('XRANGE', name, *pieces)
+
+ def xrevrange(self, name, start='+', finish='-', count=None):
+ """
+ Read stream values within an interval, in reverse order.
+ name: name of the stream
+ start: first stream ID. defaults to '+',
+ meaning the latest available.
+ finish: last stream ID. defaults to '-',
+ meaning the earliest available.
+ count: if set, only return this many items, beginning with the
+ latest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XREVRANGE count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ return self.execute_command('XREVRANGE', name, *pieces)
+
+ def xlen(self, name):
+ """
+ Returns the number of elements in a given stream.
+ """
+ return self.execute_command('XLEN', name)
+
+ def xread(self, count=None, block=None, **streams):
+ """
+ Block and monitor multiple streams for new data.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ block: number of milliseconds to wait, if nothing already present.
+ **streams: a mapping of stream names to stream IDs, where
+ IDs indicate the last ID already seen.
+ """
+ pieces = []
+ if block is not None:
+ if not isinstance(block, int) or block < 1:
+ raise RedisError("XREAD block must be a positive integer")
+ pieces.append("BLOCK")
+ pieces.append(str(block))
+ if count is not None:
+ if not isinstance(count, int) or count < 1:
+ raise RedisError("XREAD count must be a positive integer")
+ pieces.append("COUNT")
+ pieces.append(str(count))
+
+ pieces.append("STREAMS")
+ ids = []
+ for partial_stream in iteritems(streams):
+ pieces.append(partial_stream[0])
+ ids.append(partial_stream[1])
+
+ pieces.extend(ids)
+ return self.execute_command('XREAD', *pieces)
+
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""