diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 07:30:43 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 07:30:43 -0700 |
commit | 21c56b372bffe9a38d0a57dd6b3faa6a88ef83bb (patch) | |
tree | 3ee472f49348b459aebf2e60c43eb8e3ac07af70 | |
parent | 0d6c5f28ef87c83df5540abc358252cae3d2060e (diff) | |
parent | a32a8e630c25a2a2e8b637ac7af80ba7df048f23 (diff) | |
download | redis-py-21c56b372bffe9a38d0a57dd6b3faa6a88ef83bb.tar.gz |
Merge branch 'pr/1040'
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .travis.yml | 13 | ||||
-rw-r--r-- | benchmarks/basic_operations.py | 1 | ||||
-rw-r--r-- | benchmarks/command_packer_benchmark.py | 8 | ||||
-rw-r--r-- | build_tools/.bash_profile | 1 | ||||
-rwxr-xr-x | build_tools/bootstrap.sh (renamed from vagrant/bootstrap.sh) | 0 | ||||
-rwxr-xr-x | build_tools/build_redis.sh (renamed from vagrant/build_redis.sh) | 2 | ||||
-rwxr-xr-x | build_tools/install_redis.sh (renamed from vagrant/install_redis.sh) | 2 | ||||
-rwxr-xr-x | build_tools/install_sentinel.sh (renamed from vagrant/install_sentinel.sh) | 2 | ||||
-rw-r--r-- | build_tools/redis-configs/001-master (renamed from vagrant/redis-configs/001-master) | 2 | ||||
-rw-r--r-- | build_tools/redis-configs/002-slave (renamed from vagrant/redis-configs/002-slave) | 2 | ||||
-rwxr-xr-x | build_tools/redis_init_script (renamed from vagrant/redis_init_script) | 6 | ||||
-rwxr-xr-x | build_tools/redis_vars.sh (renamed from vagrant/redis_vars.sh) | 4 | ||||
-rw-r--r-- | build_tools/sentinel-configs/001-1 (renamed from vagrant/sentinel-configs/001-1) | 0 | ||||
-rw-r--r-- | build_tools/sentinel-configs/002-2 (renamed from vagrant/sentinel-configs/002-2) | 0 | ||||
-rw-r--r-- | build_tools/sentinel-configs/003-3 (renamed from vagrant/sentinel-configs/003-3) | 0 | ||||
-rwxr-xr-x | build_tools/sentinel_init_script (renamed from vagrant/sentinel_init_script) | 6 | ||||
-rw-r--r-- | redis/_compat.py | 2 | ||||
-rwxr-xr-x | redis/client.py | 410 | ||||
-rwxr-xr-x | redis/connection.py | 10 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | tests/test_commands.py | 189 | ||||
-rw-r--r-- | tox.ini | 10 | ||||
-rw-r--r-- | vagrant/.bash_profile | 1 | ||||
-rw-r--r-- | vagrant/Vagrantfile | 10 |
25 files changed, 641 insertions, 43 deletions
@@ -9,3 +9,4 @@ vagrant/.vagrant .python-version .cache .eggs +.idea
\ No newline at end of file diff --git a/.travis.yml b/.travis.yml index fcbdbb9..4c4e951 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,19 +7,20 @@ python: - "3.3" - "2.7" - "2.6" -services: - - redis-server +before_install: + - wget http://download.redis.io/releases/redis-5.0.0.tar.gz && mkdir redis_install && tar -xvzf redis-5.0.0.tar.gz -C redis_install && cd redis_install/redis-5.0.0 && make && src/redis-server --daemonize yes && cd ../.. + - redis-cli info env: - TEST_HIREDIS=0 - TEST_HIREDIS=1 install: - pip install -e . - - "if [[ $TEST_PEP8 == '1' ]]; then pip install pep8; fi" + - "if [[ $TEST_PYCODESTYLE == '1' ]]; then pip install pycodestyle; fi" - "if [[ $TEST_HIREDIS == '1' ]]; then pip install hiredis; fi" -script: "if [[ $TEST_PEP8 == '1' ]]; then pep8 --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi" +script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg,redis_install .; else python setup.py test; fi" matrix: include: - python: "2.7" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 - python: "3.6" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 diff --git a/benchmarks/basic_operations.py b/benchmarks/basic_operations.py index 2f1e9f4..c50a610 100644 --- a/benchmarks/basic_operations.py +++ b/benchmarks/basic_operations.py @@ -195,5 +195,6 @@ def hmset(conn, num, pipeline_size, data_size): if pipeline_size > 1: conn.execute() + if __name__ == '__main__': run() diff --git a/benchmarks/command_packer_benchmark.py b/benchmarks/command_packer_benchmark.py index 49f48f2..9eb1853 100644 --- a/benchmarks/command_packer_benchmark.py +++ b/benchmarks/command_packer_benchmark.py @@ -22,9 +22,9 @@ class StringJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): "Pack a series of arguments into a value Redis command" @@ -54,9 +54,9 @@ class ListJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): output = [] diff --git a/build_tools/.bash_profile b/build_tools/.bash_profile new file mode 100644 index 0000000..b023cf7 --- /dev/null +++ b/build_tools/.bash_profile @@ -0,0 +1 @@ +PATH=$PATH:/var/lib/redis/bin diff --git a/vagrant/bootstrap.sh b/build_tools/bootstrap.sh index a5a0d2c..a5a0d2c 100755 --- a/vagrant/bootstrap.sh +++ b/build_tools/bootstrap.sh diff --git a/vagrant/build_redis.sh b/build_tools/build_redis.sh index 728e617..379c6cc 100755 --- a/vagrant/build_redis.sh +++ b/build_tools/build_redis.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh pushd /home/vagrant diff --git a/vagrant/install_redis.sh b/build_tools/install_redis.sh index bb5f1d2..fd53a1c 100755 --- a/vagrant/install_redis.sh +++ b/build_tools/install_redis.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh for filename in `ls $VAGRANT_REDIS_CONF_DIR`; do # cuts the order prefix off of the filename, e.g. 001-master -> master diff --git a/vagrant/install_sentinel.sh b/build_tools/install_sentinel.sh index 58cd808..0597208 100755 --- a/vagrant/install_sentinel.sh +++ b/build_tools/install_sentinel.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh for filename in `ls $VAGRANT_SENTINEL_CONF_DIR`; do # cuts the order prefix off of the filename, e.g. 001-master -> master diff --git a/vagrant/redis-configs/001-master b/build_tools/redis-configs/001-master index f04f23d..8591f1a 100644 --- a/vagrant/redis-configs/001-master +++ b/build_tools/redis-configs/001-master @@ -5,4 +5,4 @@ daemonize yes unixsocket /tmp/redis_master.sock unixsocketperm 777 dbfilename master.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups diff --git a/vagrant/redis-configs/002-slave b/build_tools/redis-configs/002-slave index 5d302fe..13eb77e 100644 --- a/vagrant/redis-configs/002-slave +++ b/build_tools/redis-configs/002-slave @@ -5,6 +5,6 @@ daemonize yes unixsocket /tmp/redis-slave.sock unixsocketperm 777 dbfilename slave.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups slaveof 127.0.0.1 6379 diff --git a/vagrant/redis_init_script b/build_tools/redis_init_script index e8bfa08..04cb2db 100755 --- a/vagrant/redis_init_script +++ b/build_tools/redis_init_script @@ -12,10 +12,10 @@ REDISPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-server -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-server +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) diff --git a/vagrant/redis_vars.sh b/build_tools/redis_vars.sh index 1ec6295..c52dd4c 100755 --- a/vagrant/redis_vars.sh +++ b/build_tools/redis_vars.sh @@ -1,13 +1,13 @@ #!/usr/bin/env bash -VAGRANT_DIR=/home/vagrant/redis-py/vagrant +VAGRANT_DIR=/home/vagrant/redis-py/build_tools VAGRANT_REDIS_CONF_DIR=$VAGRANT_DIR/redis-configs VAGRANT_SENTINEL_CONF_DIR=$VAGRANT_DIR/sentinel-configs REDIS_VERSION=3.2.0 REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION -REDIS_DIR=/home/vagrant/redis +REDIS_DIR=/var/lib/redis REDIS_BIN_DIR=$REDIS_DIR/bin REDIS_CONF_DIR=$REDIS_DIR/conf REDIS_SAVE_DIR=$REDIS_DIR/backups diff --git a/vagrant/sentinel-configs/001-1 b/build_tools/sentinel-configs/001-1 index eccc3d1..eccc3d1 100644 --- a/vagrant/sentinel-configs/001-1 +++ b/build_tools/sentinel-configs/001-1 diff --git a/vagrant/sentinel-configs/002-2 b/build_tools/sentinel-configs/002-2 index 0cd2801..0cd2801 100644 --- a/vagrant/sentinel-configs/002-2 +++ b/build_tools/sentinel-configs/002-2 diff --git a/vagrant/sentinel-configs/003-3 b/build_tools/sentinel-configs/003-3 index c7f4fcd..c7f4fcd 100644 --- a/vagrant/sentinel-configs/003-3 +++ b/build_tools/sentinel-configs/003-3 diff --git a/vagrant/sentinel_init_script b/build_tools/sentinel_init_script index ea93537..1d94804 100755 --- a/vagrant/sentinel_init_script +++ b/build_tools/sentinel_init_script @@ -12,10 +12,10 @@ SENTINELPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-sentinel -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-sentinel +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) diff --git a/redis/_compat.py b/redis/_compat.py index 307f3cc..32063e7 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -4,7 +4,7 @@ import sys try: InterruptedError = InterruptedError -except: +except NameError: InterruptedError = OSError # For Python older than 3.5, retry EINTR. diff --git a/redis/client.py b/redis/client.py index 79e94d0..6108087 100755 --- a/redis/client.py +++ b/redis/client.py @@ -195,7 +195,7 @@ def pairs_to_dict_typed(response, type_info): if key in type_info: try: value = type_info[key](value) - except: + except Exception: # if for some reason the value can't be coerced, just use # the string value pass @@ -232,6 +232,76 @@ def int_or_none(response): return int(response) +def stream_list(response): + if response is None: + return None + return [(r[0], pairs_to_dict(r[1])) for r in response] + + +def parse_recursive_dict(response): + if response is None: + return None + result = {} + while response: + k = response.pop(0) + v = response.pop(0) + if isinstance(v, list): + v = parse_recursive_dict(v) + result[k] = v + return result + + +def parse_list_of_recursive_dicts(response): + if response is None: + return None + result = [] + for group in response: + result.append(parse_recursive_dict(group)) + return result + + +def parse_xclaim(response): + if all(isinstance(r, (basestring, bytes)) for r in response): + return response + return stream_list(response) + + +def parse_xread(response): + if response is None: + return [] + return [[nativestr(r[0]), stream_list(r[1])] for r in response] + + +def parse_xpending(response, **options): + if isinstance(response, list): + if options.get('parse_detail', False): + return parse_range_xpending(response) + consumers = [] + for consumer_name, consumer_pending in response[3]: + consumers.append({ + 'name': consumer_name, + 'pending': consumer_pending + }) + return { + 'pending': response[0], + 'lower': response[1], + 'upper': response[2], + 'consumers': consumers + } + + +def parse_range_xpending(response): + result = [] + for message in response: + result.append({ + 'message_id': message[0], + 'consumer': message[1], + 'time_since_delivered': message[2], + 'times_delivered': message[3] + }) + return result + + def float_or_none(response): if response is None: return None @@ -366,7 +436,26 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD', + 'GEOADD XLEN', + int + ), + string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XPENDING', parse_xpending), + string_keys_to_dict('XREAD XREADGROUP', parse_xread), + { + 'XGROUP CREATE': bool_ok, + 'XGROUP DESTROY': int, + 'XGROUP SETID': bool_ok, + 'XGROUP DELCONSUMER': int + }, + { + 'XINFO STREAM': parse_recursive_dict, + 'XINFO CONSUMERS': parse_list_of_recursive_dicts, + 'XINFO GROUPS': parse_list_of_recursive_dicts + }, + string_keys_to_dict('XCLAIM', parse_xclaim), + string_keys_to_dict( + 'XACK XDEL XTRIM', int ), string_keys_to_dict( @@ -1675,6 +1764,323 @@ class StrictRedis(object): args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) + # STREAMS COMMANDS + def xadd(self, _name, fields, id='*', maxlen=None, approximate=True): + """ + Add to a stream. + _name: name of the stream (not using 'name' as this would + prevent 'name' used in the kwargs + fields: dict of field/value pairs to insert into the stream + id: Location to insert this record. By default it is appended. + maxlen: truncate old stream members beyond this size + approximate: actual stream length may be slightly more than maxlen + + """ + pieces = [] + if maxlen is not None: + if not isinstance(maxlen, (int, long)) or maxlen < 1: + raise RedisError('XADD maxlen must be a positive integer') + pieces.append(Token.get_token('MAXLEN')) + if approximate: + pieces.append(Token.get_token('~')) + pieces.append(str(maxlen)) + pieces.append(id) + if not isinstance(fields, dict) or len(fields) == 0: + raise RedisError('XADD fields must be a non-empty dict') + for pair in iteritems(fields): + pieces.extend(pair) + return self.execute_command('XADD', _name, *pieces) + + def xrange(self, name, start='-', finish='+', count=None): + """ + Read stream values within an interval. + name: name of the stream. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREVRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) + + def xlen(self, name): + """ + Returns the number of elements in a given stream. + """ + return self.execute_command('XLEN', name) + + def xread(self, streams, count=None, block=None): + """ + Block and monitor multiple streams for new data. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [] + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError('XREAD block must be a non-negative integer') + pieces.append(Token.get_token('BLOCK')) + pieces.append(str(block)) + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREAD count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREAD streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREAD', *pieces) + + def xgroup_create(self, name, groupname, id): + """ + Create a new consumer group associated with a stream. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP CREATE', name, groupname, id) + + def xgroup_destroy(self, name, groupname): + """ + Destroy a consumer group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XGROUP DESTROY', name, groupname) + + def xgroup_setid(self, name, groupname, id): + """ + Set the consumer group last delivered ID to something else. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP SETID', name, groupname, id) + + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it + was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) + + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + + def xdel(self, name, *ids): + """ + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. + """ + return self.execute_command('XDEL', name, *ids) + + def xtrim(self, name, maxlen, approximate=True): + """ + Trims old messages from a stream. + name: name of the stream. + maxlen: truncate old stream messages beyond this size + approximate: actual stream length may be slightly more than maxlen + """ + pieces = [Token.get_token('MAXLEN')] + if approximate: + pieces.append(Token.get_token('~')) + pieces.append(maxlen) + return self.execute_command('XTRIM', name, *pieces) + + def xreadgroup(self, groupname, consumername, streams, count=None, + block=None): + """ + Read from a stream via a consumer group. + groupname: name of the consumer group. + consumername: name of the requesting consumer. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [Token.get_token('GROUP'), groupname, consumername] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError("XREADGROUP count must be a positive integer") + pieces.append(Token.get_token("COUNT")) + pieces.append(str(count)) + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError("XREADGROUP block must be a non-negative " + "integer") + pieces.append(Token.get_token("BLOCK")) + pieces.append(str(block)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREADGROUP streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREADGROUP', *pieces) + + def xpending(self, name, groupname): + """ + Returns information about pending messages of a group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XPENDING', name, groupname) + + def xpending_range(self, name, groupname, start='-', end='+', count=-1, + consumername=None): + """ + Returns information about pending messages, in a range. + name: name of the stream. + groupname: name of the consumer group. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + consumername: name of a consumer to filter by (optional). + """ + pieces = [name, groupname] + if start is not None or end is not None or count is not None: + if start is None or end is None or count is None: + raise RedisError("XPENDING must be provided with start, end " + "and count parameters, or none of them. ") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") + pieces.extend((start, end, str(count))) + if consumername is not None: + if start is None or end is None or count is None: + raise RedisError("if XPENDING is provided with consumername," + " it must be provided with start, end and" + " count parameters") + pieces.append(consumername) + return self.execute_command('XPENDING', *pieces, parse_detail=True) + + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): + """ + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") + + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) + + if idle is not None: + if not isinstance(idle, (int, long)): + raise RedisError("XCLAIM idle must be an integer") + pieces.extend((Token.get_token('IDLE'), str(idle))) + if time is not None: + if not isinstance(time, (int, long)): + raise RedisError("XCLAIM time must be an integer") + pieces.extend((Token.get_token('TIME'), str(time))) + if retrycount is not None: + if not isinstance(retrycount, (int, long)): + raise RedisError("XCLAIM retrycount must be an integer") + pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) + + if force: + if not isinstance(force, bool): + raise RedisError("XCLAIM force must be a boolean") + pieces.append(Token.get_token('FORCE')) + if justid: + if not isinstance(justid, bool): + raise RedisError("XCLAIM justid must be a boolean") + pieces.append(Token.get_token('JUSTID')) + return self.execute_command('XCLAIM', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/redis/connection.py b/redis/connection.py index 6ad467a..1190260 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -236,7 +236,7 @@ class SocketBuffer(object): try: self.purge() self._buffer.close() - except: + except Exception: # issue #633 suggests the purge/close somehow raised a # BadFileDescriptor error. Perhaps the client ran out of # memory or something else? It's probably OK to ignore @@ -602,9 +602,9 @@ class Connection(object): errmsg = e.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def send_command(self, *args): "Pack and send a command to the Redis server" @@ -623,9 +623,9 @@ class Connection(object): "Read the response from a previously sent command" try: response = self._parser.read_response() - except: + except Exception as e: self.disconnect() - raise + raise e if isinstance(response, ResponseError): raise response return response @@ -1,4 +1,4 @@ -[pep8] +[pycodestyle] show-source = 1 exclude = .venv,.tox,dist,docs,build,*.egg diff --git a/tests/test_commands.py b/tests/test_commands.py index b9b9b66..9635732 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1603,6 +1603,195 @@ class TestRedisCommands(object): class TestStrictCommands(object): + @skip_if_server_version_lt('5.0.0') + def test_strict_xrange(self, sr): + varname = 'xrange_test' + sr.delete(varname) + assert sr.xlen(varname) == 0 + stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) + assert sr.xlen(varname) == 1 + stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) + assert sr.xlen(varname) == 2 + assert stamp1 != stamp2 + + milli, offset = stamp2.decode('utf-8').split('-') + new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8') + stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id) + assert sr.xlen(varname) == 3 + assert stamp3 == new_id + stamp4 = sr.xadd(varname, {"foo": "baz"}) + assert sr.xlen(varname) == 4 + + def get_ids(results): + return [result[0] for result in results] + + results = sr.xrange(varname, start=stamp1) + assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4] + + results = sr.xrange(varname, start=stamp2, finish=stamp3) + assert get_ids(results) == [stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp3) + assert get_ids(results) == [stamp1, stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp1] + + results = sr.xrevrange(varname, start=stamp4) + assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] + + results = sr.xrevrange(varname, start=stamp3, finish=stamp2) + assert get_ids(results) == [stamp3, stamp2] + + results = sr.xrevrange(varname, finish=stamp3) + assert get_ids(results) == [stamp4, stamp3] + + results = sr.xrevrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp4] + + assert sr.xlen(varname) == 4 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xread(self, sr): + varname = 'xread_test' + sr.delete(varname) + stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) + stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) + assert stamp1 != stamp2 + + results = sr.xread(streams={varname: '$'}, count=10, block=10) + assert results == [] + + results = sr.xread(count=3, block=0, streams={varname: stamp1}) + assert results[0][1][0][0] == stamp2 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xgroup(self, sr): + stream_name = 'xgroup_test_stream' + sr.delete(stream_name) + group_name = 'xgroup_test_group' + message = {'name': 'boaty', 'other': 'mcboatface'} + b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} + + stamp1 = sr.xadd(stream_name, message) + assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + + assert sr.xinfo_groups(name=stream_name) == [] + assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) + + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b(stamp1) + assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b('0-0') + + consumer_name = 'captain_jack_sparrow' + + expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] + assert sr.xreadgroup(groupname=group_name, + consumername=consumer_name, + streams={stream_name: '0'}) == expected_value + + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 + sr.xgroup_delconsumer(stream_name, group_name, consumer_name) + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 + + assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' + + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, {"foo": "bar"}, id=1) + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, {"baz": "qaz"}) + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xtrim(self, sr): + stream_name = 'xtrim_test_stream' + sr.delete(stream_name) + + assert sr.xtrim(stream_name, 1000) == 0 + + for i in range(300): + sr.xadd(stream_name, {"index": i}) + + assert sr.xtrim(stream_name, 1000, approximate=False) == 0 + assert sr.xtrim(stream_name, 300) == 0 + assert sr.xtrim(stream_name, 299) == 0 + assert sr.xtrim(stream_name, 234) == 0 + assert sr.xtrim(stream_name, 234, approximate=False) == 66 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' + + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xclaim(self, sr): + stream_name = 'xclaim_test_stream' + group_name = 'xclaim_test_consumer_group' + sr.delete(stream_name) + + stamp = sr.xadd(stream_name, {"john": "wick"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, 'action_movie_consumer', + streams={stream_name: 0}) + assert sr.xinfo_consumers(stream_name, group_name)[0][ + b('name')] == b('action_movie_consumer') + assert sr.xclaim(stream_name, group_name, 'reeves_fan', + min_idle_time=0, message_ids=(stamp,))[0][0] == stamp + assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', + min_idle_time=0, message_ids=(stamp,), + justid=True) == [b(stamp), ] + + @skip_if_server_version_lt('5.0.0') + def test_strict_xpending(self, sr): + stream_name = 'xpending_test_stream' + group_name = 'xpending_test_consumer_group' + consumer_name = 'marie' + sr.delete(stream_name) + + sr.xadd(stream_name, {"foo": "bar"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, consumer_name, + streams={stream_name: 0}) + response = sr.xpending(stream_name, group_name) + assert sorted(response.keys()) == ['consumers', 'lower', 'pending', + 'upper'] + + response = sr.xpending_range(stream_name, group_name, + consumername=consumer_name) + assert sorted(response[0].keys()) == ['consumer', 'message_id', + 'time_since_delivered', + 'times_delivered'] + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ @@ -1,6 +1,6 @@ [tox] minversion = 1.8 -envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pep8 +envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle [testenv] deps = @@ -9,9 +9,9 @@ deps = hiredis: hiredis >= 0.1.3 commands = py.test {posargs} -[testenv:pep8] -basepython = python2.6 -deps = pep8 -commands = pep8 +[testenv:pycodestyle] +basepython = python3.6 +deps = pycodestyle +commands = pycodestyle skipsdist = true skip_install = true diff --git a/vagrant/.bash_profile b/vagrant/.bash_profile deleted file mode 100644 index e3d9bca..0000000 --- a/vagrant/.bash_profile +++ /dev/null @@ -1 +0,0 @@ -PATH=$PATH:/home/vagrant/redis/bin diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index 7465ccd..3ee7aee 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -12,11 +12,11 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.synced_folder "../", "/home/vagrant/redis-py" # install the redis server - config.vm.provision :shell, :path => "bootstrap.sh" - config.vm.provision :shell, :path => "build_redis.sh" - config.vm.provision :shell, :path => "install_redis.sh" - config.vm.provision :shell, :path => "install_sentinel.sh" - config.vm.provision :file, :source => ".bash_profile", :destination => "/home/vagrant/.bash_profile" + config.vm.provision :shell, :path => "../build_tools/bootstrap.sh" + config.vm.provision :shell, :path => "../build_tools/build_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_sentinel.sh" + config.vm.provision :file, :source => "../build_tools/.bash_profile", :destination => "/home/vagrant/.bash_profile" # setup forwarded ports config.vm.network "forwarded_port", guest: 6379, host: 6379 |