diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-11-02 16:00:29 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-02 16:00:29 -0700 |
commit | 5f2333e0f2ea91e15a925fff1a772d95ca87411d (patch) | |
tree | ee2b39ccc8eb7dbe03128f018ae209a5af772a10 | |
parent | ebbbb7ddfc763cc150781cb3e72c8a3f718a2260 (diff) | |
parent | a0482156f4166cb7e44d14b57bdd18026acd3f15 (diff) | |
download | redis-py-5f2333e0f2ea91e15a925fff1a772d95ca87411d.tar.gz |
Merge branch 'master' into documents-strictredis-rediss-support
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .travis.yml | 13 | ||||
-rw-r--r-- | CHANGES | 9 | ||||
-rw-r--r-- | README.rst | 49 | ||||
-rw-r--r-- | benchmarks/basic_operations.py | 1 | ||||
-rw-r--r-- | benchmarks/command_packer_benchmark.py | 12 | ||||
-rw-r--r-- | build_tools/.bash_profile | 1 | ||||
-rwxr-xr-x | build_tools/bootstrap.sh (renamed from vagrant/bootstrap.sh) | 0 | ||||
-rwxr-xr-x | build_tools/build_redis.sh (renamed from vagrant/build_redis.sh) | 2 | ||||
-rwxr-xr-x | build_tools/install_redis.sh (renamed from vagrant/install_redis.sh) | 2 | ||||
-rwxr-xr-x | build_tools/install_sentinel.sh (renamed from vagrant/install_sentinel.sh) | 2 | ||||
-rw-r--r-- | build_tools/redis-configs/001-master (renamed from vagrant/redis-configs/001-master) | 2 | ||||
-rw-r--r-- | build_tools/redis-configs/002-slave (renamed from vagrant/redis-configs/002-slave) | 2 | ||||
-rwxr-xr-x | build_tools/redis_init_script (renamed from vagrant/redis_init_script) | 6 | ||||
-rwxr-xr-x | build_tools/redis_vars.sh (renamed from vagrant/redis_vars.sh) | 4 | ||||
-rw-r--r-- | build_tools/sentinel-configs/001-1 (renamed from vagrant/sentinel-configs/001-1) | 0 | ||||
-rw-r--r-- | build_tools/sentinel-configs/002-2 (renamed from vagrant/sentinel-configs/002-2) | 0 | ||||
-rw-r--r-- | build_tools/sentinel-configs/003-3 (renamed from vagrant/sentinel-configs/003-3) | 0 | ||||
-rwxr-xr-x | build_tools/sentinel_init_script (renamed from vagrant/sentinel_init_script) | 6 | ||||
-rw-r--r-- | redis/_compat.py | 3 | ||||
-rwxr-xr-x | redis/client.py | 525 | ||||
-rwxr-xr-x | redis/connection.py | 20 | ||||
-rw-r--r-- | redis/lock.py | 2 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/test_commands.py | 499 | ||||
-rw-r--r-- | tests/test_connection_pool.py | 3 | ||||
-rw-r--r-- | tests/test_encoding.py | 7 | ||||
-rw-r--r-- | tests/test_pubsub.py | 4 | ||||
-rw-r--r-- | tox.ini | 10 | ||||
-rw-r--r-- | vagrant/.bash_profile | 1 | ||||
-rw-r--r-- | vagrant/Vagrantfile | 10 |
32 files changed, 1070 insertions, 130 deletions
@@ -9,3 +9,4 @@ vagrant/.vagrant .python-version .cache .eggs +.idea
\ No newline at end of file diff --git a/.travis.yml b/.travis.yml index fcbdbb9..4c4e951 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,19 +7,20 @@ python: - "3.3" - "2.7" - "2.6" -services: - - redis-server +before_install: + - wget http://download.redis.io/releases/redis-5.0.0.tar.gz && mkdir redis_install && tar -xvzf redis-5.0.0.tar.gz -C redis_install && cd redis_install/redis-5.0.0 && make && src/redis-server --daemonize yes && cd ../.. + - redis-cli info env: - TEST_HIREDIS=0 - TEST_HIREDIS=1 install: - pip install -e . - - "if [[ $TEST_PEP8 == '1' ]]; then pip install pep8; fi" + - "if [[ $TEST_PYCODESTYLE == '1' ]]; then pip install pycodestyle; fi" - "if [[ $TEST_HIREDIS == '1' ]]; then pip install hiredis; fi" -script: "if [[ $TEST_PEP8 == '1' ]]; then pep8 --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi" +script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg,redis_install .; else python setup.py test; fi" matrix: include: - python: "2.7" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 - python: "3.6" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 @@ -1,6 +1,7 @@ * 2.10.6 * Various performance improvements. Thanks cjsimpson - * Fixed a bug with SRANDMEMBER where + * Fixed a bug with SRANDMEMBER where the behavior for `number=0` did + not match the spec. Thanks Alex Wang * Added HSTRLEN command. Thanks Alexander Putilin * Added the TOUCH command. Thanks Anis Jonischkeit * Remove unnecessary calls to the server when registering Lua scripts. @@ -190,7 +191,7 @@ for the report. * Connections now call socket.shutdown() prior to socket.close() to ensure communication ends immediately per the note at - http://docs.python.org/2/library/socket.html#socket.socket.close + https://docs.python.org/2/library/socket.html#socket.socket.close Thanks to David Martin for pointing this out. * Lock checks are now based on floats rather than ints. Thanks Vitja Makarov. @@ -224,11 +225,11 @@ * Prevent DISCARD from being called if MULTI wasn't also called. Thanks Pete Aykroyd. * SREM now returns an integer indicating the number of items removed from - the set. Thanks http://github.com/ronniekk. + the set. Thanks https://github.com/ronniekk. * Fixed a bug with BGSAVE and BGREWRITEAOF response callbacks with Python3. Thanks Nathan Wan. * Added CLIENT GETNAME and CLIENT SETNAME commands. - Thanks http://github.com/bitterb. + Thanks https://github.com/bitterb. * It's now possible to use len() on a pipeline instance to determine the number of commands that will be executed. Thanks Jon Parise. * Fixed a bug in INFO's parse routine with floating point numbers. Thanks @@ -4,13 +4,17 @@ redis-py The Python interface to the Redis key-value store. .. image:: https://secure.travis-ci.org/andymccurdy/redis-py.png?branch=master - :target: http://travis-ci.org/andymccurdy/redis-py + :target: https://travis-ci.org/andymccurdy/redis-py +.. image:: https://readthedocs.org/projects/redis-py/badge/?version=latest&style=flat + :target: https://redis-py.readthedocs.io/en/latest/ +.. image:: https://badge.fury.io/py/redis.svg + :target: https://pypi.org/project/redis/ Installation ------------ redis-py requires a running Redis server. See `Redis's quickstart -<http://redis.io/topics/quickstart>`_ for installation instructions. +<https://redis.io/topics/quickstart>`_ for installation instructions. To install redis-py, simply: @@ -18,12 +22,6 @@ To install redis-py, simply: $ sudo pip install redis -or alternatively (you really should be using pip though): - -.. code-block:: bash - - $ sudo easy_install redis - or from source: .. code-block:: bash @@ -55,7 +53,7 @@ specified. API Reference ------------- -The `official Redis command documentation <http://redis.io/commands>`_ does a +The `official Redis command documentation <https://redis.io/commands>`_ does a great job of explaining each command in detail. redis-py exposes two client classes that implement these commands. The StrictRedis class attempts to adhere to the official command syntax. There are a few exceptions: @@ -158,19 +156,12 @@ kind enough to create Python bindings. Using Hiredis can provide up to a performance increase is most noticeable when retrieving many pieces of data, such as from LRANGE or SMEMBERS operations. -Hiredis is available on PyPI, and can be installed via pip or easy_install -just like redis-py. +Hiredis is available on PyPI, and can be installed via pip just like redis-py. .. code-block:: bash $ pip install hiredis -or - -.. code-block:: bash - - $ easy_install hiredis - Response Callbacks ^^^^^^^^^^^^^^^^^^ @@ -272,7 +263,7 @@ could do something like this: .. code-block:: pycon >>> with r.pipeline() as pipe: - ... while 1: + ... while True: ... try: ... # put a WATCH on the key that holds our sequence value ... pipe.watch('OUR-SEQUENCE-KEY') @@ -305,7 +296,7 @@ explicitly calling reset(): .. code-block:: pycon >>> pipe = r.pipeline() - >>> while 1: + >>> while True: ... try: ... pipe.watch('OUR-SEQUENCE-KEY') ... ... @@ -544,7 +535,7 @@ supported: 1204 -LUA Scripting +Lua Scripting ^^^^^^^^^^^^^ redis-py supports the EVAL, EVALSHA, and SCRIPT commands. However, there are @@ -553,10 +544,10 @@ scenarios. Therefore, redis-py exposes a Script object that makes scripting much easier to use. To create a Script instance, use the `register_script` function on a client -instance passing the LUA code as the first argument. `register_script` returns +instance passing the Lua code as the first argument. `register_script` returns a Script instance that you can use throughout your code. -The following trivial LUA script accepts two parameters: the name of a key and +The following trivial Lua script accepts two parameters: the name of a key and a multiplier value. The script fetches the value stored in the key, multiplies it with the multiplier value and returns the result. @@ -573,8 +564,8 @@ it with the multiplier value and returns the result. function. Script instances accept the following optional arguments: * **keys**: A list of key names that the script will access. This becomes the - KEYS list in LUA. -* **args**: A list of argument values. This becomes the ARGV list in LUA. + KEYS list in Lua. +* **args**: A list of argument values. This becomes the ARGV list in Lua. * **client**: A redis-py Client or Pipeline instance that will invoke the script. If client isn't specified, the client that intiially created the Script instance (the one that `register_script` was @@ -589,7 +580,7 @@ Continuing the example from above: 10 The value of key 'foo' is set to 2. When multiply is invoked, the 'foo' key is -passed to the script along with the multiplier value of 5. LUA executes the +passed to the script along with the multiplier value of 5. Lua executes the script and returns the result, 10. Script instances can be executed using a different client instance, even one @@ -602,7 +593,7 @@ that points to a completely different Redis server. >>> multiply(keys=['foo'], args=[5], client=r2) 15 -The Script object ensures that the LUA script is loaded into Redis's script +The Script object ensures that the Lua script is loaded into Redis's script cache. In the event of a NOSCRIPT error, it will load the script and retry executing it. @@ -622,7 +613,7 @@ execution. Sentinel support ^^^^^^^^^^^^^^^^ -redis-py can be used together with `Redis Sentinel <http://redis.io/topics/sentinel>`_ +redis-py can be used together with `Redis Sentinel <https://redis.io/topics/sentinel>`_ to discover Redis nodes. You need to have at least one Sentinel daemon running in order to use redis-py's Sentinel support. @@ -663,7 +654,7 @@ If no slaves can be connected to, a connection will be established with the master. See `Guidelines for Redis clients with support for Redis Sentinel -<http://redis.io/topics/sentinel-clients>`_ to learn more about Redis Sentinel. +<https://redis.io/topics/sentinel-clients>`_ to learn more about Redis Sentinel. Scan Iterators ^^^^^^^^^^^^^^ @@ -687,7 +678,7 @@ Author ^^^^^^ redis-py is developed and maintained by Andy McCurdy (sedrik@gmail.com). -It can be found here: http://github.com/andymccurdy/redis-py +It can be found here: https://github.com/andymccurdy/redis-py Special thanks to: diff --git a/benchmarks/basic_operations.py b/benchmarks/basic_operations.py index 2f1e9f4..c50a610 100644 --- a/benchmarks/basic_operations.py +++ b/benchmarks/basic_operations.py @@ -195,5 +195,6 @@ def hmset(conn, num, pipeline_size, data_size): if pipeline_size > 1: conn.execute() + if __name__ == '__main__': run() diff --git a/benchmarks/command_packer_benchmark.py b/benchmarks/command_packer_benchmark.py index 13d6f97..9eb1853 100644 --- a/benchmarks/command_packer_benchmark.py +++ b/benchmarks/command_packer_benchmark.py @@ -22,15 +22,15 @@ class StringJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): "Pack a series of arguments into a value Redis command" args_output = SYM_EMPTY.join([ SYM_EMPTY.join((SYM_DOLLAR, b(str(len(k))), SYM_CRLF, k, SYM_CRLF)) - for k in imap(self.encode, args)]) + for k in imap(self.encoder.encode, args)]) output = SYM_EMPTY.join( (SYM_STAR, b(str(len(args))), SYM_CRLF, args_output)) return output @@ -54,16 +54,16 @@ class ListJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): output = [] buff = SYM_EMPTY.join( (SYM_STAR, b(str(len(args))), SYM_CRLF)) - for k in imap(self.encode, args): + for k in imap(self.encoder.encode, args): if len(buff) > 6000 or len(k) > 6000: buff = SYM_EMPTY.join( (buff, SYM_DOLLAR, b(str(len(k))), SYM_CRLF)) diff --git a/build_tools/.bash_profile b/build_tools/.bash_profile new file mode 100644 index 0000000..b023cf7 --- /dev/null +++ b/build_tools/.bash_profile @@ -0,0 +1 @@ +PATH=$PATH:/var/lib/redis/bin diff --git a/vagrant/bootstrap.sh b/build_tools/bootstrap.sh index a5a0d2c..a5a0d2c 100755 --- a/vagrant/bootstrap.sh +++ b/build_tools/bootstrap.sh diff --git a/vagrant/build_redis.sh b/build_tools/build_redis.sh index 728e617..379c6cc 100755 --- a/vagrant/build_redis.sh +++ b/build_tools/build_redis.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh pushd /home/vagrant diff --git a/vagrant/install_redis.sh b/build_tools/install_redis.sh index bb5f1d2..fd53a1c 100755 --- a/vagrant/install_redis.sh +++ b/build_tools/install_redis.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh for filename in `ls $VAGRANT_REDIS_CONF_DIR`; do # cuts the order prefix off of the filename, e.g. 001-master -> master diff --git a/vagrant/install_sentinel.sh b/build_tools/install_sentinel.sh index 58cd808..0597208 100755 --- a/vagrant/install_sentinel.sh +++ b/build_tools/install_sentinel.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source /home/vagrant/redis-py/vagrant/redis_vars.sh +source /home/vagrant/redis-py/build_tools/redis_vars.sh for filename in `ls $VAGRANT_SENTINEL_CONF_DIR`; do # cuts the order prefix off of the filename, e.g. 001-master -> master diff --git a/vagrant/redis-configs/001-master b/build_tools/redis-configs/001-master index f04f23d..8591f1a 100644 --- a/vagrant/redis-configs/001-master +++ b/build_tools/redis-configs/001-master @@ -5,4 +5,4 @@ daemonize yes unixsocket /tmp/redis_master.sock unixsocketperm 777 dbfilename master.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups diff --git a/vagrant/redis-configs/002-slave b/build_tools/redis-configs/002-slave index 5d302fe..13eb77e 100644 --- a/vagrant/redis-configs/002-slave +++ b/build_tools/redis-configs/002-slave @@ -5,6 +5,6 @@ daemonize yes unixsocket /tmp/redis-slave.sock unixsocketperm 777 dbfilename slave.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups slaveof 127.0.0.1 6379 diff --git a/vagrant/redis_init_script b/build_tools/redis_init_script index e8bfa08..04cb2db 100755 --- a/vagrant/redis_init_script +++ b/build_tools/redis_init_script @@ -12,10 +12,10 @@ REDISPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-server -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-server +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) diff --git a/vagrant/redis_vars.sh b/build_tools/redis_vars.sh index 1ec6295..c52dd4c 100755 --- a/vagrant/redis_vars.sh +++ b/build_tools/redis_vars.sh @@ -1,13 +1,13 @@ #!/usr/bin/env bash -VAGRANT_DIR=/home/vagrant/redis-py/vagrant +VAGRANT_DIR=/home/vagrant/redis-py/build_tools VAGRANT_REDIS_CONF_DIR=$VAGRANT_DIR/redis-configs VAGRANT_SENTINEL_CONF_DIR=$VAGRANT_DIR/sentinel-configs REDIS_VERSION=3.2.0 REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION -REDIS_DIR=/home/vagrant/redis +REDIS_DIR=/var/lib/redis REDIS_BIN_DIR=$REDIS_DIR/bin REDIS_CONF_DIR=$REDIS_DIR/conf REDIS_SAVE_DIR=$REDIS_DIR/backups diff --git a/vagrant/sentinel-configs/001-1 b/build_tools/sentinel-configs/001-1 index eccc3d1..eccc3d1 100644 --- a/vagrant/sentinel-configs/001-1 +++ b/build_tools/sentinel-configs/001-1 diff --git a/vagrant/sentinel-configs/002-2 b/build_tools/sentinel-configs/002-2 index 0cd2801..0cd2801 100644 --- a/vagrant/sentinel-configs/002-2 +++ b/build_tools/sentinel-configs/002-2 diff --git a/vagrant/sentinel-configs/003-3 b/build_tools/sentinel-configs/003-3 index c7f4fcd..c7f4fcd 100644 --- a/vagrant/sentinel-configs/003-3 +++ b/build_tools/sentinel-configs/003-3 diff --git a/vagrant/sentinel_init_script b/build_tools/sentinel_init_script index ea93537..1d94804 100755 --- a/vagrant/sentinel_init_script +++ b/build_tools/sentinel_init_script @@ -12,10 +12,10 @@ SENTINELPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-sentinel -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-sentinel +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) diff --git a/redis/_compat.py b/redis/_compat.py index 307f3cc..de856f3 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -4,7 +4,7 @@ import sys try: InterruptedError = InterruptedError -except: +except NameError: InterruptedError = OSError # For Python older than 3.5, retry EINTR. @@ -13,7 +13,6 @@ if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and # Adapted from https://bugs.python.org/review/23863/patch/14532/54418 import socket import time - import errno from select import select as _select diff --git a/redis/client.py b/redis/client.py index 59edcda..f4f7489 100755 --- a/redis/client.py +++ b/redis/client.py @@ -114,7 +114,8 @@ def parse_info(response): for line in response.splitlines(): if line and not line.startswith('#'): if line.find(':') != -1: - key, value = line.split(':', 1) + # support keys that include ':' by using rsplit + key, value = line.rsplit(':', 1) info[key] = get_value(value) else: # if the line isn't splittable, append it to the "__raw__" key @@ -182,10 +183,15 @@ def parse_sentinel_get_master(response): return response and (response[0], int(response[1])) or None -def pairs_to_dict(response): +def pairs_to_dict(response, decode_keys=False): "Create a dict given a list of key/value pairs" - it = iter(response) - return dict(izip(it, it)) + if decode_keys: + # the iter form is faster, but I don't know how to make that work + # with a nativestr() map + return dict(izip(imap(nativestr, response[::2]), response[1::2])) + else: + it = iter(response) + return dict(izip(it, it)) def pairs_to_dict_typed(response, type_info): @@ -195,7 +201,7 @@ def pairs_to_dict_typed(response, type_info): if key in type_info: try: value = type_info[key](value) - except: + except Exception: # if for some reason the value can't be coerced, just use # the string value pass @@ -232,6 +238,58 @@ def int_or_none(response): return int(response) +def parse_stream_list(response): + if response is None: + return None + return [(r[0], pairs_to_dict(r[1])) for r in response] + + +def pairs_to_dict_with_nativestr_keys(response): + return pairs_to_dict(response, decode_keys=True) + + +def parse_list_of_dicts(response): + return list(imap(pairs_to_dict_with_nativestr_keys, response)) + + +def parse_xclaim(response, **options): + if options.get('parse_justid', False): + return response + return parse_stream_list(response) + + +def parse_xinfo_stream(response): + data = pairs_to_dict(response, decode_keys=True) + first = data['first-entry'] + data['first-entry'] = (first[0], pairs_to_dict(first[1])) + last = data['last-entry'] + data['last-entry'] = (last[0], pairs_to_dict(last[1])) + return data + + +def parse_xread(response): + if response is None: + return [] + return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response] + + +def parse_xpending(response, **options): + if options.get('parse_detail', False): + return parse_xpending_range(response) + consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []] + return { + 'pending': response[0], + 'min': response[1], + 'max': response[2], + 'consumers': consumers + } + + +def parse_xpending_range(response): + k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered') + return [dict(izip(k, r)) for r in response] + + def float_or_none(response): if response is None: return None @@ -366,11 +424,11 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD', + 'GEOADD XACK XDEL XLEN XTRIM', int ), string_keys_to_dict( - 'INCRBYFLOAT HINCRBYFLOAT GEODIST', + 'INCRBYFLOAT HINCRBYFLOAT', float ), string_keys_to_dict( @@ -379,7 +437,7 @@ class StrictRedis(object): lambda r: isinstance(r, (long, int)) and r or nativestr(r) == 'OK' ), string_keys_to_dict('SORT', sort_return_tuples), - string_keys_to_dict('ZSCORE ZINCRBY', float_or_none), + string_keys_to_dict('ZSCORE ZINCRBY GEODIST', float_or_none), string_keys_to_dict( 'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE RENAME ' 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', @@ -391,26 +449,53 @@ class StrictRedis(object): lambda r: r and set(r) or set() ), string_keys_to_dict( - 'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', + 'ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', zset_score_pairs ), + string_keys_to_dict('BZPOPMIN BZPOPMAX', \ + lambda r: r and (r[0], r[1], float(r[2])) or None), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), + string_keys_to_dict('XREVRANGE XRANGE', parse_stream_list), + string_keys_to_dict('XREAD XREADGROUP', parse_xread), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { 'CLIENT GETNAME': lambda r: r and nativestr(r), 'CLIENT KILL': bool_ok, 'CLIENT LIST': parse_client_list, 'CLIENT SETNAME': bool_ok, + 'CLUSTER ADDSLOTS': bool_ok, + 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x), + 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x), + 'CLUSTER DELSLOTS': bool_ok, + 'CLUSTER FAILOVER': bool_ok, + 'CLUSTER FORGET': bool_ok, + 'CLUSTER INFO': parse_cluster_info, + 'CLUSTER KEYSLOT': lambda x: int(x), + 'CLUSTER MEET': bool_ok, + 'CLUSTER NODES': parse_cluster_nodes, + 'CLUSTER REPLICATE': bool_ok, + 'CLUSTER RESET': bool_ok, + 'CLUSTER SAVECONFIG': bool_ok, + 'CLUSTER SET-CONFIG-EPOCH': bool_ok, + 'CLUSTER SETSLOT': bool_ok, + 'CLUSTER SLAVES': parse_cluster_nodes, 'CONFIG GET': parse_config_get, 'CONFIG RESETSTAT': bool_ok, 'CONFIG SET': bool_ok, 'DEBUG OBJECT': parse_debug_object, + 'GEOHASH': lambda r: list(map(nativestr, r)), + 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), + float(ll[1])) + if ll is not None else None, r)), + 'GEORADIUS': parse_georadius_generic, + 'GEORADIUSBYMEMBER': parse_georadius_generic, 'HGETALL': lambda r: r and pairs_to_dict(r) or {}, 'HSCAN': parse_hscan, 'INFO': parse_info, 'LASTSAVE': timestamp_to_datetime, 'OBJECT': parse_object, 'PING': lambda r: nativestr(r) == 'PONG', + 'PUBSUB NUMSUB': parse_pubsub_numsub, 'RANDOMKEY': lambda r: r and r or None, 'SCAN': parse_scan, 'SCRIPT EXISTS': lambda r: list(imap(bool, r)), @@ -431,30 +516,16 @@ class StrictRedis(object): 'SLOWLOG RESET': bool_ok, 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), + 'XCLAIM': parse_xclaim, + 'XGROUP CREATE': bool_ok, + 'XGROUP DELCONSUMER': int, + 'XGROUP DESTROY': bool, + 'XGROUP SETID': bool_ok, + 'XINFO CONSUMERS': parse_list_of_dicts, + 'XINFO GROUPS': parse_list_of_dicts, + 'XINFO STREAM': parse_xinfo_stream, + 'XPENDING': parse_xpending, 'ZSCAN': parse_zscan, - 'CLUSTER ADDSLOTS': bool_ok, - 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x), - 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x), - 'CLUSTER DELSLOTS': bool_ok, - 'CLUSTER FAILOVER': bool_ok, - 'CLUSTER FORGET': bool_ok, - 'CLUSTER INFO': parse_cluster_info, - 'CLUSTER KEYSLOT': lambda x: int(x), - 'CLUSTER MEET': bool_ok, - 'CLUSTER NODES': parse_cluster_nodes, - 'CLUSTER REPLICATE': bool_ok, - 'CLUSTER RESET': bool_ok, - 'CLUSTER SAVECONFIG': bool_ok, - 'CLUSTER SET-CONFIG-EPOCH': bool_ok, - 'CLUSTER SETSLOT': bool_ok, - 'CLUSTER SLAVES': parse_cluster_nodes, - 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), - float(ll[1])) - if ll is not None else None, r)), - 'GEOHASH': lambda r: list(map(nativestr, r)), - 'GEORADIUS': parse_georadius_generic, - 'GEORADIUSBYMEMBER': parse_georadius_generic, - 'PUBSUB NUMSUB': parse_pubsub_numsub, } ) @@ -587,7 +658,7 @@ class StrictRedis(object): value_from_callable = kwargs.pop('value_from_callable', False) watch_delay = kwargs.pop('watch_delay', None) with self.pipeline(True, shard_hint) as pipe: - while 1: + while True: try: if watches: pipe.watch(*watches) @@ -1683,6 +1754,328 @@ class StrictRedis(object): args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) + # STREAMS COMMANDS + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + + def xadd(self, name, fields, id='*', maxlen=None, approximate=True): + """ + Add to a stream. + name: name of the stream + fields: dict of field/value pairs to insert into the stream + id: Location to insert this record. By default it is appended. + maxlen: truncate old stream members beyond this size + approximate: actual stream length may be slightly more than maxlen + + """ + pieces = [] + if maxlen is not None: + if not isinstance(maxlen, (int, long)) or maxlen < 1: + raise RedisError('XADD maxlen must be a positive integer') + pieces.append(Token.get_token('MAXLEN')) + if approximate: + pieces.append(Token.get_token('~')) + pieces.append(str(maxlen)) + pieces.append(id) + if not isinstance(fields, dict) or len(fields) == 0: + raise RedisError('XADD fields must be a non-empty dict') + for pair in iteritems(fields): + pieces.extend(pair) + return self.execute_command('XADD', name, *pieces) + + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): + """ + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") + + kwargs = {} + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) + + if idle is not None: + if not isinstance(idle, (int, long)): + raise RedisError("XCLAIM idle must be an integer") + pieces.extend((Token.get_token('IDLE'), str(idle))) + if time is not None: + if not isinstance(time, (int, long)): + raise RedisError("XCLAIM time must be an integer") + pieces.extend((Token.get_token('TIME'), str(time))) + if retrycount is not None: + if not isinstance(retrycount, (int, long)): + raise RedisError("XCLAIM retrycount must be an integer") + pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) + + if force: + if not isinstance(force, bool): + raise RedisError("XCLAIM force must be a boolean") + pieces.append(Token.get_token('FORCE')) + if justid: + if not isinstance(justid, bool): + raise RedisError("XCLAIM justid must be a boolean") + pieces.append(Token.get_token('JUSTID')) + kwargs['parse_justid'] = True + return self.execute_command('XCLAIM', *pieces, **kwargs) + + def xdel(self, name, *ids): + """ + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. + """ + return self.execute_command('XDEL', name, *ids) + + def xgroup_create(self, name, groupname, id='$', mkstream=False): + """ + Create a new consumer group associated with a stream. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + pieces = ['XGROUP CREATE', name, groupname, id] + if mkstream: + pieces.append('MKSTREAM') + return self.execute_command(*pieces) + + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it + was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + + def xgroup_destroy(self, name, groupname): + """ + Destroy a consumer group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XGROUP DESTROY', name, groupname) + + def xgroup_setid(self, name, groupname, id): + """ + Set the consumer group last delivered ID to something else. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP SETID', name, groupname, id) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xlen(self, name): + """ + Returns the number of elements in a given stream. + """ + return self.execute_command('XLEN', name) + + def xpending(self, name, groupname): + """ + Returns information about pending messages of a group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XPENDING', name, groupname) + + def xpending_range(self, name, groupname, min='-', max='+', count=-1, + consumername=None): + """ + Returns information about pending messages, in a range. + name: name of the stream. + groupname: name of the consumer group. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + consumername: name of a consumer to filter by (optional). + """ + pieces = [name, groupname] + if min is not None or max is not None or count is not None: + if min is None or max is None or count is None: + raise RedisError("XPENDING must be provided with min, max " + "and count parameters, or none of them. ") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") + pieces.extend((min, max, str(count))) + if consumername is not None: + if min is None or max is None or count is None: + raise RedisError("if XPENDING is provided with consumername," + " it must be provided with min, max and" + " count parameters") + pieces.append(consumername) + return self.execute_command('XPENDING', *pieces, parse_detail=True) + + def xrange(self, name, min='-', max='+', count=None): + """ + Read stream values within an interval. + name: name of the stream. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + """ + pieces = [min, max] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xread(self, streams, count=None, block=None): + """ + Block and monitor multiple streams for new data. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [] + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError('XREAD block must be a non-negative integer') + pieces.append(Token.get_token('BLOCK')) + pieces.append(str(block)) + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREAD count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREAD streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + keys, values = izip(*iteritems(streams)) + pieces.extend(keys) + pieces.extend(values) + return self.execute_command('XREAD', *pieces) + + def xreadgroup(self, groupname, consumername, streams, count=None, + block=None): + """ + Read from a stream via a consumer group. + groupname: name of the consumer group. + consumername: name of the requesting consumer. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [Token.get_token('GROUP'), groupname, consumername] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError("XREADGROUP count must be a positive integer") + pieces.append(Token.get_token("COUNT")) + pieces.append(str(count)) + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError("XREADGROUP block must be a non-negative " + "integer") + pieces.append(Token.get_token("BLOCK")) + pieces.append(str(block)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREADGROUP streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREADGROUP', *pieces) + + def xrevrange(self, name, max='+', min='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [max, min] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREVRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) + + def xtrim(self, name, maxlen, approximate=True): + """ + Trims old messages from a stream. + name: name of the stream. + maxlen: truncate old stream messages beyond this size + approximate: actual stream length may be slightly more than maxlen + """ + pieces = [Token.get_token('MAXLEN')] + if approximate: + pieces.append(Token.get_token('~')) + pieces.append(maxlen) + return self.execute_command('XTRIM', name, *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ @@ -1736,6 +2129,68 @@ class StrictRedis(object): """ return self.execute_command('ZLEXCOUNT', name, min, max) + def zpopmax(self, name, count=None): + """ + Remove and return up to ``count`` members with the highest scores + from the sorted set ``name``. + """ + args = (count is not None) and [count] or [] + options = { + 'withscores': True + } + return self.execute_command('ZPOPMAX', name, *args, **options) + + def zpopmin(self, name, count=None): + """ + Remove and return up to ``count`` members with the lowest scores + from the sorted set ``name``. + """ + args = (count is not None) and [count] or [] + options = { + 'withscores': True + } + return self.execute_command('ZPOPMIN', name, *args, **options) + + def bzpopmax(self, keys, timeout=0): + """ + ZPOPMAX a value off of the first non-empty sorted set + named in the ``keys`` list. + + If none of the sorted sets in ``keys`` has a value to ZPOPMAX, + then block for ``timeout`` seconds, or until a member gets added + to one of the sorted sets. + + If timeout is 0, then block indefinitely. + """ + if timeout is None: + timeout = 0 + if isinstance(keys, basestring): + keys = [keys] + else: + keys = list(keys) + keys.append(timeout) + return self.execute_command('BZPOPMAX', *keys) + + def bzpopmin(self, keys, timeout=0): + """ + ZPOPMIN a value off of the first non-empty sorted set + named in the ``keys`` list. + + If none of the sorted sets in ``keys`` has a value to ZPOPMIN, + then block for ``timeout`` seconds, or until a member gets added + to one of the sorted sets. + + If timeout is 0, then block indefinitely. + """ + if timeout is None: + timeout = 0 + if isinstance(keys, basestring): + keys = [keys] + else: + keys = list(keys) + keys.append(timeout) + return self.execute_command('BZPOPMIN', *keys) + def zrange(self, name, start, end, desc=False, withscores=False, score_cast_func=float): """ @@ -2147,14 +2602,14 @@ class StrictRedis(object): def geohash(self, name, *values): """ Return the geo hash string for each item of ``values`` members of - the specified key identified by the ``name``argument. + the specified key identified by the ``name`` argument. """ return self.execute_command('GEOHASH', name, *values) def geopos(self, name, *values): """ Return the positions of each item of ``values`` as members of - the specified key identified by the ``name``argument. Each position + the specified key identified by the ``name`` argument. Each position is represented by the pairs lon and lat. """ return self.execute_command('GEOPOS', name, *values) diff --git a/redis/connection.py b/redis/connection.py index 6ad467a..00c3311 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -236,7 +236,7 @@ class SocketBuffer(object): try: self.purge() self._buffer.close() - except: + except Exception: # issue #633 suggests the purge/close somehow raised a # BadFileDescriptor error. Perhaps the client ran out of # memory or something else? It's probably OK to ignore @@ -602,9 +602,9 @@ class Connection(object): errmsg = e.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def send_command(self, *args): "Pack and send a command to the Redis server" @@ -623,9 +623,9 @@ class Connection(object): "Read the response from a previously sent command" try: response = self._parser.read_response() - except: + except Exception as e: self.disconnect() - raise + raise e if isinstance(response, ResponseError): raise response return response @@ -779,7 +779,8 @@ URL_QUERY_ARGUMENT_PARSERS = { 'socket_timeout': float, 'socket_connect_timeout': float, 'socket_keepalive': to_bool, - 'retry_on_timeout': to_bool + 'retry_on_timeout': to_bool, + 'max_connections': int, } @@ -799,11 +800,11 @@ class ConnectionPool(object): Three URL schemes are supported: - ```redis://`` - <http://www.iana.org/assignments/uri-schemes/prov/redis>`_ creates a + <https://www.iana.org/assignments/uri-schemes/prov/redis>`_ creates a normal TCP socket connection - ```rediss://`` - <http://www.iana.org/assignments/uri-schemes/prov/rediss>`_ creates a - SSL wrapped TCP socket connection + <https://www.iana.org/assignments/uri-schemes/prov/rediss>`_ creates + a SSL wrapped TCP socket connection - ``unix://`` creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function @@ -829,6 +830,7 @@ class ConnectionPool(object): True/False, Yes/No values to indicate state. Invalid types cause a ``UserWarning`` to be raised. In the case of conflicting arguments, querystring arguments always win. + """ url_string = url url = urlparse(url) diff --git a/redis/lock.py b/redis/lock.py index 732568b..bc7e850 100644 --- a/redis/lock.py +++ b/redis/lock.py @@ -107,7 +107,7 @@ class Lock(object): stop_trying_at = None if blocking_timeout is not None: stop_trying_at = mod_time.time() + blocking_timeout - while 1: + while True: if self.do_acquire(token): self.local.token = token return True @@ -1,4 +1,4 @@ -[pep8] +[pycodestyle] show-source = 1 exclude = .venv,.tox,dist,docs,build,*.egg @@ -36,7 +36,7 @@ setup( version=__version__, description='Python client for Redis key-value store', long_description=long_description, - url='http://github.com/andymccurdy/redis-py', + url='https://github.com/andymccurdy/redis-py', author='Andy McCurdy', author_email='sedrik@gmail.com', maintainer='Andy McCurdy', diff --git a/tests/test_commands.py b/tests/test_commands.py index b9b9b66..6394a7a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2,11 +2,12 @@ from __future__ import with_statement import binascii import datetime import pytest +import re import redis import time from redis._compat import (unichr, u, b, ascii_letters, iteritems, iterkeys, - itervalues) + itervalues, long) from redis.client import parse_info from redis import exceptions @@ -34,6 +35,13 @@ def redis_server_time(client): return datetime.datetime.fromtimestamp(timestamp) +def get_stream_message(client, stream, message_id): + "Fetch a stream message and format it as a (message_id, fields) pair" + response = client.xrange(stream, min=message_id, max=message_id) + assert len(response) == 1 + return response[0] + + # RESPONSE CALLBACKS class TestResponseCallbacks(object): "Tests for the response callback system" @@ -70,11 +78,10 @@ class TestRedisCommands(object): @skip_if_server_version_lt('2.6.9') def test_client_list_after_client_setname(self, r): - r.client_setname('cl=i=ent') + r.client_setname('redis_py_test') clients = r.client_list() - assert isinstance(clients[0], dict) - assert 'name' in clients[0] - assert clients[0]['name'] == 'cl=i=ent' + # we don't know which client ours will be + assert 'redis_py_test' in [c['name'] for c in clients] def test_config_get(self, r): data = r.config_get() @@ -969,6 +976,48 @@ class TestRedisCommands(object): assert r.zrange('d', 0, -1, withscores=True) == \ [(b('a3'), 20), (b('a1'), 23)] + @skip_if_server_version_lt('4.9.0') + def test_zpopmax(self, r): + r.zadd('a', a1=1, a2=2, a3=3) + assert r.zpopmax('a') == [(b('a3'), 3)] + + # with count + assert r.zpopmax('a', count=2) == \ + [(b('a2'), 2), (b('a1'), 1)] + + @skip_if_server_version_lt('4.9.0') + def test_zpopmin(self, r): + r.zadd('a', a1=1, a2=2, a3=3) + assert r.zpopmin('a') == [(b('a1'), 1)] + + # with count + assert r.zpopmin('a', count=2) == \ + [(b('a2'), 2), (b('a3'), 3)] + + @skip_if_server_version_lt('4.9.0') + def test_bzpopmax(self, r): + r.zadd('a', a1=1, a2=2) + r.zadd('b', b1=10, b2=20) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b2'), 20) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b1'), 10) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a2'), 2) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a1'), 1) + assert r.bzpopmax(['b', 'a'], timeout=1) is None + r.zadd('c', c1=100) + assert r.bzpopmax('c', timeout=1) == (b('c'), b('c1'), 100) + + @skip_if_server_version_lt('4.9.0') + def test_bzpopmin(self, r): + r.zadd('a', a1=1, a2=2) + r.zadd('b', b1=10, b2=20) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b1'), 10) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b2'), 20) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a1'), 1) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a2'), 2) + assert r.bzpopmin(['b', 'a'], timeout=1) is None + r.zadd('c', c1=100) + assert r.bzpopmin('c', timeout=1) == (b('c'), b('c1'), 100) + def test_zrange(self, r): r.zadd('a', a1=1, a2=2, a3=3) assert r.zrange('a', 0, 1) == [b('a1'), b('a2')] @@ -1600,9 +1649,447 @@ class TestRedisCommands(object): ['place1', 0.0, 3471609698139488, (2.1909382939338684, 41.433790281840835)]] + @skip_if_server_version_lt('5.0.0') + def test_xack(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + # xack on a stream that doesn't exist + assert r.xack(stream, group, '0-0') == 0 + + m1 = r.xadd(stream, {'one': 'one'}) + m2 = r.xadd(stream, {'two': 'two'}) + m3 = r.xadd(stream, {'three': 'three'}) + + # xack on a group that doesn't exist + assert r.xack(stream, group, m1) == 0 + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer, streams={stream: 0}) + # xack returns the number of ack'd elements + assert r.xack(stream, group, m1) == 1 + assert r.xack(stream, group, m2, m3) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xadd(self, r): + stream = 'stream' + message_id = r.xadd(stream, {'foo': 'bar'}) + assert re.match(br'[0-9]+\-[0-9]+', message_id) + + # explicit message id + message_id = b('9999999999999999999-0') + assert message_id == r.xadd(stream, {'foo': 'bar'}, id=message_id) + + # with maxlen, the list evicts the first message + r.xadd(stream, {'foo': 'bar'}, maxlen=2, approximate=False) + assert r.xlen(stream) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xclaim(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + + message_id = r.xadd(stream, {'john': 'wick'}) + message = get_stream_message(r, stream, message_id) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response == [] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: 0}) + + # claim the message as consumer2 + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response[0] == message + + # reclaim the message as consumer1, but use the justid argument + # which only returns message ids + assert r.xclaim(stream, group, consumer1, + min_idle_time=0, message_ids=(message_id,), + justid=True) == [message_id] + + @skip_if_server_version_lt('5.0.0') + def test_xdel(self, r): + stream = 'stream' + + # deleting from an empty stream doesn't do anything + assert r.xdel(stream, 1) == 0 + + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + + # xdel returns the number of deleted elements + assert r.xdel(stream, m1) == 1 + assert r.xdel(stream, m2, m3) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xgroup_create(self, r): + # tests xgroup_create and xinfo_groups + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) + + # no group is setup yet, no info to obtain + assert r.xinfo_groups(stream) == [] + + assert r.xgroup_create(stream, group, 0) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': b('0-0') + }] + assert r.xinfo_groups(stream) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xgroup_create_mkstream(self, r): + # tests xgroup_create and xinfo_groups + stream = 'stream' + group = 'group' + + # an error is raised if a group is created on a stream that + # doesn't already exist + with pytest.raises(exceptions.ResponseError): + r.xgroup_create(stream, group, 0) + + # however, with mkstream=True, the underlying stream is created + # automatically + assert r.xgroup_create(stream, group, 0, mkstream=True) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': b('0-0') + }] + assert r.xinfo_groups(stream) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xgroup_delconsumer(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # a consumer that hasn't yet read any messages doesn't do anything + assert r.xgroup_delconsumer(stream, group, consumer) == 0 + + # read all messages from the group + r.xreadgroup(group, consumer, streams={stream: 0}) + + # deleting the consumer should return 2 pending messages + assert r.xgroup_delconsumer(stream, group, consumer) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xgroup_destroy(self, r): + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) + + # destroying a nonexistent group returns False + assert not r.xgroup_destroy(stream, group) + + r.xgroup_create(stream, group, 0) + assert r.xgroup_destroy(stream, group) + + @skip_if_server_version_lt('5.0.0') + def test_xgroup_setid(self, r): + stream = 'stream' + group = 'group' + message_id = r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + # advance the last_delivered_id to the message_id + r.xgroup_setid(stream, group, message_id) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': message_id + }] + assert r.xinfo_groups(stream) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xinfo_consumers(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer1, streams={stream: 0}) + r.xreadgroup(group, consumer2, streams={stream: 0}) + info = r.xinfo_consumers(stream, group) + assert len(info) == 2 + expected = [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 0}, + ] -class TestStrictCommands(object): + # we can't determine the idle time, so just make sure it's an int + assert isinstance(info[0].pop('idle'), (int, long)) + assert isinstance(info[1].pop('idle'), (int, long)) + assert info == expected + + @skip_if_server_version_lt('5.0.0') + def test_xinfo_stream(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + info = r.xinfo_stream(stream) + + assert info['length'] == 2 + assert info['first-entry'] == get_stream_message(r, stream, m1) + assert info['last-entry'] == get_stream_message(r, stream, m2) + + @skip_if_server_version_lt('5.0.0') + def test_xlen(self, r): + stream = 'stream' + assert r.xlen(stream) == 0 + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + assert r.xlen(stream) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xpending(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending on a group that has no consumers yet + expected = { + 'pending': 0, + 'min': None, + 'max': None, + 'consumers': [] + } + assert r.xpending(stream, group) == expected + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + expected = { + 'pending': 2, + 'min': m1, + 'max': m2, + 'consumers': [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 1}, + ] + } + assert r.xpending(stream, group) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xpending_range(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending range on a group that has no consumers yet + assert r.xpending_range(stream, group) == [] + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + response = r.xpending_range(stream, group) + assert len(response) == 2 + assert response[0]['message_id'] == m1 + assert response[0]['consumer'] == b(consumer1) + assert response[1]['message_id'] == m2 + assert response[1]['consumer'] == b(consumer2) + + @skip_if_server_version_lt('5.0.0') + def test_xrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) + + def get_ids(results): + return [result[0] for result in results] + + results = r.xrange(stream, min=m1) + assert get_ids(results) == [m1, m2, m3, m4] + + results = r.xrange(stream, min=m2, max=m3) + assert get_ids(results) == [m2, m3] + + results = r.xrange(stream, max=m3) + assert get_ids(results) == [m1, m2, m3] + + results = r.xrange(stream, max=m2, count=1) + assert get_ids(results) == [m1] + + @skip_if_server_version_lt('5.0.0') + def test_xread(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xread(streams={stream: 0}) == expected + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xread(streams={stream: 0}, count=1) == expected + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xread(streams={stream: m1}) == expected + + # xread starting at the last message returns an empty list + assert r.xread(streams={stream: m2}) == [] + + @skip_if_server_version_lt('5.0.0') + def test_xreadgroup(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xreadgroup(group, consumer, streams={stream: 0}) == expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xreadgroup(group, consumer, streams={stream: 0}, count=1) == \ + expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xreadgroup(group, consumer, streams={stream: m1}) == expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + # xread starting at the last message returns an empty message list + expected = [ + [ + stream, + [] + ] + ] + assert r.xreadgroup(group, consumer, streams={stream: m2}) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xrevrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) + + def get_ids(results): + return [result[0] for result in results] + + results = r.xrevrange(stream, max=m4) + assert get_ids(results) == [m4, m3, m2, m1] + + results = r.xrevrange(stream, max=m3, min=m2) + assert get_ids(results) == [m3, m2] + + results = r.xrevrange(stream, min=m3) + assert get_ids(results) == [m4, m3] + + results = r.xrevrange(stream, min=m2, count=1) + assert get_ids(results) == [m4] + + @skip_if_server_version_lt('5.0.0') + def test_xtrim(self, r): + stream = 'stream' + + # trimming an empty key doesn't do anything + assert r.xtrim(stream, 1000) == 0 + + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + + # trimming an amount large than the number of messages + # doesn't do anything + assert r.xtrim(stream, 5, approximate=False) == 0 + + # 1 message is trimmed + assert r.xtrim(stream, 3, approximate=False) == 1 + + +class TestStrictCommands(object): def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 11c2008..56a73fe 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -240,7 +240,7 @@ class TestConnectionPoolURLParsing(object): def test_extra_typed_querystring_options(self): pool = redis.ConnectionPool.from_url( 'redis://localhost/2?socket_timeout=20&socket_connect_timeout=10' - '&socket_keepalive=&retry_on_timeout=Yes' + '&socket_keepalive=&retry_on_timeout=Yes&max_connections=10' ) assert pool.connection_class == redis.Connection @@ -253,6 +253,7 @@ class TestConnectionPoolURLParsing(object): 'retry_on_timeout': True, 'password': None, } + assert pool.max_connections == 10 def test_boolean_parsing(self): for expected, value in ( diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 8b5bf5a..4c8a36c 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -1,14 +1,15 @@ from __future__ import with_statement import pytest +import redis from redis._compat import unichr, u, unicode -from .conftest import r as _redis_client +from .conftest import _get_client class TestEncoding(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, decode_responses=True) + return _get_client(redis.Redis, request=request, decode_responses=True) def test_simple_encoding(self, r): unicode_string = unichr(3456) + u('abcd') + unichr(3421) @@ -34,7 +35,7 @@ class TestEncoding(object): class TestCommandsAndTokensArentEncoded(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, encoding='utf-16') + return _get_client(redis.Redis, request=request, encoding='utf-16') def test_basic_command(self, r): r.set('hello', 'world') diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index a240248..01a7129 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -6,7 +6,7 @@ import redis from redis.exceptions import ConnectionError from redis._compat import basestring, u, unichr, b -from .conftest import r as _redis_client +from .conftest import _get_client from .conftest import skip_if_server_version_lt @@ -316,7 +316,7 @@ class TestPubSubAutoDecoding(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, decode_responses=True) + return _get_client(redis.Redis, request=request, decode_responses=True) def test_channel_subscribe_unsubscribe(self, r): p = r.pubsub() @@ -1,6 +1,6 @@ [tox] minversion = 1.8 -envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pep8 +envlist = {py26,py27,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle [testenv] deps = @@ -9,9 +9,9 @@ deps = hiredis: hiredis >= 0.1.3 commands = py.test {posargs} -[testenv:pep8] -basepython = python2.6 -deps = pep8 -commands = pep8 +[testenv:pycodestyle] +basepython = python3.6 +deps = pycodestyle +commands = pycodestyle skipsdist = true skip_install = true diff --git a/vagrant/.bash_profile b/vagrant/.bash_profile deleted file mode 100644 index e3d9bca..0000000 --- a/vagrant/.bash_profile +++ /dev/null @@ -1 +0,0 @@ -PATH=$PATH:/home/vagrant/redis/bin diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index 7465ccd..3ee7aee 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -12,11 +12,11 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.synced_folder "../", "/home/vagrant/redis-py" # install the redis server - config.vm.provision :shell, :path => "bootstrap.sh" - config.vm.provision :shell, :path => "build_redis.sh" - config.vm.provision :shell, :path => "install_redis.sh" - config.vm.provision :shell, :path => "install_sentinel.sh" - config.vm.provision :file, :source => ".bash_profile", :destination => "/home/vagrant/.bash_profile" + config.vm.provision :shell, :path => "../build_tools/bootstrap.sh" + config.vm.provision :shell, :path => "../build_tools/build_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_sentinel.sh" + config.vm.provision :file, :source => "../build_tools/.bash_profile", :destination => "/home/vagrant/.bash_profile" # setup forwarded ports config.vm.network "forwarded_port", guest: 6379, host: 6379 |