diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 08:24:52 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 08:24:52 -0700 |
commit | 0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f (patch) | |
tree | e667adf42a54291a34347a2c5a652c94e5a05ffc /redis/client.py | |
parent | 7295ee0f29253ed19be01a69e6d1efc225242a1e (diff) | |
download | redis-py-0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f.tar.gz |
alphabetize stream functions
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 360 |
1 files changed, 180 insertions, 180 deletions
diff --git a/redis/client.py b/redis/client.py index 291a428..c5e769d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1783,78 +1783,80 @@ class StrictRedis(object): pieces.extend(pair) return self.execute_command('XADD', name, *pieces) - def xrange(self, name, start='-', finish='+', count=None): + def xack(self, name, groupname, *ids): """ - Read stream values within an interval. + Acknowledges the successful processing of one or more messages. name: name of the stream. - start: first stream ID. defaults to '-', - meaning the earliest available. - finish: last stream ID. defaults to '+', - meaning the latest available. - count: if set, only return this many items, beginning with the - earliest available. + groupname: name of the consumer group. + *ids: message ids to acknowlege. """ - pieces = [start, finish] - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XRANGE count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) - - return self.execute_command('XRANGE', name, *pieces) + return self.execute_command('XACK', name, groupname, *ids) - def xrevrange(self, name, start='+', finish='-', count=None): + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): """ - Read stream values within an interval, in reverse order. - name: name of the stream - start: first stream ID. defaults to '+', - meaning the latest available. - finish: last stream ID. defaults to '-', - meaning the earliest available. - count: if set, only return this many items, beginning with the - latest available. + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message """ - pieces = [start, finish] - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XREVRANGE count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) + if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") - return self.execute_command('XREVRANGE', name, *pieces) + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) - def xlen(self, name): - """ - Returns the number of elements in a given stream. - """ - return self.execute_command('XLEN', name) + if idle is not None: + if not isinstance(idle, (int, long)): + raise RedisError("XCLAIM idle must be an integer") + pieces.extend((Token.get_token('IDLE'), str(idle))) + if time is not None: + if not isinstance(time, (int, long)): + raise RedisError("XCLAIM time must be an integer") + pieces.extend((Token.get_token('TIME'), str(time))) + if retrycount is not None: + if not isinstance(retrycount, (int, long)): + raise RedisError("XCLAIM retrycount must be an integer") + pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) - def xread(self, streams, count=None, block=None): + if force: + if not isinstance(force, bool): + raise RedisError("XCLAIM force must be a boolean") + pieces.append(Token.get_token('FORCE')) + if justid: + if not isinstance(justid, bool): + raise RedisError("XCLAIM justid must be a boolean") + pieces.append(Token.get_token('JUSTID')) + return self.execute_command('XCLAIM', *pieces) + + def xdel(self, name, *ids): """ - Block and monitor multiple streams for new data. - streams: a dict of stream names to stream IDs, where - IDs indicate the last ID already seen. - count: if set, only return this many items, beginning with the - earliest available. - block: number of milliseconds to wait, if nothing already present. + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. """ - pieces = [] - if block is not None: - if not isinstance(block, (int, long)) or block < 0: - raise RedisError('XREAD block must be a non-negative integer') - pieces.append(Token.get_token('BLOCK')) - pieces.append(str(block)) - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XREAD count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) - if not isinstance(streams, dict) or len(streams) == 0: - raise RedisError('XREAD streams must be a non empty dict') - pieces.append(Token.get_token('STREAMS')) - pieces.extend(streams.keys()) - pieces.extend(streams.values()) - return self.execute_command('XREAD', *pieces) + return self.execute_command('XDEL', name, *ids) def xgroup_create(self, name, groupname, id): """ @@ -1865,6 +1867,18 @@ class StrictRedis(object): """ return self.execute_command('XGROUP CREATE', name, groupname, id) + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it + was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + def xgroup_destroy(self, name, groupname): """ Destroy a consumer group. @@ -1882,17 +1896,20 @@ class StrictRedis(object): """ return self.execute_command('XGROUP SETID', name, groupname, id) - def xgroup_delconsumer(self, name, groupname, consumername): + def xinfo_consumers(self, name, groupname): """ - Remove a specific consumer from a consumer group. - Returns the number of pending messages that the consumer had before it - was deleted. + Returns general information about the consumers in the group. name: name of the stream. groupname: name of the consumer group. - consumername: name of consumer to delete """ - return self.execute_command('XGROUP DELCONSUMER', name, groupname, - consumername) + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) def xinfo_stream(self, name): """ @@ -1901,37 +1918,116 @@ class StrictRedis(object): """ return self.execute_command('XINFO STREAM', name) - def xinfo_consumers(self, name, groupname): + def xlen(self, name): """ - Returns general information about the consumers in the group. - name: name of the stream. - groupname: name of the consumer group. + Returns the number of elements in a given stream. """ - return self.execute_command('XINFO CONSUMERS', name, groupname) + return self.execute_command('XLEN', name) - def xinfo_groups(self, name): + def xpending(self, name, groupname): """ - Returns general information about the consumer groups of the stream. + Returns information about pending messages of a group. name: name of the stream. + groupname: name of the consumer group. """ - return self.execute_command('XINFO GROUPS', name) + return self.execute_command('XPENDING', name, groupname) - def xack(self, name, groupname, *ids): + def xpending_range(self, name, groupname, start='-', end='+', count=-1, + consumername=None): """ - Acknowledges the successful processing of one or more messages. + Returns information about pending messages, in a range. name: name of the stream. groupname: name of the consumer group. - *ids: message ids to acknowlege. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + consumername: name of a consumer to filter by (optional). """ - return self.execute_command('XACK', name, groupname, *ids) + pieces = [name, groupname] + if start is not None or end is not None or count is not None: + if start is None or end is None or count is None: + raise RedisError("XPENDING must be provided with start, end " + "and count parameters, or none of them. ") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") + pieces.extend((start, end, str(count))) + if consumername is not None: + if start is None or end is None or count is None: + raise RedisError("if XPENDING is provided with consumername," + " it must be provided with start, end and" + " count parameters") + pieces.append(consumername) + return self.execute_command('XPENDING', *pieces, parse_detail=True) - def xdel(self, name, *ids): + def xrange(self, name, start='-', finish='+', count=None): """ - Deletes one or more messages from a stream. + Read stream values within an interval. name: name of the stream. - *ids: message ids to delete. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. """ - return self.execute_command('XDEL', name, *ids) + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xread(self, streams, count=None, block=None): + """ + Block and monitor multiple streams for new data. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [] + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError('XREAD block must be a non-negative integer') + pieces.append(Token.get_token('BLOCK')) + pieces.append(str(block)) + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREAD count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREAD streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREAD', *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREVRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) def xtrim(self, name, maxlen, approximate=True): """ @@ -1977,102 +2073,6 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) - def xpending(self, name, groupname): - """ - Returns information about pending messages of a group. - name: name of the stream. - groupname: name of the consumer group. - """ - return self.execute_command('XPENDING', name, groupname) - - def xpending_range(self, name, groupname, start='-', end='+', count=-1, - consumername=None): - """ - Returns information about pending messages, in a range. - name: name of the stream. - groupname: name of the consumer group. - start: first stream ID. defaults to '-', - meaning the earliest available. - finish: last stream ID. defaults to '+', - meaning the latest available. - count: if set, only return this many items, beginning with the - earliest available. - consumername: name of a consumer to filter by (optional). - """ - pieces = [name, groupname] - if start is not None or end is not None or count is not None: - if start is None or end is None or count is None: - raise RedisError("XPENDING must be provided with start, end " - "and count parameters, or none of them. ") - if not isinstance(count, (int, long)) or count < -1: - raise RedisError("XPENDING count must be a integer >= -1") - pieces.extend((start, end, str(count))) - if consumername is not None: - if start is None or end is None or count is None: - raise RedisError("if XPENDING is provided with consumername," - " it must be provided with start, end and" - " count parameters") - pieces.append(consumername) - return self.execute_command('XPENDING', *pieces, parse_detail=True) - - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, - idle=None, time=None, retrycount=None, force=False, - justid=False): - """ - Changes the ownership of a pending message. - name: name of the stream. - groupname: name of the consumer group. - consumername: name of a consumer that claims the message. - min_idle_time: filter messages that were idle less than this amount of - milliseconds - message_ids: non-empty list or tuple of message IDs to claim - idle: optional. Set the idle time (last time it was delivered) of the - message in ms - time: optional integer. This is the same as idle but instead of a - relative amount of milliseconds, it sets the idle time to a specific - Unix time (in milliseconds). - retrycount: optional integer. set the retry counter to the specified - value. This counter is incremented every time a message is delivered - again. - force: optional boolean, false by default. Creates the pending message - entry in the PEL even if certain specified IDs are not already in the - PEL assigned to a different client. - justid: optional boolean, false by default. Return just an array of IDs - of messages successfully claimed, without returning the actual message - """ - if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: - raise RedisError("XCLAIM min_idle_time must be a non negative " - "integer") - if not isinstance(message_ids, (list, tuple)) or not message_ids: - raise RedisError("XCLAIM message_ids must be a non empty list or " - "tuple of message IDs to claim") - - pieces = [name, groupname, consumername, str(min_idle_time)] - pieces.extend(list(message_ids)) - - if idle is not None: - if not isinstance(idle, (int, long)): - raise RedisError("XCLAIM idle must be an integer") - pieces.extend((Token.get_token('IDLE'), str(idle))) - if time is not None: - if not isinstance(time, (int, long)): - raise RedisError("XCLAIM time must be an integer") - pieces.extend((Token.get_token('TIME'), str(time))) - if retrycount is not None: - if not isinstance(retrycount, (int, long)): - raise RedisError("XCLAIM retrycount must be an integer") - pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) - - if force: - if not isinstance(force, bool): - raise RedisError("XCLAIM force must be a boolean") - pieces.append(Token.get_token('FORCE')) - if justid: - if not isinstance(justid, bool): - raise RedisError("XCLAIM justid must be a boolean") - pieces.append(Token.get_token('JUSTID')) - return self.execute_command('XCLAIM', *pieces) - # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ |