summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2018-10-31 08:24:52 -0700
committerAndy McCurdy <andy@andymccurdy.com>2018-10-31 08:24:52 -0700
commit0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f (patch)
treee667adf42a54291a34347a2c5a652c94e5a05ffc /redis/client.py
parent7295ee0f29253ed19be01a69e6d1efc225242a1e (diff)
downloadredis-py-0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f.tar.gz
alphabetize stream functions
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py360
1 files changed, 180 insertions, 180 deletions
diff --git a/redis/client.py b/redis/client.py
index 291a428..c5e769d 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1783,78 +1783,80 @@ class StrictRedis(object):
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)
- def xrange(self, name, start='-', finish='+', count=None):
+ def xack(self, name, groupname, *ids):
"""
- Read stream values within an interval.
+ Acknowledges the successful processing of one or more messages.
name: name of the stream.
- start: first stream ID. defaults to '-',
- meaning the earliest available.
- finish: last stream ID. defaults to '+',
- meaning the latest available.
- count: if set, only return this many items, beginning with the
- earliest available.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
"""
- pieces = [start, finish]
- if count is not None:
- if not isinstance(count, (int, long)) or count < 1:
- raise RedisError('XRANGE count must be a positive integer')
- pieces.append(Token.get_token('COUNT'))
- pieces.append(str(count))
-
- return self.execute_command('XRANGE', name, *pieces)
+ return self.execute_command('XACK', name, groupname, *ids)
- def xrevrange(self, name, start='+', finish='-', count=None):
+ def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
+ idle=None, time=None, retrycount=None, force=False,
+ justid=False):
"""
- Read stream values within an interval, in reverse order.
- name: name of the stream
- start: first stream ID. defaults to '+',
- meaning the latest available.
- finish: last stream ID. defaults to '-',
- meaning the earliest available.
- count: if set, only return this many items, beginning with the
- latest available.
+ Changes the ownership of a pending message.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of a consumer that claims the message.
+ min_idle_time: filter messages that were idle less than this amount of
+ milliseconds
+ message_ids: non-empty list or tuple of message IDs to claim
+ idle: optional. Set the idle time (last time it was delivered) of the
+ message in ms
+ time: optional integer. This is the same as idle but instead of a
+ relative amount of milliseconds, it sets the idle time to a specific
+ Unix time (in milliseconds).
+ retrycount: optional integer. set the retry counter to the specified
+ value. This counter is incremented every time a message is delivered
+ again.
+ force: optional boolean, false by default. Creates the pending message
+ entry in the PEL even if certain specified IDs are not already in the
+ PEL assigned to a different client.
+ justid: optional boolean, false by default. Return just an array of IDs
+ of messages successfully claimed, without returning the actual message
"""
- pieces = [start, finish]
- if count is not None:
- if not isinstance(count, (int, long)) or count < 1:
- raise RedisError('XREVRANGE count must be a positive integer')
- pieces.append(Token.get_token('COUNT'))
- pieces.append(str(count))
+ if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0:
+ raise RedisError("XCLAIM min_idle_time must be a non negative "
+ "integer")
+ if not isinstance(message_ids, (list, tuple)) or not message_ids:
+ raise RedisError("XCLAIM message_ids must be a non empty list or "
+ "tuple of message IDs to claim")
- return self.execute_command('XREVRANGE', name, *pieces)
+ pieces = [name, groupname, consumername, str(min_idle_time)]
+ pieces.extend(list(message_ids))
- def xlen(self, name):
- """
- Returns the number of elements in a given stream.
- """
- return self.execute_command('XLEN', name)
+ if idle is not None:
+ if not isinstance(idle, (int, long)):
+ raise RedisError("XCLAIM idle must be an integer")
+ pieces.extend((Token.get_token('IDLE'), str(idle)))
+ if time is not None:
+ if not isinstance(time, (int, long)):
+ raise RedisError("XCLAIM time must be an integer")
+ pieces.extend((Token.get_token('TIME'), str(time)))
+ if retrycount is not None:
+ if not isinstance(retrycount, (int, long)):
+ raise RedisError("XCLAIM retrycount must be an integer")
+ pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount)))
- def xread(self, streams, count=None, block=None):
+ if force:
+ if not isinstance(force, bool):
+ raise RedisError("XCLAIM force must be a boolean")
+ pieces.append(Token.get_token('FORCE'))
+ if justid:
+ if not isinstance(justid, bool):
+ raise RedisError("XCLAIM justid must be a boolean")
+ pieces.append(Token.get_token('JUSTID'))
+ return self.execute_command('XCLAIM', *pieces)
+
+ def xdel(self, name, *ids):
"""
- Block and monitor multiple streams for new data.
- streams: a dict of stream names to stream IDs, where
- IDs indicate the last ID already seen.
- count: if set, only return this many items, beginning with the
- earliest available.
- block: number of milliseconds to wait, if nothing already present.
+ Deletes one or more messages from a stream.
+ name: name of the stream.
+ *ids: message ids to delete.
"""
- pieces = []
- if block is not None:
- if not isinstance(block, (int, long)) or block < 0:
- raise RedisError('XREAD block must be a non-negative integer')
- pieces.append(Token.get_token('BLOCK'))
- pieces.append(str(block))
- if count is not None:
- if not isinstance(count, (int, long)) or count < 1:
- raise RedisError('XREAD count must be a positive integer')
- pieces.append(Token.get_token('COUNT'))
- pieces.append(str(count))
- if not isinstance(streams, dict) or len(streams) == 0:
- raise RedisError('XREAD streams must be a non empty dict')
- pieces.append(Token.get_token('STREAMS'))
- pieces.extend(streams.keys())
- pieces.extend(streams.values())
- return self.execute_command('XREAD', *pieces)
+ return self.execute_command('XDEL', name, *ids)
def xgroup_create(self, name, groupname, id):
"""
@@ -1865,6 +1867,18 @@ class StrictRedis(object):
"""
return self.execute_command('XGROUP CREATE', name, groupname, id)
+ def xgroup_delconsumer(self, name, groupname, consumername):
+ """
+ Remove a specific consumer from a consumer group.
+ Returns the number of pending messages that the consumer had before it
+ was deleted.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of consumer to delete
+ """
+ return self.execute_command('XGROUP DELCONSUMER', name, groupname,
+ consumername)
+
def xgroup_destroy(self, name, groupname):
"""
Destroy a consumer group.
@@ -1882,17 +1896,20 @@ class StrictRedis(object):
"""
return self.execute_command('XGROUP SETID', name, groupname, id)
- def xgroup_delconsumer(self, name, groupname, consumername):
+ def xinfo_consumers(self, name, groupname):
"""
- Remove a specific consumer from a consumer group.
- Returns the number of pending messages that the consumer had before it
- was deleted.
+ Returns general information about the consumers in the group.
name: name of the stream.
groupname: name of the consumer group.
- consumername: name of consumer to delete
"""
- return self.execute_command('XGROUP DELCONSUMER', name, groupname,
- consumername)
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
def xinfo_stream(self, name):
"""
@@ -1901,37 +1918,116 @@ class StrictRedis(object):
"""
return self.execute_command('XINFO STREAM', name)
- def xinfo_consumers(self, name, groupname):
+ def xlen(self, name):
"""
- Returns general information about the consumers in the group.
- name: name of the stream.
- groupname: name of the consumer group.
+ Returns the number of elements in a given stream.
"""
- return self.execute_command('XINFO CONSUMERS', name, groupname)
+ return self.execute_command('XLEN', name)
- def xinfo_groups(self, name):
+ def xpending(self, name, groupname):
"""
- Returns general information about the consumer groups of the stream.
+ Returns information about pending messages of a group.
name: name of the stream.
+ groupname: name of the consumer group.
"""
- return self.execute_command('XINFO GROUPS', name)
+ return self.execute_command('XPENDING', name, groupname)
- def xack(self, name, groupname, *ids):
+ def xpending_range(self, name, groupname, start='-', end='+', count=-1,
+ consumername=None):
"""
- Acknowledges the successful processing of one or more messages.
+ Returns information about pending messages, in a range.
name: name of the stream.
groupname: name of the consumer group.
- *ids: message ids to acknowlege.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ consumername: name of a consumer to filter by (optional).
"""
- return self.execute_command('XACK', name, groupname, *ids)
+ pieces = [name, groupname]
+ if start is not None or end is not None or count is not None:
+ if start is None or end is None or count is None:
+ raise RedisError("XPENDING must be provided with start, end "
+ "and count parameters, or none of them. ")
+ if not isinstance(count, (int, long)) or count < -1:
+ raise RedisError("XPENDING count must be a integer >= -1")
+ pieces.extend((start, end, str(count)))
+ if consumername is not None:
+ if start is None or end is None or count is None:
+ raise RedisError("if XPENDING is provided with consumername,"
+ " it must be provided with start, end and"
+ " count parameters")
+ pieces.append(consumername)
+ return self.execute_command('XPENDING', *pieces, parse_detail=True)
- def xdel(self, name, *ids):
+ def xrange(self, name, start='-', finish='+', count=None):
"""
- Deletes one or more messages from a stream.
+ Read stream values within an interval.
name: name of the stream.
- *ids: message ids to delete.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
"""
- return self.execute_command('XDEL', name, *ids)
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XRANGE count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+
+ return self.execute_command('XRANGE', name, *pieces)
+
+ def xread(self, streams, count=None, block=None):
+ """
+ Block and monitor multiple streams for new data.
+ streams: a dict of stream names to stream IDs, where
+ IDs indicate the last ID already seen.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ block: number of milliseconds to wait, if nothing already present.
+ """
+ pieces = []
+ if block is not None:
+ if not isinstance(block, (int, long)) or block < 0:
+ raise RedisError('XREAD block must be a non-negative integer')
+ pieces.append(Token.get_token('BLOCK'))
+ pieces.append(str(block))
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XREAD count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+ if not isinstance(streams, dict) or len(streams) == 0:
+ raise RedisError('XREAD streams must be a non empty dict')
+ pieces.append(Token.get_token('STREAMS'))
+ pieces.extend(streams.keys())
+ pieces.extend(streams.values())
+ return self.execute_command('XREAD', *pieces)
+
+ def xrevrange(self, name, start='+', finish='-', count=None):
+ """
+ Read stream values within an interval, in reverse order.
+ name: name of the stream
+ start: first stream ID. defaults to '+',
+ meaning the latest available.
+ finish: last stream ID. defaults to '-',
+ meaning the earliest available.
+ count: if set, only return this many items, beginning with the
+ latest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XREVRANGE count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+
+ return self.execute_command('XREVRANGE', name, *pieces)
def xtrim(self, name, maxlen, approximate=True):
"""
@@ -1977,102 +2073,6 @@ class StrictRedis(object):
pieces.extend(streams.values())
return self.execute_command('XREADGROUP', *pieces)
- def xpending(self, name, groupname):
- """
- Returns information about pending messages of a group.
- name: name of the stream.
- groupname: name of the consumer group.
- """
- return self.execute_command('XPENDING', name, groupname)
-
- def xpending_range(self, name, groupname, start='-', end='+', count=-1,
- consumername=None):
- """
- Returns information about pending messages, in a range.
- name: name of the stream.
- groupname: name of the consumer group.
- start: first stream ID. defaults to '-',
- meaning the earliest available.
- finish: last stream ID. defaults to '+',
- meaning the latest available.
- count: if set, only return this many items, beginning with the
- earliest available.
- consumername: name of a consumer to filter by (optional).
- """
- pieces = [name, groupname]
- if start is not None or end is not None or count is not None:
- if start is None or end is None or count is None:
- raise RedisError("XPENDING must be provided with start, end "
- "and count parameters, or none of them. ")
- if not isinstance(count, (int, long)) or count < -1:
- raise RedisError("XPENDING count must be a integer >= -1")
- pieces.extend((start, end, str(count)))
- if consumername is not None:
- if start is None or end is None or count is None:
- raise RedisError("if XPENDING is provided with consumername,"
- " it must be provided with start, end and"
- " count parameters")
- pieces.append(consumername)
- return self.execute_command('XPENDING', *pieces, parse_detail=True)
-
- def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
- idle=None, time=None, retrycount=None, force=False,
- justid=False):
- """
- Changes the ownership of a pending message.
- name: name of the stream.
- groupname: name of the consumer group.
- consumername: name of a consumer that claims the message.
- min_idle_time: filter messages that were idle less than this amount of
- milliseconds
- message_ids: non-empty list or tuple of message IDs to claim
- idle: optional. Set the idle time (last time it was delivered) of the
- message in ms
- time: optional integer. This is the same as idle but instead of a
- relative amount of milliseconds, it sets the idle time to a specific
- Unix time (in milliseconds).
- retrycount: optional integer. set the retry counter to the specified
- value. This counter is incremented every time a message is delivered
- again.
- force: optional boolean, false by default. Creates the pending message
- entry in the PEL even if certain specified IDs are not already in the
- PEL assigned to a different client.
- justid: optional boolean, false by default. Return just an array of IDs
- of messages successfully claimed, without returning the actual message
- """
- if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0:
- raise RedisError("XCLAIM min_idle_time must be a non negative "
- "integer")
- if not isinstance(message_ids, (list, tuple)) or not message_ids:
- raise RedisError("XCLAIM message_ids must be a non empty list or "
- "tuple of message IDs to claim")
-
- pieces = [name, groupname, consumername, str(min_idle_time)]
- pieces.extend(list(message_ids))
-
- if idle is not None:
- if not isinstance(idle, (int, long)):
- raise RedisError("XCLAIM idle must be an integer")
- pieces.extend((Token.get_token('IDLE'), str(idle)))
- if time is not None:
- if not isinstance(time, (int, long)):
- raise RedisError("XCLAIM time must be an integer")
- pieces.extend((Token.get_token('TIME'), str(time)))
- if retrycount is not None:
- if not isinstance(retrycount, (int, long)):
- raise RedisError("XCLAIM retrycount must be an integer")
- pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount)))
-
- if force:
- if not isinstance(force, bool):
- raise RedisError("XCLAIM force must be a boolean")
- pieces.append(Token.get_token('FORCE'))
- if justid:
- if not isinstance(justid, bool):
- raise RedisError("XCLAIM justid must be a boolean")
- pieces.append(Token.get_token('JUSTID'))
- return self.execute_command('XCLAIM', *pieces)
-
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""