From ebbbb7ddfc763cc150781cb3e72c8a3f718a2260 Mon Sep 17 00:00:00 2001 From: James Remeika Date: Fri, 6 Apr 2018 19:31:07 -0400 Subject: Documents rediss:// support --- redis/client.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/redis/client.py b/redis/client.py index 79e94d0..59edcda 100755 --- a/redis/client.py +++ b/redis/client.py @@ -461,16 +461,24 @@ class StrictRedis(object): @classmethod def from_url(cls, url, db=None, **kwargs): """ - Return a Redis client object configured from the given URL, which must - use either `the ``redis://`` scheme - `_ for RESP - connections or the ``unix://`` scheme for Unix domain sockets. + Return a Redis client object configured from the given URL For example:: redis://[:password]@localhost:6379/0 + rediss://[:password]@localhost:6379/0 unix://[:password]@/path/to/socket.sock?db=0 + Three URL schemes are supported: + + - ```redis://`` + `_ creates a + normal TCP socket connection + - ```rediss://`` + `_ creates a + SSL wrapped TCP socket connection + - ``unix://`` creates a Unix Domain Socket connection + There are several ways to specify a database number. The parse function will return the first specified option: 1. A ``db`` querystring option, e.g. redis://localhost?db=0 -- cgit v1.2.1 From eb6a34dab4d316a7c8ea186775321458e033e3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mas=C5=82owski?= Date: Wed, 23 May 2018 16:02:56 +0200 Subject: Fix parsing max_connections URL query string parameter Previously ConnectionPool.from_url kept it as a string, causing a 'ValueError: "max_connections" must be a positive integer'. --- redis/connection.py | 3 ++- tests/test_connection_pool.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 6ad467a..25c1f73 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -779,7 +779,8 @@ URL_QUERY_ARGUMENT_PARSERS = { 'socket_timeout': float, 'socket_connect_timeout': float, 'socket_keepalive': to_bool, - 'retry_on_timeout': to_bool + 'retry_on_timeout': to_bool, + 'max_connections': int, } diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 11c2008..56a73fe 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -240,7 +240,7 @@ class TestConnectionPoolURLParsing(object): def test_extra_typed_querystring_options(self): pool = redis.ConnectionPool.from_url( 'redis://localhost/2?socket_timeout=20&socket_connect_timeout=10' - '&socket_keepalive=&retry_on_timeout=Yes' + '&socket_keepalive=&retry_on_timeout=Yes&max_connections=10' ) assert pool.connection_class == redis.Connection @@ -253,6 +253,7 @@ class TestConnectionPoolURLParsing(object): 'retry_on_timeout': True, 'password': None, } + assert pool.max_connections == 10 def test_boolean_parsing(self): for expected, value in ( -- cgit v1.2.1 From d750ed1fdb47d755b609d4b6206777ef07610804 Mon Sep 17 00:00:00 2001 From: Jonathan Stern Date: Sat, 2 Jun 2018 14:25:41 -0500 Subject: fix description for srandmember bugfix Reading through the changelog I noticed that one of the items seemed to trail off mid-sentence. I assume this was meant to refer to https://github.com/andymccurdy/redis-py/issues/881 and https://github.com/andymccurdy/redis-py/pull/882 --- CHANGES | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES b/CHANGES index d048ea5..64dcf19 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,7 @@ * 2.10.6 * Various performance improvements. Thanks cjsimpson - * Fixed a bug with SRANDMEMBER where + * Fixed a bug with SRANDMEMBER where the behavior for `number=0` did + not match the spec. Thanks Alex Wang * Added HSTRLEN command. Thanks Alexander Putilin * Added the TOUCH command. Thanks Anis Jonischkeit * Remove unnecessary calls to the server when registering Lua scripts. -- cgit v1.2.1 From e73ee62342a7d4f552f7d0ac6bf203db4c7288cf Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Thu, 19 Jul 2018 14:52:40 +0300 Subject: move redis installation to /var/lib/redis --- vagrant/.bash_profile | 2 +- vagrant/redis-configs/001-master | 2 +- vagrant/redis-configs/002-slave | 2 +- vagrant/redis_init_script | 6 +++--- vagrant/redis_vars.sh | 2 +- vagrant/sentinel_init_script | 6 +++--- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/vagrant/.bash_profile b/vagrant/.bash_profile index e3d9bca..b023cf7 100644 --- a/vagrant/.bash_profile +++ b/vagrant/.bash_profile @@ -1 +1 @@ -PATH=$PATH:/home/vagrant/redis/bin +PATH=$PATH:/var/lib/redis/bin diff --git a/vagrant/redis-configs/001-master b/vagrant/redis-configs/001-master index f04f23d..8591f1a 100644 --- a/vagrant/redis-configs/001-master +++ b/vagrant/redis-configs/001-master @@ -5,4 +5,4 @@ daemonize yes unixsocket /tmp/redis_master.sock unixsocketperm 777 dbfilename master.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups diff --git a/vagrant/redis-configs/002-slave b/vagrant/redis-configs/002-slave index 5d302fe..13eb77e 100644 --- a/vagrant/redis-configs/002-slave +++ b/vagrant/redis-configs/002-slave @@ -5,6 +5,6 @@ daemonize yes unixsocket /tmp/redis-slave.sock unixsocketperm 777 dbfilename slave.rdb -dir /home/vagrant/redis/backups +dir /var/lib/redis/backups slaveof 127.0.0.1 6379 diff --git a/vagrant/redis_init_script b/vagrant/redis_init_script index e8bfa08..04cb2db 100755 --- a/vagrant/redis_init_script +++ b/vagrant/redis_init_script @@ -12,10 +12,10 @@ REDISPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-server -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-server +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) diff --git a/vagrant/redis_vars.sh b/vagrant/redis_vars.sh index 1ec6295..4719959 100755 --- a/vagrant/redis_vars.sh +++ b/vagrant/redis_vars.sh @@ -7,7 +7,7 @@ REDIS_VERSION=3.2.0 REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION -REDIS_DIR=/home/vagrant/redis +REDIS_DIR=/var/lib/redis REDIS_BIN_DIR=$REDIS_DIR/bin REDIS_CONF_DIR=$REDIS_DIR/conf REDIS_SAVE_DIR=$REDIS_DIR/backups diff --git a/vagrant/sentinel_init_script b/vagrant/sentinel_init_script index ea93537..1d94804 100755 --- a/vagrant/sentinel_init_script +++ b/vagrant/sentinel_init_script @@ -12,10 +12,10 @@ SENTINELPORT={{ PORT }} PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf -EXEC=/home/vagrant/redis/bin/redis-sentinel -CLIEXEC=/home/vagrant/redis/bin/redis-cli +EXEC=/var/lib/redis/bin/redis-sentinel +CLIEXEC=/var/lib/redis/bin/redis-cli case "$1" in start) -- cgit v1.2.1 From fe4bb23415c6a40edfff97da6bfe0c313fb2f4f4 Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Wed, 8 Aug 2018 11:49:46 -0400 Subject: Fix parsing for INFO keys that include ':' --- redis/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 79e94d0..1e31a6e 100755 --- a/redis/client.py +++ b/redis/client.py @@ -114,7 +114,8 @@ def parse_info(response): for line in response.splitlines(): if line and not line.startswith('#'): if line.find(':') != -1: - key, value = line.split(':', 1) + # support keys that include ':' by using rsplit + key, value = line.rsplit(':', 1) info[key] = get_value(value) else: # if the line isn't splittable, append it to the "__raw__" key -- cgit v1.2.1 From 9c90388944478aec4450a4ff4c5db5af77c08f54 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Thu, 30 Aug 2018 13:44:29 +0300 Subject: move build tools to their own folder, separate from vagrant --- build_tools/.bash_profile | 1 + build_tools/bootstrap.sh | 4 +++ build_tools/build_redis.sh | 29 +++++++++++++++++++++ build_tools/install_redis.sh | 37 +++++++++++++++++++++++++++ build_tools/install_sentinel.sh | 37 +++++++++++++++++++++++++++ build_tools/redis-configs/001-master | 8 ++++++ build_tools/redis-configs/002-slave | 10 ++++++++ build_tools/redis_init_script | 49 ++++++++++++++++++++++++++++++++++++ build_tools/redis_vars.sh | 48 +++++++++++++++++++++++++++++++++++ build_tools/sentinel-configs/001-1 | 6 +++++ build_tools/sentinel-configs/002-2 | 6 +++++ build_tools/sentinel-configs/003-3 | 6 +++++ build_tools/sentinel_init_script | 49 ++++++++++++++++++++++++++++++++++++ vagrant/.bash_profile | 1 - vagrant/Vagrantfile | 10 ++++---- vagrant/bootstrap.sh | 4 --- vagrant/build_redis.sh | 29 --------------------- vagrant/install_redis.sh | 37 --------------------------- vagrant/install_sentinel.sh | 37 --------------------------- vagrant/redis-configs/001-master | 8 ------ vagrant/redis-configs/002-slave | 10 -------- vagrant/redis_init_script | 49 ------------------------------------ vagrant/redis_vars.sh | 48 ----------------------------------- vagrant/sentinel-configs/001-1 | 6 ----- vagrant/sentinel-configs/002-2 | 6 ----- vagrant/sentinel-configs/003-3 | 6 ----- vagrant/sentinel_init_script | 49 ------------------------------------ 27 files changed, 295 insertions(+), 295 deletions(-) create mode 100644 build_tools/.bash_profile create mode 100755 build_tools/bootstrap.sh create mode 100755 build_tools/build_redis.sh create mode 100755 build_tools/install_redis.sh create mode 100755 build_tools/install_sentinel.sh create mode 100644 build_tools/redis-configs/001-master create mode 100644 build_tools/redis-configs/002-slave create mode 100755 build_tools/redis_init_script create mode 100755 build_tools/redis_vars.sh create mode 100644 build_tools/sentinel-configs/001-1 create mode 100644 build_tools/sentinel-configs/002-2 create mode 100644 build_tools/sentinel-configs/003-3 create mode 100755 build_tools/sentinel_init_script delete mode 100644 vagrant/.bash_profile delete mode 100755 vagrant/bootstrap.sh delete mode 100755 vagrant/build_redis.sh delete mode 100755 vagrant/install_redis.sh delete mode 100755 vagrant/install_sentinel.sh delete mode 100644 vagrant/redis-configs/001-master delete mode 100644 vagrant/redis-configs/002-slave delete mode 100755 vagrant/redis_init_script delete mode 100755 vagrant/redis_vars.sh delete mode 100644 vagrant/sentinel-configs/001-1 delete mode 100644 vagrant/sentinel-configs/002-2 delete mode 100644 vagrant/sentinel-configs/003-3 delete mode 100755 vagrant/sentinel_init_script diff --git a/build_tools/.bash_profile b/build_tools/.bash_profile new file mode 100644 index 0000000..b023cf7 --- /dev/null +++ b/build_tools/.bash_profile @@ -0,0 +1 @@ +PATH=$PATH:/var/lib/redis/bin diff --git a/build_tools/bootstrap.sh b/build_tools/bootstrap.sh new file mode 100755 index 0000000..a5a0d2c --- /dev/null +++ b/build_tools/bootstrap.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# need make to build redis +sudo apt-get install make diff --git a/build_tools/build_redis.sh b/build_tools/build_redis.sh new file mode 100755 index 0000000..379c6cc --- /dev/null +++ b/build_tools/build_redis.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +source /home/vagrant/redis-py/build_tools/redis_vars.sh + +pushd /home/vagrant + +uninstall_all_sentinel_instances +uninstall_all_redis_instances + +# create a clean directory for redis +rm -rf $REDIS_DIR +mkdir -p $REDIS_BIN_DIR +mkdir -p $REDIS_CONF_DIR +mkdir -p $REDIS_SAVE_DIR + +# download, unpack and build redis +mkdir -p $REDIS_DOWNLOAD_DIR +cd $REDIS_DOWNLOAD_DIR +rm -f $REDIS_PACKAGE +rm -rf $REDIS_BUILD_DIR +wget http://download.redis.io/releases/$REDIS_PACKAGE +tar zxvf $REDIS_PACKAGE +cd $REDIS_BUILD_DIR +make +cp src/redis-server $REDIS_DIR/bin +cp src/redis-cli $REDIS_DIR/bin +cp src/redis-sentinel $REDIS_DIR/bin + +popd diff --git a/build_tools/install_redis.sh b/build_tools/install_redis.sh new file mode 100755 index 0000000..fd53a1c --- /dev/null +++ b/build_tools/install_redis.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +source /home/vagrant/redis-py/build_tools/redis_vars.sh + +for filename in `ls $VAGRANT_REDIS_CONF_DIR`; do + # cuts the order prefix off of the filename, e.g. 001-master -> master + PROCESS_NAME=redis-`echo $filename | cut -f 2- -d -` + echo "======================================" + echo "INSTALLING REDIS SERVER: $PROCESS_NAME" + echo "======================================" + + # make sure the instance is uninstalled (it should be already) + uninstall_instance $PROCESS_NAME + + # base config + mkdir -p $REDIS_CONF_DIR + cp $REDIS_BUILD_DIR/redis.conf $REDIS_CONF_DIR/$PROCESS_NAME.conf + # override config values from file + cat $VAGRANT_REDIS_CONF_DIR/$filename >> $REDIS_CONF_DIR/$PROCESS_NAME.conf + + # replace placeholder variables in init.d script + cp $VAGRANT_DIR/redis_init_script /etc/init.d/$PROCESS_NAME + sed -i "s/{{ PROCESS_NAME }}/$PROCESS_NAME/g" /etc/init.d/$PROCESS_NAME + # need to read the config file to find out what port this instance will run on + port=`grep port $VAGRANT_REDIS_CONF_DIR/$filename | cut -f 2 -d " "` + sed -i "s/{{ PORT }}/$port/g" /etc/init.d/$PROCESS_NAME + chmod 755 /etc/init.d/$PROCESS_NAME + + # and tell update-rc.d about it + update-rc.d $PROCESS_NAME defaults 98 + + # save the $PROCESS_NAME into installed instances file + echo $PROCESS_NAME >> $REDIS_INSTALLED_INSTANCES_FILE + + # start redis + /etc/init.d/$PROCESS_NAME start +done diff --git a/build_tools/install_sentinel.sh b/build_tools/install_sentinel.sh new file mode 100755 index 0000000..0597208 --- /dev/null +++ b/build_tools/install_sentinel.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +source /home/vagrant/redis-py/build_tools/redis_vars.sh + +for filename in `ls $VAGRANT_SENTINEL_CONF_DIR`; do + # cuts the order prefix off of the filename, e.g. 001-master -> master + PROCESS_NAME=sentinel-`echo $filename | cut -f 2- -d -` + echo "=========================================" + echo "INSTALLING SENTINEL SERVER: $PROCESS_NAME" + echo "=========================================" + + # make sure the instance is uninstalled (it should be already) + uninstall_instance $PROCESS_NAME + + # base config + mkdir -p $REDIS_CONF_DIR + cp $REDIS_BUILD_DIR/sentinel.conf $REDIS_CONF_DIR/$PROCESS_NAME.conf + # override config values from file + cat $VAGRANT_SENTINEL_CONF_DIR/$filename >> $REDIS_CONF_DIR/$PROCESS_NAME.conf + + # replace placeholder variables in init.d script + cp $VAGRANT_DIR/sentinel_init_script /etc/init.d/$PROCESS_NAME + sed -i "s/{{ PROCESS_NAME }}/$PROCESS_NAME/g" /etc/init.d/$PROCESS_NAME + # need to read the config file to find out what port this instance will run on + port=`grep port $VAGRANT_SENTINEL_CONF_DIR/$filename | cut -f 2 -d " "` + sed -i "s/{{ PORT }}/$port/g" /etc/init.d/$PROCESS_NAME + chmod 755 /etc/init.d/$PROCESS_NAME + + # and tell update-rc.d about it + update-rc.d $PROCESS_NAME defaults 99 + + # save the $PROCESS_NAME into installed instances file + echo $PROCESS_NAME >> $SENTINEL_INSTALLED_INSTANCES_FILE + + # start redis + /etc/init.d/$PROCESS_NAME start +done diff --git a/build_tools/redis-configs/001-master b/build_tools/redis-configs/001-master new file mode 100644 index 0000000..8591f1a --- /dev/null +++ b/build_tools/redis-configs/001-master @@ -0,0 +1,8 @@ +pidfile /var/run/redis-master.pid +bind * +port 6379 +daemonize yes +unixsocket /tmp/redis_master.sock +unixsocketperm 777 +dbfilename master.rdb +dir /var/lib/redis/backups diff --git a/build_tools/redis-configs/002-slave b/build_tools/redis-configs/002-slave new file mode 100644 index 0000000..13eb77e --- /dev/null +++ b/build_tools/redis-configs/002-slave @@ -0,0 +1,10 @@ +pidfile /var/run/redis-slave.pid +bind * +port 6380 +daemonize yes +unixsocket /tmp/redis-slave.sock +unixsocketperm 777 +dbfilename slave.rdb +dir /var/lib/redis/backups + +slaveof 127.0.0.1 6379 diff --git a/build_tools/redis_init_script b/build_tools/redis_init_script new file mode 100755 index 0000000..04cb2db --- /dev/null +++ b/build_tools/redis_init_script @@ -0,0 +1,49 @@ +#!/bin/sh + +### BEGIN INIT INFO +# Provides: redis-server +# Required-Start: $syslog +# Required-Stop: $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Start redis-server at boot time +# Description: Control redis-server. +### END INIT INFO + +REDISPORT={{ PORT }} +PIDFILE=/var/run/{{ PROCESS_NAME }}.pid +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf + +EXEC=/var/lib/redis/bin/redis-server +CLIEXEC=/var/lib/redis/bin/redis-cli + +case "$1" in + start) + if [ -f $PIDFILE ] + then + echo "$PIDFILE exists, process is already running or crashed" + else + echo "Starting Redis server..." + $EXEC $CONF + fi + ;; + stop) + if [ ! -f $PIDFILE ] + then + echo "$PIDFILE does not exist, process is not running" + else + PID=$(cat $PIDFILE) + echo "Stopping ..." + $CLIEXEC -p $REDISPORT shutdown + while [ -x /proc/${PID} ] + do + echo "Waiting for Redis to shutdown ..." + sleep 1 + done + echo "Redis stopped" + fi + ;; + *) + echo "Please use start or stop as first argument" + ;; +esac diff --git a/build_tools/redis_vars.sh b/build_tools/redis_vars.sh new file mode 100755 index 0000000..c52dd4c --- /dev/null +++ b/build_tools/redis_vars.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +VAGRANT_DIR=/home/vagrant/redis-py/build_tools +VAGRANT_REDIS_CONF_DIR=$VAGRANT_DIR/redis-configs +VAGRANT_SENTINEL_CONF_DIR=$VAGRANT_DIR/sentinel-configs +REDIS_VERSION=3.2.0 +REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads +REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz +REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION +REDIS_DIR=/var/lib/redis +REDIS_BIN_DIR=$REDIS_DIR/bin +REDIS_CONF_DIR=$REDIS_DIR/conf +REDIS_SAVE_DIR=$REDIS_DIR/backups +REDIS_INSTALLED_INSTANCES_FILE=$REDIS_DIR/redis-instances +SENTINEL_INSTALLED_INSTANCES_FILE=$REDIS_DIR/sentinel-instances + +function uninstall_instance() { + # Expects $1 to be the init.d filename, e.g. redis-nodename or + # sentinel-nodename + + if [ -a /etc/init.d/$1 ]; then + + echo "======================================" + echo "UNINSTALLING REDIS SERVER: $1" + echo "======================================" + + /etc/init.d/$1 stop + update-rc.d -f $1 remove + rm -f /etc/init.d/$1 + fi; + rm -f $REDIS_CONF_DIR/$1.conf +} + +function uninstall_all_redis_instances() { + if [ -a $REDIS_INSTALLED_INSTANCES_FILE ]; then + cat $REDIS_INSTALLED_INSTANCES_FILE | while read line; do + uninstall_instance $line; + done; + fi +} + +function uninstall_all_sentinel_instances() { + if [ -a $SENTINEL_INSTALLED_INSTANCES_FILE ]; then + cat $SENTINEL_INSTALLED_INSTANCES_FILE | while read line; do + uninstall_instance $line; + done; + fi +} diff --git a/build_tools/sentinel-configs/001-1 b/build_tools/sentinel-configs/001-1 new file mode 100644 index 0000000..eccc3d1 --- /dev/null +++ b/build_tools/sentinel-configs/001-1 @@ -0,0 +1,6 @@ +pidfile /var/run/sentinel-1.pid +port 26379 +daemonize yes + +# short timeout for sentinel tests +sentinel down-after-milliseconds mymaster 500 diff --git a/build_tools/sentinel-configs/002-2 b/build_tools/sentinel-configs/002-2 new file mode 100644 index 0000000..0cd2801 --- /dev/null +++ b/build_tools/sentinel-configs/002-2 @@ -0,0 +1,6 @@ +pidfile /var/run/sentinel-2.pid +port 26380 +daemonize yes + +# short timeout for sentinel tests +sentinel down-after-milliseconds mymaster 500 diff --git a/build_tools/sentinel-configs/003-3 b/build_tools/sentinel-configs/003-3 new file mode 100644 index 0000000..c7f4fcd --- /dev/null +++ b/build_tools/sentinel-configs/003-3 @@ -0,0 +1,6 @@ +pidfile /var/run/sentinel-3.pid +port 26381 +daemonize yes + +# short timeout for sentinel tests +sentinel down-after-milliseconds mymaster 500 diff --git a/build_tools/sentinel_init_script b/build_tools/sentinel_init_script new file mode 100755 index 0000000..1d94804 --- /dev/null +++ b/build_tools/sentinel_init_script @@ -0,0 +1,49 @@ +#!/bin/sh + +### BEGIN INIT INFO +# Provides: redis-sentintel +# Required-Start: $syslog +# Required-Stop: $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Start redis-sentinel at boot time +# Description: Control redis-sentinel. +### END INIT INFO + +SENTINELPORT={{ PORT }} +PIDFILE=/var/run/{{ PROCESS_NAME }}.pid +CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf + +EXEC=/var/lib/redis/bin/redis-sentinel +CLIEXEC=/var/lib/redis/bin/redis-cli + +case "$1" in + start) + if [ -f $PIDFILE ] + then + echo "$PIDFILE exists, process is already running or crashed" + else + echo "Starting Redis Sentinel..." + $EXEC $CONF + fi + ;; + stop) + if [ ! -f $PIDFILE ] + then + echo "$PIDFILE does not exist, process is not running" + else + PID=$(cat $PIDFILE) + echo "Stopping ..." + $CLIEXEC -p $SENTINELPORT shutdown + while [ -x /proc/${PID} ] + do + echo "Waiting for Sentinel to shutdown ..." + sleep 1 + done + echo "Sentinel stopped" + fi + ;; + *) + echo "Please use start or stop as first argument" + ;; +esac diff --git a/vagrant/.bash_profile b/vagrant/.bash_profile deleted file mode 100644 index b023cf7..0000000 --- a/vagrant/.bash_profile +++ /dev/null @@ -1 +0,0 @@ -PATH=$PATH:/var/lib/redis/bin diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index 7465ccd..3ee7aee 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -12,11 +12,11 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.synced_folder "../", "/home/vagrant/redis-py" # install the redis server - config.vm.provision :shell, :path => "bootstrap.sh" - config.vm.provision :shell, :path => "build_redis.sh" - config.vm.provision :shell, :path => "install_redis.sh" - config.vm.provision :shell, :path => "install_sentinel.sh" - config.vm.provision :file, :source => ".bash_profile", :destination => "/home/vagrant/.bash_profile" + config.vm.provision :shell, :path => "../build_tools/bootstrap.sh" + config.vm.provision :shell, :path => "../build_tools/build_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_redis.sh" + config.vm.provision :shell, :path => "../build_tools/install_sentinel.sh" + config.vm.provision :file, :source => "../build_tools/.bash_profile", :destination => "/home/vagrant/.bash_profile" # setup forwarded ports config.vm.network "forwarded_port", guest: 6379, host: 6379 diff --git a/vagrant/bootstrap.sh b/vagrant/bootstrap.sh deleted file mode 100755 index a5a0d2c..0000000 --- a/vagrant/bootstrap.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash - -# need make to build redis -sudo apt-get install make diff --git a/vagrant/build_redis.sh b/vagrant/build_redis.sh deleted file mode 100755 index 728e617..0000000 --- a/vagrant/build_redis.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash - -source /home/vagrant/redis-py/vagrant/redis_vars.sh - -pushd /home/vagrant - -uninstall_all_sentinel_instances -uninstall_all_redis_instances - -# create a clean directory for redis -rm -rf $REDIS_DIR -mkdir -p $REDIS_BIN_DIR -mkdir -p $REDIS_CONF_DIR -mkdir -p $REDIS_SAVE_DIR - -# download, unpack and build redis -mkdir -p $REDIS_DOWNLOAD_DIR -cd $REDIS_DOWNLOAD_DIR -rm -f $REDIS_PACKAGE -rm -rf $REDIS_BUILD_DIR -wget http://download.redis.io/releases/$REDIS_PACKAGE -tar zxvf $REDIS_PACKAGE -cd $REDIS_BUILD_DIR -make -cp src/redis-server $REDIS_DIR/bin -cp src/redis-cli $REDIS_DIR/bin -cp src/redis-sentinel $REDIS_DIR/bin - -popd diff --git a/vagrant/install_redis.sh b/vagrant/install_redis.sh deleted file mode 100755 index bb5f1d2..0000000 --- a/vagrant/install_redis.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env bash - -source /home/vagrant/redis-py/vagrant/redis_vars.sh - -for filename in `ls $VAGRANT_REDIS_CONF_DIR`; do - # cuts the order prefix off of the filename, e.g. 001-master -> master - PROCESS_NAME=redis-`echo $filename | cut -f 2- -d -` - echo "======================================" - echo "INSTALLING REDIS SERVER: $PROCESS_NAME" - echo "======================================" - - # make sure the instance is uninstalled (it should be already) - uninstall_instance $PROCESS_NAME - - # base config - mkdir -p $REDIS_CONF_DIR - cp $REDIS_BUILD_DIR/redis.conf $REDIS_CONF_DIR/$PROCESS_NAME.conf - # override config values from file - cat $VAGRANT_REDIS_CONF_DIR/$filename >> $REDIS_CONF_DIR/$PROCESS_NAME.conf - - # replace placeholder variables in init.d script - cp $VAGRANT_DIR/redis_init_script /etc/init.d/$PROCESS_NAME - sed -i "s/{{ PROCESS_NAME }}/$PROCESS_NAME/g" /etc/init.d/$PROCESS_NAME - # need to read the config file to find out what port this instance will run on - port=`grep port $VAGRANT_REDIS_CONF_DIR/$filename | cut -f 2 -d " "` - sed -i "s/{{ PORT }}/$port/g" /etc/init.d/$PROCESS_NAME - chmod 755 /etc/init.d/$PROCESS_NAME - - # and tell update-rc.d about it - update-rc.d $PROCESS_NAME defaults 98 - - # save the $PROCESS_NAME into installed instances file - echo $PROCESS_NAME >> $REDIS_INSTALLED_INSTANCES_FILE - - # start redis - /etc/init.d/$PROCESS_NAME start -done diff --git a/vagrant/install_sentinel.sh b/vagrant/install_sentinel.sh deleted file mode 100755 index 58cd808..0000000 --- a/vagrant/install_sentinel.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env bash - -source /home/vagrant/redis-py/vagrant/redis_vars.sh - -for filename in `ls $VAGRANT_SENTINEL_CONF_DIR`; do - # cuts the order prefix off of the filename, e.g. 001-master -> master - PROCESS_NAME=sentinel-`echo $filename | cut -f 2- -d -` - echo "=========================================" - echo "INSTALLING SENTINEL SERVER: $PROCESS_NAME" - echo "=========================================" - - # make sure the instance is uninstalled (it should be already) - uninstall_instance $PROCESS_NAME - - # base config - mkdir -p $REDIS_CONF_DIR - cp $REDIS_BUILD_DIR/sentinel.conf $REDIS_CONF_DIR/$PROCESS_NAME.conf - # override config values from file - cat $VAGRANT_SENTINEL_CONF_DIR/$filename >> $REDIS_CONF_DIR/$PROCESS_NAME.conf - - # replace placeholder variables in init.d script - cp $VAGRANT_DIR/sentinel_init_script /etc/init.d/$PROCESS_NAME - sed -i "s/{{ PROCESS_NAME }}/$PROCESS_NAME/g" /etc/init.d/$PROCESS_NAME - # need to read the config file to find out what port this instance will run on - port=`grep port $VAGRANT_SENTINEL_CONF_DIR/$filename | cut -f 2 -d " "` - sed -i "s/{{ PORT }}/$port/g" /etc/init.d/$PROCESS_NAME - chmod 755 /etc/init.d/$PROCESS_NAME - - # and tell update-rc.d about it - update-rc.d $PROCESS_NAME defaults 99 - - # save the $PROCESS_NAME into installed instances file - echo $PROCESS_NAME >> $SENTINEL_INSTALLED_INSTANCES_FILE - - # start redis - /etc/init.d/$PROCESS_NAME start -done diff --git a/vagrant/redis-configs/001-master b/vagrant/redis-configs/001-master deleted file mode 100644 index 8591f1a..0000000 --- a/vagrant/redis-configs/001-master +++ /dev/null @@ -1,8 +0,0 @@ -pidfile /var/run/redis-master.pid -bind * -port 6379 -daemonize yes -unixsocket /tmp/redis_master.sock -unixsocketperm 777 -dbfilename master.rdb -dir /var/lib/redis/backups diff --git a/vagrant/redis-configs/002-slave b/vagrant/redis-configs/002-slave deleted file mode 100644 index 13eb77e..0000000 --- a/vagrant/redis-configs/002-slave +++ /dev/null @@ -1,10 +0,0 @@ -pidfile /var/run/redis-slave.pid -bind * -port 6380 -daemonize yes -unixsocket /tmp/redis-slave.sock -unixsocketperm 777 -dbfilename slave.rdb -dir /var/lib/redis/backups - -slaveof 127.0.0.1 6379 diff --git a/vagrant/redis_init_script b/vagrant/redis_init_script deleted file mode 100755 index 04cb2db..0000000 --- a/vagrant/redis_init_script +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh - -### BEGIN INIT INFO -# Provides: redis-server -# Required-Start: $syslog -# Required-Stop: $syslog -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Short-Description: Start redis-server at boot time -# Description: Control redis-server. -### END INIT INFO - -REDISPORT={{ PORT }} -PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf - -EXEC=/var/lib/redis/bin/redis-server -CLIEXEC=/var/lib/redis/bin/redis-cli - -case "$1" in - start) - if [ -f $PIDFILE ] - then - echo "$PIDFILE exists, process is already running or crashed" - else - echo "Starting Redis server..." - $EXEC $CONF - fi - ;; - stop) - if [ ! -f $PIDFILE ] - then - echo "$PIDFILE does not exist, process is not running" - else - PID=$(cat $PIDFILE) - echo "Stopping ..." - $CLIEXEC -p $REDISPORT shutdown - while [ -x /proc/${PID} ] - do - echo "Waiting for Redis to shutdown ..." - sleep 1 - done - echo "Redis stopped" - fi - ;; - *) - echo "Please use start or stop as first argument" - ;; -esac diff --git a/vagrant/redis_vars.sh b/vagrant/redis_vars.sh deleted file mode 100755 index 4719959..0000000 --- a/vagrant/redis_vars.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash - -VAGRANT_DIR=/home/vagrant/redis-py/vagrant -VAGRANT_REDIS_CONF_DIR=$VAGRANT_DIR/redis-configs -VAGRANT_SENTINEL_CONF_DIR=$VAGRANT_DIR/sentinel-configs -REDIS_VERSION=3.2.0 -REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads -REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz -REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION -REDIS_DIR=/var/lib/redis -REDIS_BIN_DIR=$REDIS_DIR/bin -REDIS_CONF_DIR=$REDIS_DIR/conf -REDIS_SAVE_DIR=$REDIS_DIR/backups -REDIS_INSTALLED_INSTANCES_FILE=$REDIS_DIR/redis-instances -SENTINEL_INSTALLED_INSTANCES_FILE=$REDIS_DIR/sentinel-instances - -function uninstall_instance() { - # Expects $1 to be the init.d filename, e.g. redis-nodename or - # sentinel-nodename - - if [ -a /etc/init.d/$1 ]; then - - echo "======================================" - echo "UNINSTALLING REDIS SERVER: $1" - echo "======================================" - - /etc/init.d/$1 stop - update-rc.d -f $1 remove - rm -f /etc/init.d/$1 - fi; - rm -f $REDIS_CONF_DIR/$1.conf -} - -function uninstall_all_redis_instances() { - if [ -a $REDIS_INSTALLED_INSTANCES_FILE ]; then - cat $REDIS_INSTALLED_INSTANCES_FILE | while read line; do - uninstall_instance $line; - done; - fi -} - -function uninstall_all_sentinel_instances() { - if [ -a $SENTINEL_INSTALLED_INSTANCES_FILE ]; then - cat $SENTINEL_INSTALLED_INSTANCES_FILE | while read line; do - uninstall_instance $line; - done; - fi -} diff --git a/vagrant/sentinel-configs/001-1 b/vagrant/sentinel-configs/001-1 deleted file mode 100644 index eccc3d1..0000000 --- a/vagrant/sentinel-configs/001-1 +++ /dev/null @@ -1,6 +0,0 @@ -pidfile /var/run/sentinel-1.pid -port 26379 -daemonize yes - -# short timeout for sentinel tests -sentinel down-after-milliseconds mymaster 500 diff --git a/vagrant/sentinel-configs/002-2 b/vagrant/sentinel-configs/002-2 deleted file mode 100644 index 0cd2801..0000000 --- a/vagrant/sentinel-configs/002-2 +++ /dev/null @@ -1,6 +0,0 @@ -pidfile /var/run/sentinel-2.pid -port 26380 -daemonize yes - -# short timeout for sentinel tests -sentinel down-after-milliseconds mymaster 500 diff --git a/vagrant/sentinel-configs/003-3 b/vagrant/sentinel-configs/003-3 deleted file mode 100644 index c7f4fcd..0000000 --- a/vagrant/sentinel-configs/003-3 +++ /dev/null @@ -1,6 +0,0 @@ -pidfile /var/run/sentinel-3.pid -port 26381 -daemonize yes - -# short timeout for sentinel tests -sentinel down-after-milliseconds mymaster 500 diff --git a/vagrant/sentinel_init_script b/vagrant/sentinel_init_script deleted file mode 100755 index 1d94804..0000000 --- a/vagrant/sentinel_init_script +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh - -### BEGIN INIT INFO -# Provides: redis-sentintel -# Required-Start: $syslog -# Required-Stop: $syslog -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Short-Description: Start redis-sentinel at boot time -# Description: Control redis-sentinel. -### END INIT INFO - -SENTINELPORT={{ PORT }} -PIDFILE=/var/run/{{ PROCESS_NAME }}.pid -CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf - -EXEC=/var/lib/redis/bin/redis-sentinel -CLIEXEC=/var/lib/redis/bin/redis-cli - -case "$1" in - start) - if [ -f $PIDFILE ] - then - echo "$PIDFILE exists, process is already running or crashed" - else - echo "Starting Redis Sentinel..." - $EXEC $CONF - fi - ;; - stop) - if [ ! -f $PIDFILE ] - then - echo "$PIDFILE does not exist, process is not running" - else - PID=$(cat $PIDFILE) - echo "Stopping ..." - $CLIEXEC -p $SENTINELPORT shutdown - while [ -x /proc/${PID} ] - do - echo "Waiting for Sentinel to shutdown ..." - sleep 1 - done - echo "Sentinel stopped" - fi - ;; - *) - echo "Please use start or stop as first argument" - ;; -esac -- cgit v1.2.1 From bc81b7f7e3c11670a2f33ecb5df71f449410bd93 Mon Sep 17 00:00:00 2001 From: qingping209 Date: Sun, 30 Sep 2018 10:26:23 +0800 Subject: geodist may return none or float, response handler callback should be float_or_none --- redis/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 79e94d0..42881bc 100755 --- a/redis/client.py +++ b/redis/client.py @@ -370,7 +370,7 @@ class StrictRedis(object): int ), string_keys_to_dict( - 'INCRBYFLOAT HINCRBYFLOAT GEODIST', + 'INCRBYFLOAT HINCRBYFLOAT', float ), string_keys_to_dict( @@ -379,7 +379,7 @@ class StrictRedis(object): lambda r: isinstance(r, (long, int)) and r or nativestr(r) == 'OK' ), string_keys_to_dict('SORT', sort_return_tuples), - string_keys_to_dict('ZSCORE ZINCRBY', float_or_none), + string_keys_to_dict('ZSCORE ZINCRBY GEODIST', float_or_none), string_keys_to_dict( 'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE RENAME ' 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', -- cgit v1.2.1 From 66eefc6f52e9130e61dad251438352964946c786 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 5 Oct 2018 17:39:52 +0300 Subject: Adds ZPOPMAX and ZPOPMIN Signed-off-by: Itamar Haber --- .vscode/settings.json | 7 +++++++ redis/client.py | 18 +++++++++++++++++- tests/test_commands.py | 20 ++++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..fc1004a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.venvPath": "${workspaceFolder}/.tox", + "python.linting.enabled": false, + "python.unitTest.unittestEnabled": false, + "python.unitTest.nosetestsEnabled": false, + "python.unitTest.pyTestEnabled": true +} \ No newline at end of file diff --git a/redis/client.py b/redis/client.py index 79e94d0..95e9438 100755 --- a/redis/client.py +++ b/redis/client.py @@ -391,7 +391,7 @@ class StrictRedis(object): lambda r: r and set(r) or set() ), string_keys_to_dict( - 'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', + 'ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', zset_score_pairs ), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), @@ -1728,6 +1728,22 @@ class StrictRedis(object): """ return self.execute_command('ZLEXCOUNT', name, min, max) + def zpopmax(self, name, count=None): + "Remove and return up to ``count`` members with the highest scores from the sorted set ``name``" + args = (count is not None) and [count] or [] + options = { + 'withscores': True + } + return self.execute_command('ZPOPMAX', name, *args, **options) + + def zpopmin(self, name, count=None): + "Remove and return up to ``count`` members with the lowest scores from the sorted set ``name``" + args = (count is not None) and [count] or [] + options = { + 'withscores': True + } + return self.execute_command('ZPOPMIN', name, *args, **options) + def zrange(self, name, start, end, desc=False, withscores=False, score_cast_func=float): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index b9b9b66..a3e1a33 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -969,6 +969,26 @@ class TestRedisCommands(object): assert r.zrange('d', 0, -1, withscores=True) == \ [(b('a3'), 20), (b('a1'), 23)] + @skip_if_server_version_lt('4.9.0') + def test_zpopmax(self, r): + r.delete('a') + r.zadd('a', a1=1, a2=2, a3=3) + assert r.zpopmax('a') == [(b('a3'), 3)] + + # with count + assert r.zpopmax('a', count=2) == \ + [(b('a2'), 2), (b('a1'), 1)] + + @skip_if_server_version_lt('4.9.0') + def test_zpopmin(self, r): + r.delete('a') + r.zadd('a', a1=1, a2=2, a3=3) + assert r.zpopmin('a') == [(b('a1'), 1)] + + # with count + assert r.zpopmin('a', count=2) == \ + [(b('a2'), 2), (b('a3'), 3)] + def test_zrange(self, r): r.zadd('a', a1=1, a2=2, a3=3) assert r.zrange('a', 0, 1) == [b('a1'), b('a2')] -- cgit v1.2.1 From b2f61090f442468460d6968aee3ddea556b9b846 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 5 Oct 2018 17:59:05 +0300 Subject: Adds BZPOPMAX and BZPOPMIN Signed-off-by: Itamar Haber --- redis/client.py | 42 +++++++++++++++++++++++++++++++++++++++++- tests/test_commands.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 95e9438..535a59b 100755 --- a/redis/client.py +++ b/redis/client.py @@ -385,7 +385,7 @@ class StrictRedis(object): 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', bool_ok ), - string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None), + string_keys_to_dict('BLPOP BRPOP BZPOPMIN BZPOPMAX', lambda r: r and tuple(r) or None), string_keys_to_dict( 'SDIFF SINTER SMEMBERS SUNION', lambda r: r and set(r) or set() @@ -1744,6 +1744,46 @@ class StrictRedis(object): } return self.execute_command('ZPOPMIN', name, *args, **options) + def bzpopmax(self, keys, timeout=0): + """ + ZPOPMAX a value off of the first non-empty sorted set + named in the ``keys`` list. + + If none of the sorted sets in ``keys`` has a value to ZPOPMAX, + then block for ``timeout`` seconds, or until a member gets added + to one of the sorted sets. + + If timeout is 0, then block indefinitely. + """ + if timeout is None: + timeout = 0 + if isinstance(keys, basestring): + keys = [keys] + else: + keys = list(keys) + keys.append(timeout) + return self.execute_command('BZPOPMAX', *keys) + + def bzpopmin(self, keys, timeout=0): + """ + ZPOPMIN a value off of the first non-empty sorted set + named in the ``keys`` list. + + If none of the sorted sets in ``keys`` has a value to ZPOPMIN, + then block for ``timeout`` seconds, or until a member gets added + to one of the sorted sets. + + If timeout is 0, then block indefinitely. + """ + if timeout is None: + timeout = 0 + if isinstance(keys, basestring): + keys = [keys] + else: + keys = list(keys) + keys.append(timeout) + return self.execute_command('BZPOPMIN', *keys) + def zrange(self, name, start, end, desc=False, withscores=False, score_cast_func=float): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index a3e1a33..1a5b426 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -989,6 +989,36 @@ class TestRedisCommands(object): assert r.zpopmin('a', count=2) == \ [(b('a2'), 2), (b('a3'), 3)] + @skip_if_server_version_lt('4.9.0') + def test_bzpopmax(self, r): + r.delete('a') + r.delete('b') + r.delete('c') + r.zadd('a', a1=1, a2=2) + r.zadd('b', b1=10, b2=20) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b2'), b('20')) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b1'), b('10')) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a2'), b('2')) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a1'), b('1')) + assert r.bzpopmax(['b', 'a'], timeout=1) is None + r.zadd('c', c1=100) + assert r.bzpopmax('c', timeout=1) == (b('c'), b('c1'), b('100')) + + @skip_if_server_version_lt('4.9.0') + def test_bzpopmin(self, r): + r.delete('a') + r.delete('b') + r.delete('c') + r.zadd('a', a1=1, a2=2) + r.zadd('b', b1=10, b2=20) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b1'), b('10')) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b2'), b('20')) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a1'), b('1')) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a2'), b('2')) + assert r.bzpopmin(['b', 'a'], timeout=1) is None + r.zadd('c', c1=100) + assert r.bzpopmin('c', timeout=1) == (b('c'), b('c1'), b('100')) + def test_zrange(self, r): r.zadd('a', a1=1, a2=2, a3=3) assert r.zrange('a', 0, 1) == [b('a1'), b('a2')] -- cgit v1.2.1 From 5ad0df5e61a90a14104c1174cce33743bae766fc Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 5 Oct 2018 18:04:53 +0300 Subject: Converts score from string to float Signed-off-by: Itamar Haber --- redis/client.py | 3 ++- tests/test_commands.py | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/redis/client.py b/redis/client.py index 535a59b..e5328c5 100755 --- a/redis/client.py +++ b/redis/client.py @@ -385,7 +385,7 @@ class StrictRedis(object): 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', bool_ok ), - string_keys_to_dict('BLPOP BRPOP BZPOPMIN BZPOPMAX', lambda r: r and tuple(r) or None), + string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None), string_keys_to_dict( 'SDIFF SINTER SMEMBERS SUNION', lambda r: r and set(r) or set() @@ -394,6 +394,7 @@ class StrictRedis(object): 'ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', zset_score_pairs ), + string_keys_to_dict('BZPOPMIN BZPOPMAX', lambda r: r and (r[0], r[1], float(r[2])) or None), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { diff --git a/tests/test_commands.py b/tests/test_commands.py index 1a5b426..a470cad 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -996,13 +996,13 @@ class TestRedisCommands(object): r.delete('c') r.zadd('a', a1=1, a2=2) r.zadd('b', b1=10, b2=20) - assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b2'), b('20')) - assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b1'), b('10')) - assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a2'), b('2')) - assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a1'), b('1')) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b2'), 20) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b1'), 10) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a2'), 2) + assert r.bzpopmax(['b', 'a'], timeout=1) == (b('a'), b('a1'), 1) assert r.bzpopmax(['b', 'a'], timeout=1) is None r.zadd('c', c1=100) - assert r.bzpopmax('c', timeout=1) == (b('c'), b('c1'), b('100')) + assert r.bzpopmax('c', timeout=1) == (b('c'), b('c1'), 100) @skip_if_server_version_lt('4.9.0') def test_bzpopmin(self, r): @@ -1011,13 +1011,13 @@ class TestRedisCommands(object): r.delete('c') r.zadd('a', a1=1, a2=2) r.zadd('b', b1=10, b2=20) - assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b1'), b('10')) - assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b2'), b('20')) - assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a1'), b('1')) - assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a2'), b('2')) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b1'), 10) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b2'), 20) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a1'), 1) + assert r.bzpopmin(['b', 'a'], timeout=1) == (b('a'), b('a2'), 2) assert r.bzpopmin(['b', 'a'], timeout=1) is None r.zadd('c', c1=100) - assert r.bzpopmin('c', timeout=1) == (b('c'), b('c1'), b('100')) + assert r.bzpopmin('c', timeout=1) == (b('c'), b('c1'), 100) def test_zrange(self, r): r.zadd('a', a1=1, a2=2, a3=3) -- cgit v1.2.1 From b275920e86611ad6bf80d717dd5d327f47a8c374 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 5 Oct 2018 18:16:44 +0300 Subject: PEP8's it Signed-off-by: Itamar Haber --- redis/client.py | 21 ++++++++++++++------- tests/test_commands.py | 4 ++-- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/redis/client.py b/redis/client.py index e5328c5..5b86dc3 100755 --- a/redis/client.py +++ b/redis/client.py @@ -394,7 +394,8 @@ class StrictRedis(object): 'ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE', zset_score_pairs ), - string_keys_to_dict('BZPOPMIN BZPOPMAX', lambda r: r and (r[0], r[1], float(r[2])) or None), + string_keys_to_dict('BZPOPMIN BZPOPMAX', \ + lambda r: r and (r[0], r[1], float(r[2])) or None), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { @@ -1730,7 +1731,10 @@ class StrictRedis(object): return self.execute_command('ZLEXCOUNT', name, min, max) def zpopmax(self, name, count=None): - "Remove and return up to ``count`` members with the highest scores from the sorted set ``name``" + """ + Remove and return up to ``count`` members with the highest scores + from the sorted set ``name``. + """ args = (count is not None) and [count] or [] options = { 'withscores': True @@ -1738,7 +1742,10 @@ class StrictRedis(object): return self.execute_command('ZPOPMAX', name, *args, **options) def zpopmin(self, name, count=None): - "Remove and return up to ``count`` members with the lowest scores from the sorted set ``name``" + """ + Remove and return up to ``count`` members with the lowest scores + from the sorted set ``name``. + """ args = (count is not None) and [count] or [] options = { 'withscores': True @@ -1750,8 +1757,8 @@ class StrictRedis(object): ZPOPMAX a value off of the first non-empty sorted set named in the ``keys`` list. - If none of the sorted sets in ``keys`` has a value to ZPOPMAX, - then block for ``timeout`` seconds, or until a member gets added + If none of the sorted sets in ``keys`` has a value to ZPOPMAX, + then block for ``timeout`` seconds, or until a member gets added to one of the sorted sets. If timeout is 0, then block indefinitely. @@ -1770,8 +1777,8 @@ class StrictRedis(object): ZPOPMIN a value off of the first non-empty sorted set named in the ``keys`` list. - If none of the sorted sets in ``keys`` has a value to ZPOPMIN, - then block for ``timeout`` seconds, or until a member gets added + If none of the sorted sets in ``keys`` has a value to ZPOPMIN, + then block for ``timeout`` seconds, or until a member gets added to one of the sorted sets. If timeout is 0, then block indefinitely. diff --git a/tests/test_commands.py b/tests/test_commands.py index a470cad..c7c31c2 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -977,7 +977,7 @@ class TestRedisCommands(object): # with count assert r.zpopmax('a', count=2) == \ - [(b('a2'), 2), (b('a1'), 1)] + [(b('a2'), 2), (b('a1'), 1)] @skip_if_server_version_lt('4.9.0') def test_zpopmin(self, r): @@ -987,7 +987,7 @@ class TestRedisCommands(object): # with count assert r.zpopmin('a', count=2) == \ - [(b('a2'), 2), (b('a3'), 3)] + [(b('a2'), 2), (b('a3'), 3)] @skip_if_server_version_lt('4.9.0') def test_bzpopmax(self, r): -- cgit v1.2.1 From 6c4cd12425a6c840113696407b06e82424e87a72 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 5 Oct 2018 19:45:35 +0300 Subject: Removes errant committed file Signed-off-by: Itamar Haber --- .vscode/settings.json | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index fc1004a..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "python.venvPath": "${workspaceFolder}/.tox", - "python.linting.enabled": false, - "python.unitTest.unittestEnabled": false, - "python.unitTest.nosetestsEnabled": false, - "python.unitTest.pyTestEnabled": true -} \ No newline at end of file -- cgit v1.2.1 From d474b90bc63384d2e6ed84a90b043dccb6395b06 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 11 Oct 2018 22:53:10 -0700 Subject: Drop easy_install mentions These days everyone uses `pip`. Anyone who really needs `easy_install` will need to know far more than they'll discover in this simple readme. --- README.rst | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/README.rst b/README.rst index a21c13a..62b1c83 100644 --- a/README.rst +++ b/README.rst @@ -22,12 +22,6 @@ To install redis-py, simply: $ sudo pip install redis -or alternatively (you really should be using pip though): - -.. code-block:: bash - - $ sudo easy_install redis - or from source: .. code-block:: bash @@ -169,12 +163,6 @@ just like redis-py. $ pip install hiredis -or - -.. code-block:: bash - - $ easy_install hiredis - Response Callbacks ^^^^^^^^^^^^^^^^^^ -- cgit v1.2.1 From 39283c4b27367a7f93b4e0217e95ddc2e8dc6436 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 11 Oct 2018 22:54:57 -0700 Subject: "while 1" --> "while True" This is python, not C --- README.rst | 4 ++-- redis/client.py | 2 +- redis/lock.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.rst b/README.rst index a21c13a..0efafc7 100644 --- a/README.rst +++ b/README.rst @@ -276,7 +276,7 @@ could do something like this: .. code-block:: pycon >>> with r.pipeline() as pipe: - ... while 1: + ... while True: ... try: ... # put a WATCH on the key that holds our sequence value ... pipe.watch('OUR-SEQUENCE-KEY') @@ -309,7 +309,7 @@ explicitly calling reset(): .. code-block:: pycon >>> pipe = r.pipeline() - >>> while 1: + >>> while True: ... try: ... pipe.watch('OUR-SEQUENCE-KEY') ... ... diff --git a/redis/client.py b/redis/client.py index 79e94d0..27d5eff 100755 --- a/redis/client.py +++ b/redis/client.py @@ -579,7 +579,7 @@ class StrictRedis(object): value_from_callable = kwargs.pop('value_from_callable', False) watch_delay = kwargs.pop('watch_delay', None) with self.pipeline(True, shard_hint) as pipe: - while 1: + while True: try: if watches: pipe.watch(*watches) diff --git a/redis/lock.py b/redis/lock.py index 732568b..bc7e850 100644 --- a/redis/lock.py +++ b/redis/lock.py @@ -107,7 +107,7 @@ class Lock(object): stop_trying_at = None if blocking_timeout is not None: stop_trying_at = mod_time.time() + blocking_timeout - while 1: + while True: if self.do_acquire(token): self.local.token = token return True -- cgit v1.2.1 From b5ffff6d151e83688bc6a49051cccd2fd5517407 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Wed, 3 Oct 2018 17:20:51 +0300 Subject: pep8 was renamed to pycodestyle --- .travis.yml | 8 ++++---- setup.cfg | 2 +- tox.ini | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index fcbdbb9..31c24f3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,12 +14,12 @@ env: - TEST_HIREDIS=1 install: - pip install -e . - - "if [[ $TEST_PEP8 == '1' ]]; then pip install pep8; fi" + - "if [[ $TEST_PYCODESTYLE == '1' ]]; then pip install pycodestyle; fi" - "if [[ $TEST_HIREDIS == '1' ]]; then pip install hiredis; fi" -script: "if [[ $TEST_PEP8 == '1' ]]; then pep8 --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi" +script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi" matrix: include: - python: "2.7" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 - python: "3.6" - env: TEST_PEP8=1 + env: TEST_PYCODESTYLE=1 diff --git a/setup.cfg b/setup.cfg index cdae7c4..089b37c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,4 +1,4 @@ -[pep8] +[pycodestyle] show-source = 1 exclude = .venv,.tox,dist,docs,build,*.egg diff --git a/tox.ini b/tox.ini index a0945d2..16b7a50 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 1.8 -envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pep8 +envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle [testenv] deps = @@ -9,9 +9,9 @@ deps = hiredis: hiredis >= 0.1.3 commands = py.test {posargs} -[testenv:pep8] +[testenv:pycodestyle] basepython = python2.6 -deps = pep8 -commands = pep8 +deps = pycodestyle +commands = pycodestyle skipsdist = true skip_install = true -- cgit v1.2.1 From f5a16bef38e9051d1b2d92aa8e16e71e1a6c1541 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 2 Oct 2018 15:35:13 +0300 Subject: run travis with redis 5 --- .travis.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 31c24f3..4c4e951 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,8 +7,9 @@ python: - "3.3" - "2.7" - "2.6" -services: - - redis-server +before_install: + - wget http://download.redis.io/releases/redis-5.0.0.tar.gz && mkdir redis_install && tar -xvzf redis-5.0.0.tar.gz -C redis_install && cd redis_install/redis-5.0.0 && make && src/redis-server --daemonize yes && cd ../.. + - redis-cli info env: - TEST_HIREDIS=0 - TEST_HIREDIS=1 @@ -16,7 +17,7 @@ install: - pip install -e . - "if [[ $TEST_PYCODESTYLE == '1' ]]; then pip install pycodestyle; fi" - "if [[ $TEST_HIREDIS == '1' ]]; then pip install hiredis; fi" -script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi" +script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg,redis_install .; else python setup.py test; fi" matrix: include: - python: "2.7" -- cgit v1.2.1 From 59f279cab86ea8f27b99d170556cd761163783f2 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Thu, 4 Oct 2018 13:52:35 +0300 Subject: gitignore .idea folder (used by jetbrains IDEs) --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 4736334..ab39968 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ vagrant/.vagrant .python-version .cache .eggs +.idea \ No newline at end of file -- cgit v1.2.1 From 832c296f73bcdcf51b420bb3683ff62d1b43767b Mon Sep 17 00:00:00 2001 From: Nick Farrell Date: Sat, 4 Nov 2017 12:57:27 +1100 Subject: Added support for Streams This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN. See http://antirez.com/news/114 for more information. Consumer groups is not yet supported, as its details are still being finalised upstream. --- redis/client.py | 134 ++++++++++++++++++++++++++++++++++++++++++++++++- tests/test_commands.py | 62 +++++++++++++++++++++++ 2 files changed, 195 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 79e94d0..95dc63f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,6 +232,33 @@ def int_or_none(response): return int(response) +def stream_key(response): + return response + + +def stream_list(response): + if response is None: + return None + result = [] + for r in response: + kv_pairs = r[1] + kv_dict = dict() + while len(kv_pairs) > 1: + kv_dict[kv_pairs.pop()] = kv_pairs.pop() + result.append((r[0], kv_dict)) + + return result + + +def multi_stream_list(response): + if response is None: + return None + result = dict() + for r in response: + result[r[0].decode('utf-8')] = stream_list(r[1]) + return result + + def float_or_none(response): if response is None: return None @@ -366,9 +393,12 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD', + 'GEOADD XLEN', int ), + string_keys_to_dict('XADD', stream_key), + string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREAD', multi_stream_list), string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1675,6 +1705,108 @@ class StrictRedis(object): args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) + # STREAMS COMMANDS + def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs): + """ + Add to a stream. + _name: name of the stream (not using 'name' as this would + prevent 'name' used in the kwargs + id: Location to insert this record. By default it is appended. + maxlen: truncate old stream members beyond this size + approximate: actual stream length may be slightly more than maxlen + **kwargs: key/value pairs to insert into the stream + + """ + pieces = [] + if maxlen is not None: + if not isinstance(maxlen, int) or maxlen < 1: + raise RedisError("XADD maxlen must be a positive integer") + pieces.append("MAXLEN") + if approximate: + pieces.append("~") + pieces.append(str(maxlen)) + pieces.append(id) + for pair in iteritems(kwargs): + pieces.append(pair[0]) + pieces.append(pair[1]) + return self.execute_command('XADD', _name, *pieces) + + def xrange(self, name, start='-', finish='+', count=None): + """ + Read stream values within an interval. + name: name of the stream. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREVRANGE count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) + + def xlen(self, name): + """ + Returns the number of elements in a given stream. + """ + return self.execute_command('XLEN', name) + + def xread(self, count=None, block=None, **streams): + """ + Block and monitor multiple streams for new data. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + **streams: a mapping of stream names to stream IDs, where + IDs indicate the last ID already seen. + """ + pieces = [] + if block is not None: + if not isinstance(block, int) or block < 1: + raise RedisError("XREAD block must be a positive integer") + pieces.append("BLOCK") + pieces.append(str(block)) + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREAD count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + pieces.append("STREAMS") + ids = [] + for partial_stream in iteritems(streams): + pieces.append(partial_stream[0]) + ids.append(partial_stream[1]) + + pieces.extend(ids) + return self.execute_command('XREAD', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index b9b9b66..17b34c7 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1603,6 +1603,68 @@ class TestRedisCommands(object): class TestStrictCommands(object): + @skip_if_server_version_lt('6.0.0') + def test_strict_xrange(self, sr): + varname = 'xrange_test' + sr.delete(varname) + assert sr.xlen(varname) == 0 + stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) + assert sr.xlen(varname) == 1 + stamp2 = sr.xadd(varname, name="baz", other="zab") + assert sr.xlen(varname) == 2 + assert stamp1 != stamp2 + + milli, offset = stamp2.decode('utf-8').split('-') + new_id = ("%s-0" % (milli + 10000)).encode('utf-8') + stamp3 = sr.xadd(varname, id=new_id, foo="bar") + assert sr.xlen(varname) == 3 + assert stamp3 == new_id + stamp4 = sr.xadd(varname, foo="baz") + assert sr.xlen(varname) == 4 + + def get_ids(results): + return [result[0] for result in results] + + results = sr.xrange(varname, start=stamp1) + assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4] + + results = sr.xrange(varname, start=stamp2, finish=stamp3) + assert get_ids(results) == [stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp3) + assert get_ids(results) == [stamp1, stamp2, stamp3] + + results = sr.xrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp1] + + results = sr.xrevrange(varname, start=stamp1) + assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] + + results = sr.xrevrange(varname, start=stamp3, finish=stamp2) + assert get_ids(results) == [stamp3, stamp2] + + results = sr.xrevrange(varname, finish=stamp3) + assert get_ids(results) == [stamp4, stamp3] + + results = sr.xrevrange(varname, finish=stamp2, count=1) + assert get_ids(results) == [stamp4] + + assert sr.xlen(varname) == 4 + + @skip_if_server_version_lt('5.0.0') + def test_strict_xread(self, sr): + varname = 'xread_test' + sr.delete(varname) + stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) + stamp2 = sr.xadd(varname, name="baz", other="zab") + assert stamp1 != stamp2 + + results = sr.xread(varname='$', count=10, block=10) + assert results is None + + results = sr.xread(count=3, block=1000, **{varname: stamp1}) + assert results[varname][0][0] == stamp2 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From 644de16c16bfef7ce80dde1d2d718d91a6eae308 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 2 Oct 2018 11:15:30 +0300 Subject: xread: block parameter may be set to zero, to block indefinitely --- redis/client.py | 4 ++-- tests/test_commands.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/client.py b/redis/client.py index 95dc63f..570d94d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1788,8 +1788,8 @@ class StrictRedis(object): """ pieces = [] if block is not None: - if not isinstance(block, int) or block < 1: - raise RedisError("XREAD block must be a positive integer") + if not isinstance(block, int) or block < 0: + raise RedisError("XREAD block must be a non-negative integer") pieces.append("BLOCK") pieces.append(str(block)) if count is not None: diff --git a/tests/test_commands.py b/tests/test_commands.py index 17b34c7..c78de8b 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1662,7 +1662,7 @@ class TestStrictCommands(object): results = sr.xread(varname='$', count=10, block=10) assert results is None - results = sr.xread(count=3, block=1000, **{varname: stamp1}) + results = sr.xread(count=3, block=0, **{varname: stamp1}) assert results[varname][0][0] == stamp2 def test_strict_zadd(self, sr): -- cgit v1.2.1 From 2699f31ddf7923bcb027da44d4e94d89c7d25e8d Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 2 Oct 2018 15:00:50 +0300 Subject: Implement XGROUP --- redis/client.py | 42 ++++++++++++++++++++++++++++++++ tests/test_commands.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/redis/client.py b/redis/client.py index 570d94d..7f3f07d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -399,6 +399,12 @@ class StrictRedis(object): string_keys_to_dict('XADD', stream_key), string_keys_to_dict('XREVRANGE XRANGE', stream_list), string_keys_to_dict('XREAD', multi_stream_list), + { + 'XGROUP CREATE': bool_ok, + 'XGROUP DESTROY': int, + 'XGROUP SETID': bool_ok, + 'XGROUP DELCONSUMER': int + }, string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1807,6 +1813,42 @@ class StrictRedis(object): pieces.extend(ids) return self.execute_command('XREAD', *pieces) + def xgroup_create(self, name, groupname, id): + """ + Create a new consumer group associated with a stream. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP CREATE', name, groupname, id) + + def xgroup_destroy(self, name, groupname): + """ + Destroy a consumer group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XGROUP DESTROY', name, groupname) + + def xgroup_setid(self, name, groupname, id): + """ + Set the consumer group last delivered ID to something else. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP SETID', name, groupname, id) + + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index c78de8b..617a9d1 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1665,6 +1665,71 @@ class TestStrictCommands(object): results = sr.xread(count=3, block=0, **{varname: stamp1}) assert results[varname][0][0] == stamp2 + @skip_if_server_version_lt('5.0.0') + def test_strict_xgroup(self, sr): + stream_name = 'xgroup_test_stream' + sr.delete(stream_name) + group_name = 'xgroup_test_group' + try: + sr.xgroup_destroy(name=stream_name, groupname=group_name) + except redis.ResponseError: + pass + + with pytest.raises(redis.ResponseError): + sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + stamp1 = sr.xadd(stream_name, name="marco", other="polo") + assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + + # TODO: test xgroup_delconsumer after implementing XREADGROUP + + assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' + + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, id=1, foo='bar') + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, baz='qaz') + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xtrim(self, sr): + stream_name = 'xtrim_test_stream' + sr.delete(stream_name) + + assert sr.xtrim(stream_name, 1000) == 0 + + for i in range(300): + sr.xadd(stream_name, index=i) + + assert sr.xtrim(stream_name, 1000, approximate=False) == 0 + assert sr.xtrim(stream_name, 300) == 0 + assert sr.xtrim(stream_name, 299) == 0 + assert sr.xtrim(stream_name, 234) == 0 + assert sr.xtrim(stream_name, 234, approximate=False) == 66 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From ce2231697d5dcd746f7b51b6a40b291b310e1f5f Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 2 Oct 2018 18:17:00 +0300 Subject: run tests of commands added in redis 5, if redis>=5 --- tests/test_commands.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 617a9d1..b89b0c7 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1603,7 +1603,7 @@ class TestRedisCommands(object): class TestStrictCommands(object): - @skip_if_server_version_lt('6.0.0') + @skip_if_server_version_lt('5.0.0') def test_strict_xrange(self, sr): varname = 'xrange_test' sr.delete(varname) @@ -1690,7 +1690,7 @@ class TestStrictCommands(object): assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 - @skip_if_server_version_lt('4.9.105') + @skip_if_server_version_lt('5.0.0') def test_strict_xack(self, sr): stream_name = 'xack_test_stream' sr.delete(stream_name) @@ -1700,7 +1700,7 @@ class TestStrictCommands(object): assert sr.xack(stream_name, group_name, '1-1') == 0 assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - @skip_if_server_version_lt('4.9.105') + @skip_if_server_version_lt('5.0.0') def test_strict_xdel(self, sr): stream_name = 'xdel_test_stream' sr.delete(stream_name) @@ -1714,7 +1714,7 @@ class TestStrictCommands(object): assert sr.xdel(stream_name, 1, stamp) == 1 assert sr.xdel(stream_name, 1, stamp, 42) == 0 - @skip_if_server_version_lt('4.9.105') + @skip_if_server_version_lt('5.0.0') def test_strict_xtrim(self, sr): stream_name = 'xtrim_test_stream' sr.delete(stream_name) -- cgit v1.2.1 From 7efb71bd81e365e94669ad1b1fff65c5f83b0508 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 2 Oct 2018 18:27:27 +0300 Subject: Fixes to test_strict_xrange --- tests/test_commands.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index b89b0c7..e6b727a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1615,7 +1615,7 @@ class TestStrictCommands(object): assert stamp1 != stamp2 milli, offset = stamp2.decode('utf-8').split('-') - new_id = ("%s-0" % (milli + 10000)).encode('utf-8') + new_id = "{}-0".format(int(milli) + 10000).encode('utf-8') stamp3 = sr.xadd(varname, id=new_id, foo="bar") assert sr.xlen(varname) == 3 assert stamp3 == new_id @@ -1637,7 +1637,7 @@ class TestStrictCommands(object): results = sr.xrange(varname, finish=stamp2, count=1) assert get_ids(results) == [stamp1] - results = sr.xrevrange(varname, start=stamp1) + results = sr.xrevrange(varname, start=stamp4) assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] results = sr.xrevrange(varname, start=stamp3, finish=stamp2) -- cgit v1.2.1 From f1ece6b139d7cf4a7900526ff1d53b1387d89c68 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Wed, 3 Oct 2018 11:33:24 +0300 Subject: Implements XINFO --- redis/client.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++- tests/test_commands.py | 18 +++++---- 2 files changed, 115 insertions(+), 10 deletions(-) diff --git a/redis/client.py b/redis/client.py index 7f3f07d..f3492da 100755 --- a/redis/client.py +++ b/redis/client.py @@ -250,6 +250,28 @@ def stream_list(response): return result +def parse_xinfo_dict(response): + if response is None: + return None + result = {} + while response: + k = response.pop(0) + v = response.pop(0) + if isinstance(v, list): + v = parse_xinfo_dict(v) + result[k] = v + return result + + +def parse_xinfo_list(response): + if response is None: + return None + result = [] + for group in response: + result.append(parse_xinfo_dict(group)) + return result + + def multi_stream_list(response): if response is None: return None @@ -405,6 +427,11 @@ class StrictRedis(object): 'XGROUP SETID': bool_ok, 'XGROUP DELCONSUMER': int }, + { + 'XINFO STREAM': parse_xinfo_dict, + 'XINFO CONSUMERS': parse_xinfo_list, + 'XINFO GROUPS': parse_xinfo_list + }, string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1842,12 +1869,88 @@ class StrictRedis(object): def xgroup_delconsumer(self, name, groupname, consumername): """ Remove a specific consumer from a consumer group. - Returns the number of pending messages that the consumer had before it was deleted. + Returns the number of pending messages that the consumer had before it + was deleted. name: name of the stream. groupname: name of the consumer group. consumername: name of consumer to delete """ - return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername) + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) + + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + + def xdel(self, name, *ids): + """ + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. + """ + return self.execute_command('XDEL', name, *ids) + + def xtrim(self, name, maxlen, approximate=True): + """ + Trims old messages from a stream. + name: name of the stream. + maxlen: truncate old stream messages beyond this size + approximate: actual stream length may be slightly more than maxlen + """ + pieces = ['MAXLEN'] + if approximate: + pieces.append('~') + pieces.append(maxlen) + return self.execute_command('XTRIM', name, *pieces) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): diff --git a/tests/test_commands.py b/tests/test_commands.py index e6b727a..946934d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1615,7 +1615,7 @@ class TestStrictCommands(object): assert stamp1 != stamp2 milli, offset = stamp2.decode('utf-8').split('-') - new_id = "{}-0".format(int(milli) + 10000).encode('utf-8') + new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8') stamp3 = sr.xadd(varname, id=new_id, foo="bar") assert sr.xlen(varname) == 3 assert stamp3 == new_id @@ -1670,21 +1670,23 @@ class TestStrictCommands(object): stream_name = 'xgroup_test_stream' sr.delete(stream_name) group_name = 'xgroup_test_group' - try: - sr.xgroup_destroy(name=stream_name, groupname=group_name) - except redis.ResponseError: - pass - with pytest.raises(redis.ResponseError): - sr.xgroup_create(name=stream_name, groupname=group_name, id='$') - stamp1 = sr.xadd(stream_name, name="marco", other="polo") + stamp1 = sr.xadd(stream_name, name="boaty", other="mcboatface") + assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + + assert sr.xinfo_groups(name=stream_name) == [] assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) with pytest.raises(redis.ResponseError): sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') with pytest.raises(redis.ResponseError): sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')] \ + == b(stamp1) assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')]\ + == b('0-0') # TODO: test xgroup_delconsumer after implementing XREADGROUP -- cgit v1.2.1 From 1f8c69cfcd6c0fad819fc78777b56873ec8d26dc Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Mon, 15 Oct 2018 20:37:55 +0300 Subject: Implements XACK --- redis/client.py | 12 +++++++++++- tests/test_commands.py | 10 ++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index f3492da..410cdac 100755 --- a/redis/client.py +++ b/redis/client.py @@ -426,12 +426,13 @@ class StrictRedis(object): 'XGROUP DESTROY': int, 'XGROUP SETID': bool_ok, 'XGROUP DELCONSUMER': int - }, + }, { 'XINFO STREAM': parse_xinfo_dict, 'XINFO CONSUMERS': parse_xinfo_list, 'XINFO GROUPS': parse_xinfo_list }, + string_keys_to_dict('XACK', int), string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1952,6 +1953,15 @@ class StrictRedis(object): """ return self.execute_command('XINFO GROUPS', name) + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 946934d..1f70230 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1732,6 +1732,16 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 234) == 0 assert sr.xtrim(stream_name, 234, approximate=False) == 66 + @skip_if_server_version_lt('5.0.0') + def test_strict_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' + + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From 2a15e07f1a8aec26d287ec31ed330f598a1fca00 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Mon, 15 Oct 2018 20:52:33 +0300 Subject: Implements XDEL --- redis/client.py | 13 ++++++++++++- tests/test_commands.py | 14 ++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 410cdac..45b9879 100755 --- a/redis/client.py +++ b/redis/client.py @@ -432,7 +432,10 @@ class StrictRedis(object): 'XINFO CONSUMERS': parse_xinfo_list, 'XINFO GROUPS': parse_xinfo_list }, - string_keys_to_dict('XACK', int), + string_keys_to_dict( + 'XACK XDEL', + int + ), string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1962,6 +1965,14 @@ class StrictRedis(object): """ return self.execute_command('XACK', name, groupname, *ids) + def xdel(self, name, *ids): + """ + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. + """ + return self.execute_command('XDEL', name, *ids) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 1f70230..ab9c9c9 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1742,6 +1742,20 @@ class TestStrictCommands(object): assert sr.xack(stream_name, group_name, '1-1') == 0 assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + @skip_if_server_version_lt('5.0.0') + def test_strict_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, id=1, foo='bar') + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, baz='qaz') + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From f5f9bef17dabcacd3efdd9229af6fe9701628788 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Mon, 15 Oct 2018 21:24:31 +0300 Subject: Implements XTRIM --- redis/client.py | 15 ++++++++++++++- tests/test_commands.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 45b9879..136f36d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -433,7 +433,7 @@ class StrictRedis(object): 'XINFO GROUPS': parse_xinfo_list }, string_keys_to_dict( - 'XACK XDEL', + 'XACK XDEL XTRIM', int ), string_keys_to_dict( @@ -1973,6 +1973,19 @@ class StrictRedis(object): """ return self.execute_command('XDEL', name, *ids) + def xtrim(self, name, maxlen, approximate=True): + """ + Trims old messages from a stream. + name: name of the stream. + maxlen: truncate old stream messages beyond this size + approximate: actual stream length may be slightly more than maxlen + """ + pieces = ['MAXLEN'] + if approximate: + pieces.append('~') + pieces.append(maxlen) + return self.execute_command('XTRIM', name, *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index ab9c9c9..997d928 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1756,6 +1756,22 @@ class TestStrictCommands(object): assert sr.xdel(stream_name, 1, stamp) == 1 assert sr.xdel(stream_name, 1, stamp, 42) == 0 + @skip_if_server_version_lt('5.0.0') + def test_strict_xtrim(self, sr): + stream_name = 'xtrim_test_stream' + sr.delete(stream_name) + + assert sr.xtrim(stream_name, 1000) == 0 + + for i in range(300): + sr.xadd(stream_name, index=i) + + assert sr.xtrim(stream_name, 1000, approximate=False) == 0 + assert sr.xtrim(stream_name, 300) == 0 + assert sr.xtrim(stream_name, 299) == 0 + assert sr.xtrim(stream_name, 234) == 0 + assert sr.xtrim(stream_name, 234, approximate=False) == 66 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From 227dc28c34f610173995d7aee2f716f95225f2af Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 15 Oct 2018 15:40:27 +0300 Subject: Implements XREADGROUP --- redis/client.py | 36 +++++++++++++++++++++++++++++++++++- tests/test_commands.py | 22 ++++++++++++++++------ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index 136f36d..fe0de50 100755 --- a/redis/client.py +++ b/redis/client.py @@ -420,7 +420,7 @@ class StrictRedis(object): ), string_keys_to_dict('XADD', stream_key), string_keys_to_dict('XREVRANGE XRANGE', stream_list), - string_keys_to_dict('XREAD', multi_stream_list), + string_keys_to_dict('XREAD XREADGROUP', multi_stream_list), { 'XGROUP CREATE': bool_ok, 'XGROUP DESTROY': int, @@ -1986,6 +1986,40 @@ class StrictRedis(object): pieces.append(maxlen) return self.execute_command('XTRIM', name, *pieces) + def xreadgroup(self, groupname, consumername, count=None, block=None, + **streams): + """ + Read from a stream via a consumer group. + groupname: name of the consumer group. + consumername: name of the requesting consumer. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + **streams: a mapping of stream names to stream IDs, where + IDs indicate the last ID already seen. + """ + if streams is None: + streams = {} + pieces = ['GROUP', groupname, consumername] + if block is not None: + if not isinstance(block, int) or block < 0: + raise RedisError("XREAD block must be a non-negative integer") + pieces.append("BLOCK") + pieces.append(str(block)) + if count is not None: + if not isinstance(count, int) or count < 1: + raise RedisError("XREAD count must be a positive integer") + pieces.append("COUNT") + pieces.append(str(count)) + + pieces.append("STREAMS") + ids = [] + for partial_stream in iteritems(streams): + pieces.append(partial_stream[0]) + ids.append(partial_stream[1]) + pieces.extend(ids) + return self.execute_command('XREADGROUP', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 997d928..4964e78 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1670,8 +1670,10 @@ class TestStrictCommands(object): stream_name = 'xgroup_test_stream' sr.delete(stream_name) group_name = 'xgroup_test_group' + message = {'name': 'boaty', 'other': 'mcboatface'} + b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} - stamp1 = sr.xadd(stream_name, name="boaty", other="mcboatface") + stamp1 = sr.xadd(stream_name, **message) assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] assert sr.xinfo_groups(name=stream_name) == [] @@ -1682,13 +1684,21 @@ class TestStrictCommands(object): sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') with pytest.raises(redis.ResponseError): sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') - assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')] \ - == b(stamp1) + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b(stamp1) assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') - assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')]\ - == b('0-0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b('0-0') - # TODO: test xgroup_delconsumer after implementing XREADGROUP + consumer_name = 'captain_jack_sparrow' + + assert sr.xreadgroup(groupname=group_name, consumername=consumer_name, + **{stream_name: '0'}) == { + stream_name: [(b(stamp1), b_message)]} + + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 + sr.xgroup_delconsumer(stream_name, group_name, consumer_name) + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 -- cgit v1.2.1 From a8c998d41cee1f9100508082627a29db08f2c428 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 15 Oct 2018 16:30:26 +0300 Subject: Implements XPENDING --- redis/client.py | 45 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index fe0de50..9600bc7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -250,7 +250,7 @@ def stream_list(response): return result -def parse_xinfo_dict(response): +def parse_recursive_dict(response): if response is None: return None result = {} @@ -258,20 +258,26 @@ def parse_xinfo_dict(response): k = response.pop(0) v = response.pop(0) if isinstance(v, list): - v = parse_xinfo_dict(v) + v = parse_recursive_dict(v) result[k] = v return result -def parse_xinfo_list(response): +def parse_list_of_recursive_dicts(response): if response is None: return None result = [] for group in response: - result.append(parse_xinfo_dict(group)) + result.append(parse_recursive_dict(group)) return result +def parse_xclaim(response): + if isinstance(response, str): + return response + return stream_list(response) + + def multi_stream_list(response): if response is None: return None @@ -428,10 +434,11 @@ class StrictRedis(object): 'XGROUP DELCONSUMER': int }, { - 'XINFO STREAM': parse_xinfo_dict, - 'XINFO CONSUMERS': parse_xinfo_list, - 'XINFO GROUPS': parse_xinfo_list + 'XINFO STREAM': parse_recursive_dict, + 'XINFO CONSUMERS': parse_list_of_recursive_dicts, + 'XINFO GROUPS': parse_list_of_recursive_dicts }, + string_keys_to_dict('XCLAIM', parse_xclaim), string_keys_to_dict( 'XACK XDEL XTRIM', int @@ -2020,6 +2027,30 @@ class StrictRedis(object): pieces.extend(ids) return self.execute_command('XREADGROUP', *pieces) + def xpending(self, name, groupname, start=None, end=None, count=None, + consumername=None): + """ + Returns information about pending messages. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer to filter by (optional). + """ + pieces = [name, groupname] + if start is not None or end is not None or count is not None: + if start is None or end is None or count is None: + raise RedisError("XPENDING must be provided with start, end " + "and count parameters, or none of them. ") + if not isinstance(count, int) or count < 1: + raise RedisError("XPENDING count must be a positive integer") + pieces.extend((start, end, str(count))) + if consumername is not None: + if start is None or end is None or count is None: + raise RedisError("if XPENDING is provided with consumername," + " it must be provided with start, end and" + " count parameters") + pieces.append(consumername) + return self.execute_command('XPENDING', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ -- cgit v1.2.1 From b187aac2df98e1347e084a2901995f0b1a132aa2 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Tue, 16 Oct 2018 13:24:21 +0300 Subject: Implements XCLAIM --- redis/client.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/redis/client.py b/redis/client.py index 9600bc7..a66470d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -326,7 +326,6 @@ def parse_zscan(response, **options): it = iter(r) return long(cursor), list(izip(it, imap(score_cast_func, it))) - def parse_slowlog_get(response, **options): return [{ 'id': item[0], @@ -2008,17 +2007,17 @@ class StrictRedis(object): if streams is None: streams = {} pieces = ['GROUP', groupname, consumername] - if block is not None: - if not isinstance(block, int) or block < 0: - raise RedisError("XREAD block must be a non-negative integer") - pieces.append("BLOCK") - pieces.append(str(block)) if count is not None: if not isinstance(count, int) or count < 1: - raise RedisError("XREAD count must be a positive integer") + raise RedisError("XREADGROUP count must be a positive integer") pieces.append("COUNT") pieces.append(str(count)) - + if block is not None: + if not isinstance(block, int) or block < 0: + raise RedisError("XREADGROUP block must be a non-negative " + "integer") + pieces.append("BLOCK") + pieces.append(str(block)) pieces.append("STREAMS") ids = [] for partial_stream in iteritems(streams): @@ -2051,6 +2050,58 @@ class StrictRedis(object): pieces.append(consumername) return self.execute_command('XPENDING', *pieces) + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): + """ + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message + """ + if not isinstance(min_idle_time, int) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") + + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) + + optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} + for param, param_name in optional_ints.items(): + if param is not None: + if not isinstance(param, int): + raise RedisError("XCLAIM {} must be an integer" + .format(param_name)) + pieces.append(str(param)) + + optional_bools = {force: 'force', justid: 'justid'} + for param, param_name in optional_bools.items(): + if param: + if not isinstance(param, bool): + raise RedisError("XCLAIM {} must be a boolean" + .format(param_name)) + pieces.append(param_name.upper()) + return self.execute_command('XCLAIM', *pieces) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ -- cgit v1.2.1 From 1f2eaf31f29e6003179404b23c6ae4ab187595b4 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Mon, 15 Oct 2018 21:26:56 +0300 Subject: Style: use single ticks instead of double quotes for strings --- redis/client.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/redis/client.py b/redis/client.py index a66470d..ed57ffb 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1763,10 +1763,10 @@ class StrictRedis(object): pieces = [] if maxlen is not None: if not isinstance(maxlen, int) or maxlen < 1: - raise RedisError("XADD maxlen must be a positive integer") - pieces.append("MAXLEN") + raise RedisError('XADD maxlen must be a positive integer') + pieces.append('MAXLEN') if approximate: - pieces.append("~") + pieces.append('~') pieces.append(str(maxlen)) pieces.append(id) for pair in iteritems(kwargs): @@ -1788,8 +1788,8 @@ class StrictRedis(object): pieces = [start, finish] if count is not None: if not isinstance(count, int) or count < 1: - raise RedisError("XRANGE count must be a positive integer") - pieces.append("COUNT") + raise RedisError('XRANGE count must be a positive integer') + pieces.append('COUNT') pieces.append(str(count)) return self.execute_command('XRANGE', name, *pieces) @@ -1808,8 +1808,8 @@ class StrictRedis(object): pieces = [start, finish] if count is not None: if not isinstance(count, int) or count < 1: - raise RedisError("XREVRANGE count must be a positive integer") - pieces.append("COUNT") + raise RedisError('XREVRANGE count must be a positive integer') + pieces.append('COUNT') pieces.append(str(count)) return self.execute_command('XREVRANGE', name, *pieces) @@ -1832,16 +1832,16 @@ class StrictRedis(object): pieces = [] if block is not None: if not isinstance(block, int) or block < 0: - raise RedisError("XREAD block must be a non-negative integer") - pieces.append("BLOCK") + raise RedisError('XREAD block must be a non-negative integer') + pieces.append('BLOCK') pieces.append(str(block)) if count is not None: if not isinstance(count, int) or count < 1: - raise RedisError("XREAD count must be a positive integer") - pieces.append("COUNT") + raise RedisError('XREAD count must be a positive integer') + pieces.append('COUNT') pieces.append(str(count)) - pieces.append("STREAMS") + pieces.append('STREAMS') ids = [] for partial_stream in iteritems(streams): pieces.append(partial_stream[0]) -- cgit v1.2.1 From 07eacf3ba5e5f87bafe992a51e04971ee0fd8b12 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Wed, 17 Oct 2018 15:28:23 +0300 Subject: pycodestyle changes in benchmarking --- benchmarks/basic_operations.py | 1 + benchmarks/command_packer_benchmark.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/benchmarks/basic_operations.py b/benchmarks/basic_operations.py index 2f1e9f4..c50a610 100644 --- a/benchmarks/basic_operations.py +++ b/benchmarks/basic_operations.py @@ -195,5 +195,6 @@ def hmset(conn, num, pipeline_size, data_size): if pipeline_size > 1: conn.execute() + if __name__ == '__main__': run() diff --git a/benchmarks/command_packer_benchmark.py b/benchmarks/command_packer_benchmark.py index 49f48f2..9eb1853 100644 --- a/benchmarks/command_packer_benchmark.py +++ b/benchmarks/command_packer_benchmark.py @@ -22,9 +22,9 @@ class StringJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): "Pack a series of arguments into a value Redis command" @@ -54,9 +54,9 @@ class ListJoiningConnection(Connection): _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def pack_command(self, *args): output = [] -- cgit v1.2.1 From 04784cb2ecb0847198aaf64102e5129868c17da6 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Wed, 17 Oct 2018 15:36:04 +0300 Subject: pycodestyle fixes in client --- redis/_compat.py | 2 +- redis/client.py | 3 ++- redis/connection.py | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/redis/_compat.py b/redis/_compat.py index 307f3cc..32063e7 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -4,7 +4,7 @@ import sys try: InterruptedError = InterruptedError -except: +except NameError: InterruptedError = OSError # For Python older than 3.5, retry EINTR. diff --git a/redis/client.py b/redis/client.py index ed57ffb..a69b847 100755 --- a/redis/client.py +++ b/redis/client.py @@ -195,7 +195,7 @@ def pairs_to_dict_typed(response, type_info): if key in type_info: try: value = type_info[key](value) - except: + except Exception: # if for some reason the value can't be coerced, just use # the string value pass @@ -326,6 +326,7 @@ def parse_zscan(response, **options): it = iter(r) return long(cursor), list(izip(it, imap(score_cast_func, it))) + def parse_slowlog_get(response, **options): return [{ 'id': item[0], diff --git a/redis/connection.py b/redis/connection.py index 6ad467a..1190260 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -236,7 +236,7 @@ class SocketBuffer(object): try: self.purge() self._buffer.close() - except: + except Exception: # issue #633 suggests the purge/close somehow raised a # BadFileDescriptor error. Perhaps the client ran out of # memory or something else? It's probably OK to ignore @@ -602,9 +602,9 @@ class Connection(object): errmsg = e.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) - except: + except Exception as e: self.disconnect() - raise + raise e def send_command(self, *args): "Pack and send a command to the Redis server" @@ -623,9 +623,9 @@ class Connection(object): "Read the response from a previously sent command" try: response = self._parser.read_response() - except: + except Exception as e: self.disconnect() - raise + raise e if isinstance(response, ResponseError): raise response return response -- cgit v1.2.1 From 3d24b463dc44d74dec30cffc4c5a23c566975cea Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Sun, 28 Oct 2018 09:09:31 +0200 Subject: pycodestyle requires python 2.7 or greater Co-Authored-By: RoeyPrat --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 16b7a50..ac086ed 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ deps = commands = py.test {posargs} [testenv:pycodestyle] -basepython = python2.6 +basepython = python3.6 deps = pycodestyle commands = pycodestyle skipsdist = true -- cgit v1.2.1 From 9984fa9cbaea7d6380df090dfdc036042948dad1 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 09:31:48 +0200 Subject: XADD key/value pairs of the entry should be specified as a single dict arg rather than kwargs --- redis/client.py | 8 +++++--- tests/test_commands.py | 28 ++++++++++++++-------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/redis/client.py b/redis/client.py index a69b847..5ee99fa 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1750,15 +1750,15 @@ class StrictRedis(object): return self.execute_command('SUNIONSTORE', dest, *args) # STREAMS COMMANDS - def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs): + def xadd(self, _name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. _name: name of the stream (not using 'name' as this would prevent 'name' used in the kwargs + fields: dict of field/value pairs to insert into the stream id: Location to insert this record. By default it is appended. maxlen: truncate old stream members beyond this size approximate: actual stream length may be slightly more than maxlen - **kwargs: key/value pairs to insert into the stream """ pieces = [] @@ -1770,7 +1770,9 @@ class StrictRedis(object): pieces.append('~') pieces.append(str(maxlen)) pieces.append(id) - for pair in iteritems(kwargs): + if not isinstance(fields, dict) or len(fields) == 0: + raise RedisError('XADD fields must be a non-empty dict') + for pair in iteritems(fields): pieces.append(pair[0]) pieces.append(pair[1]) return self.execute_command('XADD', _name, *pieces) diff --git a/tests/test_commands.py b/tests/test_commands.py index 4964e78..70dcbfa 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1608,18 +1608,18 @@ class TestStrictCommands(object): varname = 'xrange_test' sr.delete(varname) assert sr.xlen(varname) == 0 - stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) + stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) assert sr.xlen(varname) == 1 - stamp2 = sr.xadd(varname, name="baz", other="zab") + stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) assert sr.xlen(varname) == 2 assert stamp1 != stamp2 milli, offset = stamp2.decode('utf-8').split('-') new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8') - stamp3 = sr.xadd(varname, id=new_id, foo="bar") + stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id) assert sr.xlen(varname) == 3 assert stamp3 == new_id - stamp4 = sr.xadd(varname, foo="baz") + stamp4 = sr.xadd(varname, {"foo": "baz"}) assert sr.xlen(varname) == 4 def get_ids(results): @@ -1655,15 +1655,15 @@ class TestStrictCommands(object): def test_strict_xread(self, sr): varname = 'xread_test' sr.delete(varname) - stamp1 = sr.xadd(varname, name="bar", other="rab", maxlen=4) - stamp2 = sr.xadd(varname, name="baz", other="zab") + stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) + stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) assert stamp1 != stamp2 results = sr.xread(varname='$', count=10, block=10) assert results is None results = sr.xread(count=3, block=0, **{varname: stamp1}) - assert results[varname][0][0] == stamp2 + assert results[0][1][0][0] == stamp2 @skip_if_server_version_lt('5.0.0') def test_strict_xgroup(self, sr): @@ -1673,7 +1673,7 @@ class TestStrictCommands(object): message = {'name': 'boaty', 'other': 'mcboatface'} b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} - stamp1 = sr.xadd(stream_name, **message) + stamp1 = sr.xadd(stream_name, message) assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] assert sr.xinfo_groups(name=stream_name) == [] @@ -1719,10 +1719,10 @@ class TestStrictCommands(object): assert sr.xdel(stream_name, 1) == 0 - sr.xadd(stream_name, id=1, foo='bar') + sr.xadd(stream_name, {"foo": "bar"}, id=1) assert sr.xdel(stream_name, 1) == 1 - stamp = sr.xadd(stream_name, baz='qaz') + stamp = sr.xadd(stream_name, {"baz": "qaz"}) assert sr.xdel(stream_name, 1, stamp) == 1 assert sr.xdel(stream_name, 1, stamp, 42) == 0 @@ -1734,7 +1734,7 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 1000) == 0 for i in range(300): - sr.xadd(stream_name, index=i) + sr.xadd(stream_name, {"index": i}) assert sr.xtrim(stream_name, 1000, approximate=False) == 0 assert sr.xtrim(stream_name, 300) == 0 @@ -1759,10 +1759,10 @@ class TestStrictCommands(object): assert sr.xdel(stream_name, 1) == 0 - sr.xadd(stream_name, id=1, foo='bar') + sr.xadd(stream_name, {"foo": "bar"}, id=1) assert sr.xdel(stream_name, 1) == 1 - stamp = sr.xadd(stream_name, baz='qaz') + stamp = sr.xadd(stream_name, {"baz": "qaz"}) assert sr.xdel(stream_name, 1, stamp) == 1 assert sr.xdel(stream_name, 1, stamp, 42) == 0 @@ -1774,7 +1774,7 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 1000) == 0 for i in range(300): - sr.xadd(stream_name, index=i) + sr.xadd(stream_name, {"index": i}) assert sr.xtrim(stream_name, 1000, approximate=False) == 0 assert sr.xtrim(stream_name, 300) == 0 -- cgit v1.2.1 From abbd97050a7cb3e687c3ab23d6fc550e5f197b8d Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 09:41:00 +0200 Subject: for python 2/3 compat we need isinstance to check both int and long --- redis/client.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/redis/client.py b/redis/client.py index 5ee99fa..842f5e9 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1763,7 +1763,7 @@ class StrictRedis(object): """ pieces = [] if maxlen is not None: - if not isinstance(maxlen, int) or maxlen < 1: + if not isinstance(maxlen, (int, long)) or maxlen < 1: raise RedisError('XADD maxlen must be a positive integer') pieces.append('MAXLEN') if approximate: @@ -1790,7 +1790,7 @@ class StrictRedis(object): """ pieces = [start, finish] if count is not None: - if not isinstance(count, int) or count < 1: + if not isinstance(count, (int, long)) or count < 1: raise RedisError('XRANGE count must be a positive integer') pieces.append('COUNT') pieces.append(str(count)) @@ -1810,7 +1810,7 @@ class StrictRedis(object): """ pieces = [start, finish] if count is not None: - if not isinstance(count, int) or count < 1: + if not isinstance(count, (int, long)) or count < 1: raise RedisError('XREVRANGE count must be a positive integer') pieces.append('COUNT') pieces.append(str(count)) @@ -1834,12 +1834,12 @@ class StrictRedis(object): """ pieces = [] if block is not None: - if not isinstance(block, int) or block < 0: + if not isinstance(block, (int, long)) or block < 0: raise RedisError('XREAD block must be a non-negative integer') pieces.append('BLOCK') pieces.append(str(block)) if count is not None: - if not isinstance(count, int) or count < 1: + if not isinstance(count, (int, long)) or count < 1: raise RedisError('XREAD count must be a positive integer') pieces.append('COUNT') pieces.append(str(count)) @@ -2011,12 +2011,12 @@ class StrictRedis(object): streams = {} pieces = ['GROUP', groupname, consumername] if count is not None: - if not isinstance(count, int) or count < 1: + if not isinstance(count, (int, long)) or count < 1: raise RedisError("XREADGROUP count must be a positive integer") pieces.append("COUNT") pieces.append(str(count)) if block is not None: - if not isinstance(block, int) or block < 0: + if not isinstance(block, (int, long)) or block < 0: raise RedisError("XREADGROUP block must be a non-negative " "integer") pieces.append("BLOCK") @@ -2042,7 +2042,7 @@ class StrictRedis(object): if start is None or end is None or count is None: raise RedisError("XPENDING must be provided with start, end " "and count parameters, or none of them. ") - if not isinstance(count, int) or count < 1: + if not isinstance(count, (int, long)) or count < 1: raise RedisError("XPENDING count must be a positive integer") pieces.extend((start, end, str(count))) if consumername is not None: @@ -2078,7 +2078,7 @@ class StrictRedis(object): justid: optional boolean, false by default. Return just an array of IDs of messages successfully claimed, without returning the actual message """ - if not isinstance(min_idle_time, int) or min_idle_time < 0: + if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: raise RedisError("XCLAIM min_idle_time must be a non negative " "integer") if not isinstance(message_ids, (list, tuple)) or not message_ids: @@ -2091,7 +2091,7 @@ class StrictRedis(object): optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} for param, param_name in optional_ints.items(): if param is not None: - if not isinstance(param, int): + if not isinstance(param, (int, long)): raise RedisError("XCLAIM {} must be an integer" .format(param_name)) pieces.append(str(param)) -- cgit v1.2.1 From 44942c7c5254bfaf2304299e82a49ef548bd31cb Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 09:51:55 +0200 Subject: pieces.extend(pair) is slightly more efficient. --- redis/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 842f5e9..78b8302 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1773,8 +1773,7 @@ class StrictRedis(object): if not isinstance(fields, dict) or len(fields) == 0: raise RedisError('XADD fields must be a non-empty dict') for pair in iteritems(fields): - pieces.append(pair[0]) - pieces.append(pair[1]) + pieces.extend(pair) return self.execute_command('XADD', _name, *pieces) def xrange(self, name, start='-', finish='+', count=None): -- cgit v1.2.1 From 8d006b6f1b29424cdae72c1c0d49079de45d2962 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 10:05:53 +0200 Subject: XCLAIM renaming param to param_value, and adding param_name to pieces --- redis/client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index 78b8302..faa57dd 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2088,17 +2088,17 @@ class StrictRedis(object): pieces.extend(list(message_ids)) optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} - for param, param_name in optional_ints.items(): - if param is not None: - if not isinstance(param, (int, long)): + for param_value, param_name in optional_ints.items(): + if param_value is not None: + if not isinstance(param_value, (int, long)): raise RedisError("XCLAIM {} must be an integer" .format(param_name)) - pieces.append(str(param)) + pieces.extend((param_name, str(param_value))) optional_bools = {force: 'force', justid: 'justid'} - for param, param_name in optional_bools.items(): - if param: - if not isinstance(param, bool): + for param_value, param_name in optional_bools.items(): + if param_value: + if not isinstance(param_value, bool): raise RedisError("XCLAIM {} must be a boolean" .format(param_name)) pieces.append(param_name.upper()) -- cgit v1.2.1 From 492adbcf1183322edcf9a834ee3636622a109025 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 10:07:55 +0200 Subject: XADD removing unnecessary stream_key parse function --- redis/client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/redis/client.py b/redis/client.py index faa57dd..b982979 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,10 +232,6 @@ def int_or_none(response): return int(response) -def stream_key(response): - return response - - def stream_list(response): if response is None: return None @@ -424,7 +420,6 @@ class StrictRedis(object): 'GEOADD XLEN', int ), - string_keys_to_dict('XADD', stream_key), string_keys_to_dict('XREVRANGE XRANGE', stream_list), string_keys_to_dict('XREAD XREADGROUP', multi_stream_list), { -- cgit v1.2.1 From d2d3b2243d3516c71c7feef4bf466352d07ead41 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 10:22:19 +0200 Subject: stream_list should reuse pairs_to_dict --- redis/client.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/redis/client.py b/redis/client.py index b982979..908a63e 100755 --- a/redis/client.py +++ b/redis/client.py @@ -235,15 +235,7 @@ def int_or_none(response): def stream_list(response): if response is None: return None - result = [] - for r in response: - kv_pairs = r[1] - kv_dict = dict() - while len(kv_pairs) > 1: - kv_dict[kv_pairs.pop()] = kv_pairs.pop() - result.append((r[0], kv_dict)) - - return result + return [(r[0], pairs_to_dict(r[1])) for r in response] def parse_recursive_dict(response): -- cgit v1.2.1 From 4ad035dfc94582d8265572439a2f0bdf41637e79 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 10:27:27 +0200 Subject: multi_stream_list should use nativestr for compatibility --- redis/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 908a63e..c055496 100755 --- a/redis/client.py +++ b/redis/client.py @@ -271,7 +271,7 @@ def multi_stream_list(response): return None result = dict() for r in response: - result[r[0].decode('utf-8')] = stream_list(r[1]) + result[nativestr(r[0])] = stream_list(r[1]) return result -- cgit v1.2.1 From 3ba1d14bc74f9a5045d714ce67422ef05b59b724 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 11:52:01 +0200 Subject: allow list based iterating on XREADGROUP results --- redis/client.py | 9 +++------ tests/test_commands.py | 7 ++++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/redis/client.py b/redis/client.py index c055496..13c01d0 100755 --- a/redis/client.py +++ b/redis/client.py @@ -266,13 +266,10 @@ def parse_xclaim(response): return stream_list(response) -def multi_stream_list(response): +def parse_xreadgroup(response): if response is None: return None - result = dict() - for r in response: - result[nativestr(r[0])] = stream_list(r[1]) - return result + return [[nativestr(r[0]), stream_list(r[1])] for r in response] def float_or_none(response): @@ -413,7 +410,7 @@ class StrictRedis(object): int ), string_keys_to_dict('XREVRANGE XRANGE', stream_list), - string_keys_to_dict('XREAD XREADGROUP', multi_stream_list), + string_keys_to_dict('XREAD XREADGROUP', parse_xreadgroup), { 'XGROUP CREATE': bool_ok, 'XGROUP DESTROY': int, diff --git a/tests/test_commands.py b/tests/test_commands.py index 70dcbfa..929b4bd 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1692,9 +1692,10 @@ class TestStrictCommands(object): consumer_name = 'captain_jack_sparrow' - assert sr.xreadgroup(groupname=group_name, consumername=consumer_name, - **{stream_name: '0'}) == { - stream_name: [(b(stamp1), b_message)]} + expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] + assert sr.xreadgroup(groupname=group_name, + consumername=consumer_name, + **{stream_name: '0'}) == expected_value assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 sr.xgroup_delconsumer(stream_name, group_name, consumer_name) -- cgit v1.2.1 From 4ef348569a985c6e030a0995a0c78ed7e18fe264 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Sun, 28 Oct 2018 12:07:33 +0200 Subject: unit test for xclaim --- tests/test_commands.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_commands.py b/tests/test_commands.py index 929b4bd..d5fae9e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1783,6 +1783,21 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 234) == 0 assert sr.xtrim(stream_name, 234, approximate=False) == 66 + @skip_if_server_version_lt('5.0.0') + def test_strict_xclaim(self, sr): + stream_name = 'xclaim_test_stream' + group_name = 'xclaim_test_consumer_group' + sr.delete(stream_name) + + stamp = sr.xadd(stream_name, {"john": "wick"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, 'action_movie_consumer', + xclaim_test_stream=0) + assert sr.xinfo_consumers(stream_name, group_name)[0][ + b('name')] == b('action_movie_consumer') + assert sr.xclaim(stream_name, group_name, 'reeves_fan', + min_idle_time=0, message_ids=(stamp,))[0][0] == stamp + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From 1afa815ae629795ef78de06baafa5f5ec2994264 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 10:05:07 +0200 Subject: streams commands wrap all string literals in Token.get_token --- redis/client.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/redis/client.py b/redis/client.py index 13c01d0..a47f16f 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1749,9 +1749,9 @@ class StrictRedis(object): if maxlen is not None: if not isinstance(maxlen, (int, long)) or maxlen < 1: raise RedisError('XADD maxlen must be a positive integer') - pieces.append('MAXLEN') + pieces.append(Token.get_token('MAXLEN')) if approximate: - pieces.append('~') + pieces.append(Token.get_token('~')) pieces.append(str(maxlen)) pieces.append(id) if not isinstance(fields, dict) or len(fields) == 0: @@ -1775,7 +1775,7 @@ class StrictRedis(object): if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError('XRANGE count must be a positive integer') - pieces.append('COUNT') + pieces.append(Token.get_token('COUNT')) pieces.append(str(count)) return self.execute_command('XRANGE', name, *pieces) @@ -1795,7 +1795,7 @@ class StrictRedis(object): if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError('XREVRANGE count must be a positive integer') - pieces.append('COUNT') + pieces.append(Token.get_token('COUNT')) pieces.append(str(count)) return self.execute_command('XREVRANGE', name, *pieces) @@ -1819,12 +1819,12 @@ class StrictRedis(object): if block is not None: if not isinstance(block, (int, long)) or block < 0: raise RedisError('XREAD block must be a non-negative integer') - pieces.append('BLOCK') + pieces.append(Token.get_token('BLOCK')) pieces.append(str(block)) if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError('XREAD count must be a positive integer') - pieces.append('COUNT') + pieces.append(Token.get_token('COUNT')) pieces.append(str(count)) pieces.append('STREAMS') @@ -1920,9 +1920,9 @@ class StrictRedis(object): maxlen: truncate old stream messages beyond this size approximate: actual stream length may be slightly more than maxlen """ - pieces = ['MAXLEN'] + pieces = [Token.get_token('MAXLEN')] if approximate: - pieces.append('~') + pieces.append(Token.get_token('~')) pieces.append(maxlen) return self.execute_command('XTRIM', name, *pieces) @@ -1972,7 +1972,7 @@ class StrictRedis(object): maxlen: truncate old stream messages beyond this size approximate: actual stream length may be slightly more than maxlen """ - pieces = ['MAXLEN'] + pieces = [Token.get_token('MAXLEN')] if approximate: pieces.append('~') pieces.append(maxlen) @@ -1992,19 +1992,19 @@ class StrictRedis(object): """ if streams is None: streams = {} - pieces = ['GROUP', groupname, consumername] + pieces = [Token.get_token('GROUP'), groupname, consumername] if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError("XREADGROUP count must be a positive integer") - pieces.append("COUNT") + pieces.append(Token.get_token("COUNT")) pieces.append(str(count)) if block is not None: if not isinstance(block, (int, long)) or block < 0: raise RedisError("XREADGROUP block must be a non-negative " "integer") - pieces.append("BLOCK") + pieces.append(Token.get_token("BLOCK")) pieces.append(str(block)) - pieces.append("STREAMS") + pieces.append(Token.get_token("STREAMS")) ids = [] for partial_stream in iteritems(streams): pieces.append(partial_stream[0]) @@ -2034,7 +2034,7 @@ class StrictRedis(object): " it must be provided with start, end and" " count parameters") pieces.append(consumername) - return self.execute_command('XPENDING', *pieces) + return self.execute_command('XPENDING', *pieces, parse_detail=True) def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, @@ -2072,6 +2072,8 @@ class StrictRedis(object): pieces.extend(list(message_ids)) optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} + optional_ints = {k: Token.get_token(v) for k, v in + optional_ints.items()} for param_value, param_name in optional_ints.items(): if param_value is not None: if not isinstance(param_value, (int, long)): @@ -2080,6 +2082,8 @@ class StrictRedis(object): pieces.extend((param_name, str(param_value))) optional_bools = {force: 'force', justid: 'justid'} + optional_bools = {k: Token.get_token(v) for k, v in + optional_bools.items()} for param_value, param_name in optional_bools.items(): if param_value: if not isinstance(param_value, bool): -- cgit v1.2.1 From f11c1a16685d79fc26ef701c29e2299327cc7ec3 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 10:36:46 +0200 Subject: XREAD and XREADGROUP should take streams as a required dict arg rather than kwargs --- redis/client.py | 40 +++++++++++++++++----------------------- tests/test_commands.py | 8 ++++---- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/redis/client.py b/redis/client.py index a47f16f..55dcc75 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1806,14 +1806,14 @@ class StrictRedis(object): """ return self.execute_command('XLEN', name) - def xread(self, count=None, block=None, **streams): + def xread(self, streams, count=None, block=None): """ Block and monitor multiple streams for new data. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. count: if set, only return this many items, beginning with the earliest available. block: number of milliseconds to wait, if nothing already present. - **streams: a mapping of stream names to stream IDs, where - IDs indicate the last ID already seen. """ pieces = [] if block is not None: @@ -1826,14 +1826,11 @@ class StrictRedis(object): raise RedisError('XREAD count must be a positive integer') pieces.append(Token.get_token('COUNT')) pieces.append(str(count)) - - pieces.append('STREAMS') - ids = [] - for partial_stream in iteritems(streams): - pieces.append(partial_stream[0]) - ids.append(partial_stream[1]) - - pieces.extend(ids) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREAD streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) return self.execute_command('XREAD', *pieces) def xgroup_create(self, name, groupname, id): @@ -1978,20 +1975,18 @@ class StrictRedis(object): pieces.append(maxlen) return self.execute_command('XTRIM', name, *pieces) - def xreadgroup(self, groupname, consumername, count=None, block=None, - **streams): + def xreadgroup(self, groupname, consumername, streams, count=None, + block=None): """ Read from a stream via a consumer group. groupname: name of the consumer group. consumername: name of the requesting consumer. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. count: if set, only return this many items, beginning with the earliest available. block: number of milliseconds to wait, if nothing already present. - **streams: a mapping of stream names to stream IDs, where - IDs indicate the last ID already seen. """ - if streams is None: - streams = {} pieces = [Token.get_token('GROUP'), groupname, consumername] if count is not None: if not isinstance(count, (int, long)) or count < 1: @@ -2004,12 +1999,11 @@ class StrictRedis(object): "integer") pieces.append(Token.get_token("BLOCK")) pieces.append(str(block)) - pieces.append(Token.get_token("STREAMS")) - ids = [] - for partial_stream in iteritems(streams): - pieces.append(partial_stream[0]) - ids.append(partial_stream[1]) - pieces.extend(ids) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREADGROUP streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) def xpending(self, name, groupname, start=None, end=None, count=None, diff --git a/tests/test_commands.py b/tests/test_commands.py index d5fae9e..7ecc9f4 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1659,10 +1659,10 @@ class TestStrictCommands(object): stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) assert stamp1 != stamp2 - results = sr.xread(varname='$', count=10, block=10) + results = sr.xread(streams={varname: '$'}, count=10, block=10) assert results is None - results = sr.xread(count=3, block=0, **{varname: stamp1}) + results = sr.xread(count=3, block=0, streams={varname: stamp1}) assert results[0][1][0][0] == stamp2 @skip_if_server_version_lt('5.0.0') @@ -1695,7 +1695,7 @@ class TestStrictCommands(object): expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] assert sr.xreadgroup(groupname=group_name, consumername=consumer_name, - **{stream_name: '0'}) == expected_value + streams={stream_name: '0'}) == expected_value assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 sr.xgroup_delconsumer(stream_name, group_name, consumer_name) @@ -1792,7 +1792,7 @@ class TestStrictCommands(object): stamp = sr.xadd(stream_name, {"john": "wick"}) sr.xgroup_create(stream_name, group_name, id='0') sr.xreadgroup(group_name, 'action_movie_consumer', - xclaim_test_stream=0) + streams={stream_name: 0}) assert sr.xinfo_consumers(stream_name, group_name)[0][ b('name')] == b('action_movie_consumer') assert sr.xclaim(stream_name, group_name, 'reeves_fan', -- cgit v1.2.1 From b24d007716f77627ee0b0999eb5b997baf976fc1 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 10:41:54 +0200 Subject: string interpolation for the error messages needs to include the positional index --- redis/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 55dcc75..4751f9d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2071,7 +2071,7 @@ class StrictRedis(object): for param_value, param_name in optional_ints.items(): if param_value is not None: if not isinstance(param_value, (int, long)): - raise RedisError("XCLAIM {} must be an integer" + raise RedisError("XCLAIM {0} must be an integer" .format(param_name)) pieces.extend((param_name, str(param_value))) @@ -2081,7 +2081,7 @@ class StrictRedis(object): for param_value, param_name in optional_bools.items(): if param_value: if not isinstance(param_value, bool): - raise RedisError("XCLAIM {} must be a boolean" + raise RedisError("XCLAIM {0} must be a boolean" .format(param_name)) pieces.append(param_name.upper()) return self.execute_command('XCLAIM', *pieces) -- cgit v1.2.1 From bb30ec5aae98c9649008303eacdcc64034bd506c Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 10:50:50 +0200 Subject: remove code duplication --- redis/client.py | 52 -------------------------------------------------- tests/test_commands.py | 30 ----------------------------- 2 files changed, 82 deletions(-) diff --git a/redis/client.py b/redis/client.py index 4751f9d..2b39c2d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1923,58 +1923,6 @@ class StrictRedis(object): pieces.append(maxlen) return self.execute_command('XTRIM', name, *pieces) - def xinfo_stream(self, name): - """ - Returns general information about the stream. - name: name of the stream. - """ - return self.execute_command('XINFO STREAM', name) - - def xinfo_consumers(self, name, groupname): - """ - Returns general information about the consumers in the group. - name: name of the stream. - groupname: name of the consumer group. - """ - return self.execute_command('XINFO CONSUMERS', name, groupname) - - def xinfo_groups(self, name): - """ - Returns general information about the consumer groups of the stream. - name: name of the stream. - """ - return self.execute_command('XINFO GROUPS', name) - - def xack(self, name, groupname, *ids): - """ - Acknowledges the successful processing of one or more messages. - name: name of the stream. - groupname: name of the consumer group. - *ids: message ids to acknowlege. - """ - return self.execute_command('XACK', name, groupname, *ids) - - def xdel(self, name, *ids): - """ - Deletes one or more messages from a stream. - name: name of the stream. - *ids: message ids to delete. - """ - return self.execute_command('XDEL', name, *ids) - - def xtrim(self, name, maxlen, approximate=True): - """ - Trims old messages from a stream. - name: name of the stream. - maxlen: truncate old stream messages beyond this size - approximate: actual stream length may be slightly more than maxlen - """ - pieces = [Token.get_token('MAXLEN')] - if approximate: - pieces.append('~') - pieces.append(maxlen) - return self.execute_command('XTRIM', name, *pieces) - def xreadgroup(self, groupname, consumername, streams, count=None, block=None): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 7ecc9f4..a79d6a3 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1753,36 +1753,6 @@ class TestStrictCommands(object): assert sr.xack(stream_name, group_name, '1-1') == 0 assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - @skip_if_server_version_lt('5.0.0') - def test_strict_xdel(self, sr): - stream_name = 'xdel_test_stream' - sr.delete(stream_name) - - assert sr.xdel(stream_name, 1) == 0 - - sr.xadd(stream_name, {"foo": "bar"}, id=1) - assert sr.xdel(stream_name, 1) == 1 - - stamp = sr.xadd(stream_name, {"baz": "qaz"}) - assert sr.xdel(stream_name, 1, stamp) == 1 - assert sr.xdel(stream_name, 1, stamp, 42) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xtrim(self, sr): - stream_name = 'xtrim_test_stream' - sr.delete(stream_name) - - assert sr.xtrim(stream_name, 1000) == 0 - - for i in range(300): - sr.xadd(stream_name, {"index": i}) - - assert sr.xtrim(stream_name, 1000, approximate=False) == 0 - assert sr.xtrim(stream_name, 300) == 0 - assert sr.xtrim(stream_name, 299) == 0 - assert sr.xtrim(stream_name, 234) == 0 - assert sr.xtrim(stream_name, 234, approximate=False) == 66 - @skip_if_server_version_lt('5.0.0') def test_strict_xclaim(self, sr): stream_name = 'xclaim_test_stream' -- cgit v1.2.1 From 1dfda00588bc02ea37b76e4465111c91dd79635e Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 11:32:34 +0200 Subject: XCLAIM simplify argument checks --- redis/client.py | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/redis/client.py b/redis/client.py index 2b39c2d..762b3a5 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2013,25 +2013,28 @@ class StrictRedis(object): pieces = [name, groupname, consumername, str(min_idle_time)] pieces.extend(list(message_ids)) - optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'} - optional_ints = {k: Token.get_token(v) for k, v in - optional_ints.items()} - for param_value, param_name in optional_ints.items(): - if param_value is not None: - if not isinstance(param_value, (int, long)): - raise RedisError("XCLAIM {0} must be an integer" - .format(param_name)) - pieces.extend((param_name, str(param_value))) - - optional_bools = {force: 'force', justid: 'justid'} - optional_bools = {k: Token.get_token(v) for k, v in - optional_bools.items()} - for param_value, param_name in optional_bools.items(): - if param_value: - if not isinstance(param_value, bool): - raise RedisError("XCLAIM {0} must be a boolean" - .format(param_name)) - pieces.append(param_name.upper()) + if idle is not None: + if not isinstance(idle, (int, long)): + raise RedisError("XCLAIM idle must be an integer") + pieces.extend((Token.get_token('IDLE'), str(idle))) + if time is not None: + if not isinstance(time, (int, long)): + raise RedisError("XCLAIM time must be an integer") + pieces.extend((Token.get_token('TIME'), str(time))) + if retrycount is not None: + if not isinstance(retrycount, (int, long)): + raise RedisError("XCLAIM retrycount must be an integer") + pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) + + if force: + if not isinstance(force, bool): + raise RedisError("XCLAIM force must be a boolean") + pieces.append(Token.get_token('FORCE')) + if justid: + if not isinstance(justid, bool): + raise RedisError("XCLAIM justid must be a boolean") + pieces.append(Token.get_token('JUSTID')) + return self.execute_command('XCLAIM', *pieces) # SORTED SET COMMANDS -- cgit v1.2.1 From e76b8e27fa7eabf1b547724d404fb9e4d621ed7e Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 12:01:54 +0200 Subject: fix XCLAIM to handle justid parameter correctly --- redis/client.py | 4 ++-- tests/test_commands.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 762b3a5..dddaaf5 100755 --- a/redis/client.py +++ b/redis/client.py @@ -261,7 +261,7 @@ def parse_list_of_recursive_dicts(response): def parse_xclaim(response): - if isinstance(response, str): + if all(isinstance(r, (basestring, bytes)) for r in response): return response return stream_list(response) @@ -2034,7 +2034,7 @@ class StrictRedis(object): if not isinstance(justid, bool): raise RedisError("XCLAIM justid must be a boolean") pieces.append(Token.get_token('JUSTID')) - + print(pieces) return self.execute_command('XCLAIM', *pieces) # SORTED SET COMMANDS diff --git a/tests/test_commands.py b/tests/test_commands.py index a79d6a3..bacafd0 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1767,6 +1767,9 @@ class TestStrictCommands(object): b('name')] == b('action_movie_consumer') assert sr.xclaim(stream_name, group_name, 'reeves_fan', min_idle_time=0, message_ids=(stamp,))[0][0] == stamp + assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', + min_idle_time=0, message_ids=(stamp,), + justid=True) == [b(stamp), ] def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) -- cgit v1.2.1 From b7cd888ea1e93d06ea58794d18ac509474360b06 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Mon, 29 Oct 2018 15:27:13 +0200 Subject: XPENDING parse response and unit test --- redis/client.py | 56 ++++++++++++++++++++++++++++++++++++++++++++------ tests/test_commands.py | 21 +++++++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/redis/client.py b/redis/client.py index dddaaf5..7100795 100755 --- a/redis/client.py +++ b/redis/client.py @@ -272,6 +272,36 @@ def parse_xreadgroup(response): return [[nativestr(r[0]), stream_list(r[1])] for r in response] +def parse_xpending(response, **options): + if isinstance(response, list): + if options.get('parse_detail', False): + return parse_range_xpending(response) + consumers = [] + for consumer_name, consumer_pending in response[3]: + consumers.append({ + 'name': consumer_name, + 'pending': consumer_pending + }) + return { + 'pending': response[0], + 'lower': response[1], + 'upper': response[2], + 'consumers': consumers + } + + +def parse_range_xpending(response): + result = [] + for message in response: + result.append({ + 'message_id': message[0], + 'consumer': message[1], + 'time_since_delivered': message[2], + 'times_delivered': message[3] + }) + return result + + def float_or_none(response): if response is None: return None @@ -410,6 +440,7 @@ class StrictRedis(object): int ), string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XPENDING', parse_xpending), string_keys_to_dict('XREAD XREADGROUP', parse_xreadgroup), { 'XGROUP CREATE': bool_ok, @@ -1954,12 +1985,26 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) - def xpending(self, name, groupname, start=None, end=None, count=None, - consumername=None): + def xpending(self, name, groupname): """ - Returns information about pending messages. + Returns information about pending messages of a group. name: name of the stream. groupname: name of the consumer group. + """ + return self.execute_command('XPENDING', name, groupname) + + def xpending_range(self, name, groupname, start='-', end='+', count=-1, + consumername=None): + """ + Returns information about pending messages, in a range. + name: name of the stream. + groupname: name of the consumer group. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. consumername: name of a consumer to filter by (optional). """ pieces = [name, groupname] @@ -1967,8 +2012,8 @@ class StrictRedis(object): if start is None or end is None or count is None: raise RedisError("XPENDING must be provided with start, end " "and count parameters, or none of them. ") - if not isinstance(count, (int, long)) or count < 1: - raise RedisError("XPENDING count must be a positive integer") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") pieces.extend((start, end, str(count))) if consumername is not None: if start is None or end is None or count is None: @@ -2034,7 +2079,6 @@ class StrictRedis(object): if not isinstance(justid, bool): raise RedisError("XCLAIM justid must be a boolean") pieces.append(Token.get_token('JUSTID')) - print(pieces) return self.execute_command('XCLAIM', *pieces) # SORTED SET COMMANDS diff --git a/tests/test_commands.py b/tests/test_commands.py index bacafd0..4909410 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1771,6 +1771,27 @@ class TestStrictCommands(object): min_idle_time=0, message_ids=(stamp,), justid=True) == [b(stamp), ] + @skip_if_server_version_lt('5.0.0') + def test_strict_xpending(self, sr): + stream_name = 'xpending_test_stream' + group_name = 'xpending_test_consumer_group' + consumer_name = 'marie' + sr.delete(stream_name) + + sr.xadd(stream_name, {"foo": "bar"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, consumer_name, + streams={stream_name: 0}) + response = sr.xpending(stream_name, group_name) + assert sorted(response.keys()) == ['consumers', 'lower', 'pending', + 'upper'] + + response = sr.xpending_range(stream_name, group_name, + consumername=consumer_name) + assert sorted(response[0].keys()) == ['consumer', 'message_id', + 'time_since_delivered', + 'times_delivered'] + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From a32a8e630c25a2a2e8b637ac7af80ba7df048f23 Mon Sep 17 00:00:00 2001 From: Roey Prat Date: Wed, 31 Oct 2018 09:40:40 +0200 Subject: XREAD and XREADGROUP return empty lists when the server returns no messages --- redis/client.py | 6 +++--- tests/test_commands.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/redis/client.py b/redis/client.py index 7100795..6108087 100755 --- a/redis/client.py +++ b/redis/client.py @@ -266,9 +266,9 @@ def parse_xclaim(response): return stream_list(response) -def parse_xreadgroup(response): +def parse_xread(response): if response is None: - return None + return [] return [[nativestr(r[0]), stream_list(r[1])] for r in response] @@ -441,7 +441,7 @@ class StrictRedis(object): ), string_keys_to_dict('XREVRANGE XRANGE', stream_list), string_keys_to_dict('XPENDING', parse_xpending), - string_keys_to_dict('XREAD XREADGROUP', parse_xreadgroup), + string_keys_to_dict('XREAD XREADGROUP', parse_xread), { 'XGROUP CREATE': bool_ok, 'XGROUP DESTROY': int, diff --git a/tests/test_commands.py b/tests/test_commands.py index 4909410..9635732 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1660,7 +1660,7 @@ class TestStrictCommands(object): assert stamp1 != stamp2 results = sr.xread(streams={varname: '$'}, count=10, block=10) - assert results is None + assert results == [] results = sr.xread(count=3, block=0, streams={varname: stamp1}) assert results[0][1][0][0] == stamp2 -- cgit v1.2.1 From 7b618768190432c3a7ed15206c6e9cbb08f3e18f Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 08:17:50 -0700 Subject: alphabetize the callback list --- redis/client.py | 78 ++++++++++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/redis/client.py b/redis/client.py index 6108087..af38105 100755 --- a/redis/client.py +++ b/redis/client.py @@ -436,26 +436,7 @@ class StrictRedis(object): 'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE ' 'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD ' 'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ' - 'GEOADD XLEN', - int - ), - string_keys_to_dict('XREVRANGE XRANGE', stream_list), - string_keys_to_dict('XPENDING', parse_xpending), - string_keys_to_dict('XREAD XREADGROUP', parse_xread), - { - 'XGROUP CREATE': bool_ok, - 'XGROUP DESTROY': int, - 'XGROUP SETID': bool_ok, - 'XGROUP DELCONSUMER': int - }, - { - 'XINFO STREAM': parse_recursive_dict, - 'XINFO CONSUMERS': parse_list_of_recursive_dicts, - 'XINFO GROUPS': parse_list_of_recursive_dicts - }, - string_keys_to_dict('XCLAIM', parse_xclaim), - string_keys_to_dict( - 'XACK XDEL XTRIM', + 'GEOADD XACK XDEL XLEN XTRIM', int ), string_keys_to_dict( @@ -484,22 +465,47 @@ class StrictRedis(object): zset_score_pairs ), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), + string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREAD XREADGROUP', parse_xread), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { 'CLIENT GETNAME': lambda r: r and nativestr(r), 'CLIENT KILL': bool_ok, 'CLIENT LIST': parse_client_list, 'CLIENT SETNAME': bool_ok, + 'CLUSTER ADDSLOTS': bool_ok, + 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x), + 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x), + 'CLUSTER DELSLOTS': bool_ok, + 'CLUSTER FAILOVER': bool_ok, + 'CLUSTER FORGET': bool_ok, + 'CLUSTER INFO': parse_cluster_info, + 'CLUSTER KEYSLOT': lambda x: int(x), + 'CLUSTER MEET': bool_ok, + 'CLUSTER NODES': parse_cluster_nodes, + 'CLUSTER REPLICATE': bool_ok, + 'CLUSTER RESET': bool_ok, + 'CLUSTER SAVECONFIG': bool_ok, + 'CLUSTER SET-CONFIG-EPOCH': bool_ok, + 'CLUSTER SETSLOT': bool_ok, + 'CLUSTER SLAVES': parse_cluster_nodes, 'CONFIG GET': parse_config_get, 'CONFIG RESETSTAT': bool_ok, 'CONFIG SET': bool_ok, 'DEBUG OBJECT': parse_debug_object, + 'GEOHASH': lambda r: list(map(nativestr, r)), + 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), + float(ll[1])) + if ll is not None else None, r)), + 'GEORADIUS': parse_georadius_generic, + 'GEORADIUSBYMEMBER': parse_georadius_generic, 'HGETALL': lambda r: r and pairs_to_dict(r) or {}, 'HSCAN': parse_hscan, 'INFO': parse_info, 'LASTSAVE': timestamp_to_datetime, 'OBJECT': parse_object, 'PING': lambda r: nativestr(r) == 'PONG', + 'PUBSUB NUMSUB': parse_pubsub_numsub, 'RANDOMKEY': lambda r: r and r or None, 'SCAN': parse_scan, 'SCRIPT EXISTS': lambda r: list(imap(bool, r)), @@ -520,30 +526,16 @@ class StrictRedis(object): 'SLOWLOG RESET': bool_ok, 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), + 'XCLAIM': parse_xclaim, + 'XGROUP CREATE': bool_ok, + 'XGROUP DELCONSUMER': int, + 'XGROUP DESTROY': int, + 'XGROUP SETID': bool_ok, + 'XINFO CONSUMERS': parse_list_of_recursive_dicts, + 'XINFO GROUPS': parse_list_of_recursive_dicts, + 'XINFO STREAM': parse_recursive_dict, + 'XPENDING': parse_xpending, 'ZSCAN': parse_zscan, - 'CLUSTER ADDSLOTS': bool_ok, - 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x), - 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x), - 'CLUSTER DELSLOTS': bool_ok, - 'CLUSTER FAILOVER': bool_ok, - 'CLUSTER FORGET': bool_ok, - 'CLUSTER INFO': parse_cluster_info, - 'CLUSTER KEYSLOT': lambda x: int(x), - 'CLUSTER MEET': bool_ok, - 'CLUSTER NODES': parse_cluster_nodes, - 'CLUSTER REPLICATE': bool_ok, - 'CLUSTER RESET': bool_ok, - 'CLUSTER SAVECONFIG': bool_ok, - 'CLUSTER SET-CONFIG-EPOCH': bool_ok, - 'CLUSTER SETSLOT': bool_ok, - 'CLUSTER SLAVES': parse_cluster_nodes, - 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), - float(ll[1])) - if ll is not None else None, r)), - 'GEOHASH': lambda r: list(map(nativestr, r)), - 'GEORADIUS': parse_georadius_generic, - 'GEORADIUSBYMEMBER': parse_georadius_generic, - 'PUBSUB NUMSUB': parse_pubsub_numsub, } ) -- cgit v1.2.1 From 7295ee0f29253ed19be01a69e6d1efc225242a1e Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 08:18:48 -0700 Subject: rename _name -> name --- redis/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index af38105..291a428 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1757,7 +1757,7 @@ class StrictRedis(object): return self.execute_command('SUNIONSTORE', dest, *args) # STREAMS COMMANDS - def xadd(self, _name, fields, id='*', maxlen=None, approximate=True): + def xadd(self, name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. _name: name of the stream (not using 'name' as this would @@ -1781,7 +1781,7 @@ class StrictRedis(object): raise RedisError('XADD fields must be a non-empty dict') for pair in iteritems(fields): pieces.extend(pair) - return self.execute_command('XADD', _name, *pieces) + return self.execute_command('XADD', name, *pieces) def xrange(self, name, start='-', finish='+', count=None): """ -- cgit v1.2.1 From 0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 08:24:52 -0700 Subject: alphabetize stream functions --- redis/client.py | 360 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 180 insertions(+), 180 deletions(-) diff --git a/redis/client.py b/redis/client.py index 291a428..c5e769d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1783,78 +1783,80 @@ class StrictRedis(object): pieces.extend(pair) return self.execute_command('XADD', name, *pieces) - def xrange(self, name, start='-', finish='+', count=None): + def xack(self, name, groupname, *ids): """ - Read stream values within an interval. + Acknowledges the successful processing of one or more messages. name: name of the stream. - start: first stream ID. defaults to '-', - meaning the earliest available. - finish: last stream ID. defaults to '+', - meaning the latest available. - count: if set, only return this many items, beginning with the - earliest available. + groupname: name of the consumer group. + *ids: message ids to acknowlege. """ - pieces = [start, finish] - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XRANGE count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) - - return self.execute_command('XRANGE', name, *pieces) + return self.execute_command('XACK', name, groupname, *ids) - def xrevrange(self, name, start='+', finish='-', count=None): + def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, + idle=None, time=None, retrycount=None, force=False, + justid=False): """ - Read stream values within an interval, in reverse order. - name: name of the stream - start: first stream ID. defaults to '+', - meaning the latest available. - finish: last stream ID. defaults to '-', - meaning the earliest available. - count: if set, only return this many items, beginning with the - latest available. + Changes the ownership of a pending message. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of a consumer that claims the message. + min_idle_time: filter messages that were idle less than this amount of + milliseconds + message_ids: non-empty list or tuple of message IDs to claim + idle: optional. Set the idle time (last time it was delivered) of the + message in ms + time: optional integer. This is the same as idle but instead of a + relative amount of milliseconds, it sets the idle time to a specific + Unix time (in milliseconds). + retrycount: optional integer. set the retry counter to the specified + value. This counter is incremented every time a message is delivered + again. + force: optional boolean, false by default. Creates the pending message + entry in the PEL even if certain specified IDs are not already in the + PEL assigned to a different client. + justid: optional boolean, false by default. Return just an array of IDs + of messages successfully claimed, without returning the actual message """ - pieces = [start, finish] - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XREVRANGE count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) + if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: + raise RedisError("XCLAIM min_idle_time must be a non negative " + "integer") + if not isinstance(message_ids, (list, tuple)) or not message_ids: + raise RedisError("XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim") - return self.execute_command('XREVRANGE', name, *pieces) + pieces = [name, groupname, consumername, str(min_idle_time)] + pieces.extend(list(message_ids)) - def xlen(self, name): - """ - Returns the number of elements in a given stream. - """ - return self.execute_command('XLEN', name) + if idle is not None: + if not isinstance(idle, (int, long)): + raise RedisError("XCLAIM idle must be an integer") + pieces.extend((Token.get_token('IDLE'), str(idle))) + if time is not None: + if not isinstance(time, (int, long)): + raise RedisError("XCLAIM time must be an integer") + pieces.extend((Token.get_token('TIME'), str(time))) + if retrycount is not None: + if not isinstance(retrycount, (int, long)): + raise RedisError("XCLAIM retrycount must be an integer") + pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) - def xread(self, streams, count=None, block=None): + if force: + if not isinstance(force, bool): + raise RedisError("XCLAIM force must be a boolean") + pieces.append(Token.get_token('FORCE')) + if justid: + if not isinstance(justid, bool): + raise RedisError("XCLAIM justid must be a boolean") + pieces.append(Token.get_token('JUSTID')) + return self.execute_command('XCLAIM', *pieces) + + def xdel(self, name, *ids): """ - Block and monitor multiple streams for new data. - streams: a dict of stream names to stream IDs, where - IDs indicate the last ID already seen. - count: if set, only return this many items, beginning with the - earliest available. - block: number of milliseconds to wait, if nothing already present. + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. """ - pieces = [] - if block is not None: - if not isinstance(block, (int, long)) or block < 0: - raise RedisError('XREAD block must be a non-negative integer') - pieces.append(Token.get_token('BLOCK')) - pieces.append(str(block)) - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError('XREAD count must be a positive integer') - pieces.append(Token.get_token('COUNT')) - pieces.append(str(count)) - if not isinstance(streams, dict) or len(streams) == 0: - raise RedisError('XREAD streams must be a non empty dict') - pieces.append(Token.get_token('STREAMS')) - pieces.extend(streams.keys()) - pieces.extend(streams.values()) - return self.execute_command('XREAD', *pieces) + return self.execute_command('XDEL', name, *ids) def xgroup_create(self, name, groupname, id): """ @@ -1865,6 +1867,18 @@ class StrictRedis(object): """ return self.execute_command('XGROUP CREATE', name, groupname, id) + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it + was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + def xgroup_destroy(self, name, groupname): """ Destroy a consumer group. @@ -1882,17 +1896,20 @@ class StrictRedis(object): """ return self.execute_command('XGROUP SETID', name, groupname, id) - def xgroup_delconsumer(self, name, groupname, consumername): + def xinfo_consumers(self, name, groupname): """ - Remove a specific consumer from a consumer group. - Returns the number of pending messages that the consumer had before it - was deleted. + Returns general information about the consumers in the group. name: name of the stream. groupname: name of the consumer group. - consumername: name of consumer to delete """ - return self.execute_command('XGROUP DELCONSUMER', name, groupname, - consumername) + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) def xinfo_stream(self, name): """ @@ -1901,37 +1918,116 @@ class StrictRedis(object): """ return self.execute_command('XINFO STREAM', name) - def xinfo_consumers(self, name, groupname): + def xlen(self, name): """ - Returns general information about the consumers in the group. - name: name of the stream. - groupname: name of the consumer group. + Returns the number of elements in a given stream. """ - return self.execute_command('XINFO CONSUMERS', name, groupname) + return self.execute_command('XLEN', name) - def xinfo_groups(self, name): + def xpending(self, name, groupname): """ - Returns general information about the consumer groups of the stream. + Returns information about pending messages of a group. name: name of the stream. + groupname: name of the consumer group. """ - return self.execute_command('XINFO GROUPS', name) + return self.execute_command('XPENDING', name, groupname) - def xack(self, name, groupname, *ids): + def xpending_range(self, name, groupname, start='-', end='+', count=-1, + consumername=None): """ - Acknowledges the successful processing of one or more messages. + Returns information about pending messages, in a range. name: name of the stream. groupname: name of the consumer group. - *ids: message ids to acknowlege. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. + consumername: name of a consumer to filter by (optional). """ - return self.execute_command('XACK', name, groupname, *ids) + pieces = [name, groupname] + if start is not None or end is not None or count is not None: + if start is None or end is None or count is None: + raise RedisError("XPENDING must be provided with start, end " + "and count parameters, or none of them. ") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") + pieces.extend((start, end, str(count))) + if consumername is not None: + if start is None or end is None or count is None: + raise RedisError("if XPENDING is provided with consumername," + " it must be provided with start, end and" + " count parameters") + pieces.append(consumername) + return self.execute_command('XPENDING', *pieces, parse_detail=True) - def xdel(self, name, *ids): + def xrange(self, name, start='-', finish='+', count=None): """ - Deletes one or more messages from a stream. + Read stream values within an interval. name: name of the stream. - *ids: message ids to delete. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. """ - return self.execute_command('XDEL', name, *ids) + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XRANGE', name, *pieces) + + def xread(self, streams, count=None, block=None): + """ + Block and monitor multiple streams for new data. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [] + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError('XREAD block must be a non-negative integer') + pieces.append(Token.get_token('BLOCK')) + pieces.append(str(block)) + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREAD count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREAD streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREAD', *pieces) + + def xrevrange(self, name, start='+', finish='-', count=None): + """ + Read stream values within an interval, in reverse order. + name: name of the stream + start: first stream ID. defaults to '+', + meaning the latest available. + finish: last stream ID. defaults to '-', + meaning the earliest available. + count: if set, only return this many items, beginning with the + latest available. + """ + pieces = [start, finish] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError('XREVRANGE count must be a positive integer') + pieces.append(Token.get_token('COUNT')) + pieces.append(str(count)) + + return self.execute_command('XREVRANGE', name, *pieces) def xtrim(self, name, maxlen, approximate=True): """ @@ -1977,102 +2073,6 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) - def xpending(self, name, groupname): - """ - Returns information about pending messages of a group. - name: name of the stream. - groupname: name of the consumer group. - """ - return self.execute_command('XPENDING', name, groupname) - - def xpending_range(self, name, groupname, start='-', end='+', count=-1, - consumername=None): - """ - Returns information about pending messages, in a range. - name: name of the stream. - groupname: name of the consumer group. - start: first stream ID. defaults to '-', - meaning the earliest available. - finish: last stream ID. defaults to '+', - meaning the latest available. - count: if set, only return this many items, beginning with the - earliest available. - consumername: name of a consumer to filter by (optional). - """ - pieces = [name, groupname] - if start is not None or end is not None or count is not None: - if start is None or end is None or count is None: - raise RedisError("XPENDING must be provided with start, end " - "and count parameters, or none of them. ") - if not isinstance(count, (int, long)) or count < -1: - raise RedisError("XPENDING count must be a integer >= -1") - pieces.extend((start, end, str(count))) - if consumername is not None: - if start is None or end is None or count is None: - raise RedisError("if XPENDING is provided with consumername," - " it must be provided with start, end and" - " count parameters") - pieces.append(consumername) - return self.execute_command('XPENDING', *pieces, parse_detail=True) - - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, - idle=None, time=None, retrycount=None, force=False, - justid=False): - """ - Changes the ownership of a pending message. - name: name of the stream. - groupname: name of the consumer group. - consumername: name of a consumer that claims the message. - min_idle_time: filter messages that were idle less than this amount of - milliseconds - message_ids: non-empty list or tuple of message IDs to claim - idle: optional. Set the idle time (last time it was delivered) of the - message in ms - time: optional integer. This is the same as idle but instead of a - relative amount of milliseconds, it sets the idle time to a specific - Unix time (in milliseconds). - retrycount: optional integer. set the retry counter to the specified - value. This counter is incremented every time a message is delivered - again. - force: optional boolean, false by default. Creates the pending message - entry in the PEL even if certain specified IDs are not already in the - PEL assigned to a different client. - justid: optional boolean, false by default. Return just an array of IDs - of messages successfully claimed, without returning the actual message - """ - if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0: - raise RedisError("XCLAIM min_idle_time must be a non negative " - "integer") - if not isinstance(message_ids, (list, tuple)) or not message_ids: - raise RedisError("XCLAIM message_ids must be a non empty list or " - "tuple of message IDs to claim") - - pieces = [name, groupname, consumername, str(min_idle_time)] - pieces.extend(list(message_ids)) - - if idle is not None: - if not isinstance(idle, (int, long)): - raise RedisError("XCLAIM idle must be an integer") - pieces.extend((Token.get_token('IDLE'), str(idle))) - if time is not None: - if not isinstance(time, (int, long)): - raise RedisError("XCLAIM time must be an integer") - pieces.extend((Token.get_token('TIME'), str(time))) - if retrycount is not None: - if not isinstance(retrycount, (int, long)): - raise RedisError("XCLAIM retrycount must be an integer") - pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount))) - - if force: - if not isinstance(force, bool): - raise RedisError("XCLAIM force must be a boolean") - pieces.append(Token.get_token('FORCE')) - if justid: - if not isinstance(justid, bool): - raise RedisError("XCLAIM justid must be a boolean") - pieces.append(Token.get_token('JUSTID')) - return self.execute_command('XCLAIM', *pieces) - # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ -- cgit v1.2.1 From 618deba4daed183e9e9734b18bbac44523b68fd6 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 08:32:17 -0700 Subject: reorganize stream tests --- redis/client.py | 8 +- tests/test_commands.py | 217 +++++++++++++++++++++++-------------------------- 2 files changed, 107 insertions(+), 118 deletions(-) diff --git a/redis/client.py b/redis/client.py index c5e769d..1ef2ace 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,7 +232,7 @@ def int_or_none(response): return int(response) -def stream_list(response): +def parse_stream_list(response): if response is None: return None return [(r[0], pairs_to_dict(r[1])) for r in response] @@ -263,13 +263,13 @@ def parse_list_of_recursive_dicts(response): def parse_xclaim(response): if all(isinstance(r, (basestring, bytes)) for r in response): return response - return stream_list(response) + return parse_stream_list(response) def parse_xread(response): if response is None: return [] - return [[nativestr(r[0]), stream_list(r[1])] for r in response] + return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response] def parse_xpending(response, **options): @@ -465,7 +465,7 @@ class StrictRedis(object): zset_score_pairs ), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), - string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREVRANGE XRANGE', parse_stream_list), string_keys_to_dict('XREAD XREADGROUP', parse_xread), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { diff --git a/tests/test_commands.py b/tests/test_commands.py index 9635732..824fc44 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1600,11 +1600,109 @@ class TestRedisCommands(object): ['place1', 0.0, 3471609698139488, (2.1909382939338684, 41.433790281840835)]] + @skip_if_server_version_lt('5.0.0') + def test_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' -class TestStrictCommands(object): + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_xclaim(self, sr): + stream_name = 'xclaim_test_stream' + group_name = 'xclaim_test_consumer_group' + sr.delete(stream_name) + + stamp = sr.xadd(stream_name, {"john": "wick"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, 'action_movie_consumer', + streams={stream_name: 0}) + assert sr.xinfo_consumers(stream_name, group_name)[0][ + b('name')] == b('action_movie_consumer') + assert sr.xclaim(stream_name, group_name, 'reeves_fan', + min_idle_time=0, message_ids=(stamp,))[0][0] == stamp + assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', + min_idle_time=0, message_ids=(stamp,), + justid=True) == [b(stamp), ] + + @skip_if_server_version_lt('5.0.0') + def test_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, {"foo": "bar"}, id=1) + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, {"baz": "qaz"}) + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_xgroup(self, sr): + stream_name = 'xgroup_test_stream' + sr.delete(stream_name) + group_name = 'xgroup_test_group' + message = {'name': 'boaty', 'other': 'mcboatface'} + b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} + + stamp1 = sr.xadd(stream_name, message) + assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + + assert sr.xinfo_groups(name=stream_name) == [] + assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) + + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b(stamp1) + assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b('0-0') + + consumer_name = 'captain_jack_sparrow' + + expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] + assert sr.xreadgroup(groupname=group_name, + consumername=consumer_name, + streams={stream_name: '0'}) == expected_value + + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 + sr.xgroup_delconsumer(stream_name, group_name, consumer_name) + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 + + assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 @skip_if_server_version_lt('5.0.0') - def test_strict_xrange(self, sr): + def test_xpending(self, sr): + stream_name = 'xpending_test_stream' + group_name = 'xpending_test_consumer_group' + consumer_name = 'marie' + sr.delete(stream_name) + + sr.xadd(stream_name, {"foo": "bar"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, consumer_name, + streams={stream_name: 0}) + response = sr.xpending(stream_name, group_name) + assert sorted(response.keys()) == ['consumers', 'lower', 'pending', + 'upper'] + + response = sr.xpending_range(stream_name, group_name, + consumername=consumer_name) + assert sorted(response[0].keys()) == ['consumer', 'message_id', + 'time_since_delivered', + 'times_delivered'] + + @skip_if_server_version_lt('5.0.0') + def test_xrange(self, sr): varname = 'xrange_test' sr.delete(varname) assert sr.xlen(varname) == 0 @@ -1652,7 +1750,7 @@ class TestStrictCommands(object): assert sr.xlen(varname) == 4 @skip_if_server_version_lt('5.0.0') - def test_strict_xread(self, sr): + def test_xread(self, sr): varname = 'xread_test' sr.delete(varname) stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) @@ -1666,69 +1764,7 @@ class TestStrictCommands(object): assert results[0][1][0][0] == stamp2 @skip_if_server_version_lt('5.0.0') - def test_strict_xgroup(self, sr): - stream_name = 'xgroup_test_stream' - sr.delete(stream_name) - group_name = 'xgroup_test_group' - message = {'name': 'boaty', 'other': 'mcboatface'} - b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} - - stamp1 = sr.xadd(stream_name, message) - assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] - - assert sr.xinfo_groups(name=stream_name) == [] - assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') - assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) - - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b(stamp1) - assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b('0-0') - - consumer_name = 'captain_jack_sparrow' - - expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] - assert sr.xreadgroup(groupname=group_name, - consumername=consumer_name, - streams={stream_name: '0'}) == expected_value - - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 - sr.xgroup_delconsumer(stream_name, group_name, consumer_name) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 - - assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' - - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xdel(self, sr): - stream_name = 'xdel_test_stream' - sr.delete(stream_name) - - assert sr.xdel(stream_name, 1) == 0 - - sr.xadd(stream_name, {"foo": "bar"}, id=1) - assert sr.xdel(stream_name, 1) == 1 - - stamp = sr.xadd(stream_name, {"baz": "qaz"}) - assert sr.xdel(stream_name, 1, stamp) == 1 - assert sr.xdel(stream_name, 1, stamp, 42) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xtrim(self, sr): + def test_xtrim(self, sr): stream_name = 'xtrim_test_stream' sr.delete(stream_name) @@ -1743,55 +1779,8 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 234) == 0 assert sr.xtrim(stream_name, 234, approximate=False) == 66 - @skip_if_server_version_lt('5.0.0') - def test_strict_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' - - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xclaim(self, sr): - stream_name = 'xclaim_test_stream' - group_name = 'xclaim_test_consumer_group' - sr.delete(stream_name) - - stamp = sr.xadd(stream_name, {"john": "wick"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, 'action_movie_consumer', - streams={stream_name: 0}) - assert sr.xinfo_consumers(stream_name, group_name)[0][ - b('name')] == b('action_movie_consumer') - assert sr.xclaim(stream_name, group_name, 'reeves_fan', - min_idle_time=0, message_ids=(stamp,))[0][0] == stamp - assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', - min_idle_time=0, message_ids=(stamp,), - justid=True) == [b(stamp), ] - - @skip_if_server_version_lt('5.0.0') - def test_strict_xpending(self, sr): - stream_name = 'xpending_test_stream' - group_name = 'xpending_test_consumer_group' - consumer_name = 'marie' - sr.delete(stream_name) - - sr.xadd(stream_name, {"foo": "bar"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, consumer_name, - streams={stream_name: 0}) - response = sr.xpending(stream_name, group_name) - assert sorted(response.keys()) == ['consumers', 'lower', 'pending', - 'upper'] - - response = sr.xpending_range(stream_name, group_name, - consumername=consumer_name) - assert sorted(response[0].keys()) == ['consumer', 'message_id', - 'time_since_delivered', - 'times_delivered'] +class TestStrictCommands(object): def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ -- cgit v1.2.1 From a3cfa77ab01e56d8af6d40c2d9e31f56017ef3eb Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 13:13:41 -0700 Subject: fix comment --- redis/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 1ef2ace..a080a5c 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1760,8 +1760,7 @@ class StrictRedis(object): def xadd(self, name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. - _name: name of the stream (not using 'name' as this would - prevent 'name' used in the kwargs + name: name of the stream fields: dict of field/value pairs to insert into the stream id: Location to insert this record. By default it is appended. maxlen: truncate old stream members beyond this size -- cgit v1.2.1 From 5f7b956fb1d389e23cdfc4e1e1809fa6662ac368 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 15:17:09 -0700 Subject: alphabetize part 2 --- redis/client.py | 62 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/redis/client.py b/redis/client.py index a080a5c..1e06f18 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2008,6 +2008,37 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREAD', *pieces) + def xreadgroup(self, groupname, consumername, streams, count=None, + block=None): + """ + Read from a stream via a consumer group. + groupname: name of the consumer group. + consumername: name of the requesting consumer. + streams: a dict of stream names to stream IDs, where + IDs indicate the last ID already seen. + count: if set, only return this many items, beginning with the + earliest available. + block: number of milliseconds to wait, if nothing already present. + """ + pieces = [Token.get_token('GROUP'), groupname, consumername] + if count is not None: + if not isinstance(count, (int, long)) or count < 1: + raise RedisError("XREADGROUP count must be a positive integer") + pieces.append(Token.get_token("COUNT")) + pieces.append(str(count)) + if block is not None: + if not isinstance(block, (int, long)) or block < 0: + raise RedisError("XREADGROUP block must be a non-negative " + "integer") + pieces.append(Token.get_token("BLOCK")) + pieces.append(str(block)) + if not isinstance(streams, dict) or len(streams) == 0: + raise RedisError('XREADGROUP streams must be a non empty dict') + pieces.append(Token.get_token('STREAMS')) + pieces.extend(streams.keys()) + pieces.extend(streams.values()) + return self.execute_command('XREADGROUP', *pieces) + def xrevrange(self, name, start='+', finish='-', count=None): """ Read stream values within an interval, in reverse order. @@ -2041,37 +2072,6 @@ class StrictRedis(object): pieces.append(maxlen) return self.execute_command('XTRIM', name, *pieces) - def xreadgroup(self, groupname, consumername, streams, count=None, - block=None): - """ - Read from a stream via a consumer group. - groupname: name of the consumer group. - consumername: name of the requesting consumer. - streams: a dict of stream names to stream IDs, where - IDs indicate the last ID already seen. - count: if set, only return this many items, beginning with the - earliest available. - block: number of milliseconds to wait, if nothing already present. - """ - pieces = [Token.get_token('GROUP'), groupname, consumername] - if count is not None: - if not isinstance(count, (int, long)) or count < 1: - raise RedisError("XREADGROUP count must be a positive integer") - pieces.append(Token.get_token("COUNT")) - pieces.append(str(count)) - if block is not None: - if not isinstance(block, (int, long)) or block < 0: - raise RedisError("XREADGROUP block must be a non-negative " - "integer") - pieces.append(Token.get_token("BLOCK")) - pieces.append(str(block)) - if not isinstance(streams, dict) or len(streams) == 0: - raise RedisError('XREADGROUP streams must be a non empty dict') - pieces.append(Token.get_token('STREAMS')) - pieces.extend(streams.keys()) - pieces.extend(streams.values()) - return self.execute_command('XREADGROUP', *pieces) - # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ -- cgit v1.2.1 From ad67d89d0fe34b59b798be68c787277f3ab781fc Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 22:32:14 -0700 Subject: refactor a bunch of the tests. - split out tests for each client function - alphabetize - make sure response callbacks return system info dicts with native string keys rather than byte strings. - make sure empty versions of commands that typically return a list return an empty list when streams or messages don't exist --- redis/client.py | 105 +++++----- tests/test_commands.py | 516 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 427 insertions(+), 194 deletions(-) diff --git a/redis/client.py b/redis/client.py index 1e06f18..f8d13b7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -182,10 +182,15 @@ def parse_sentinel_get_master(response): return response and (response[0], int(response[1])) or None -def pairs_to_dict(response): +def pairs_to_dict(response, decode_keys=False): "Create a dict given a list of key/value pairs" - it = iter(response) - return dict(izip(it, it)) + if decode_keys: + # the iter form is faster, but I don't know how to make that work + # with a nativestr() map + return dict(izip(imap(nativestr, response[::2]), response[1::2])) + else: + it = iter(response) + return dict(izip(it, it)) def pairs_to_dict_typed(response, type_info): @@ -238,26 +243,12 @@ def parse_stream_list(response): return [(r[0], pairs_to_dict(r[1])) for r in response] -def parse_recursive_dict(response): - if response is None: - return None - result = {} - while response: - k = response.pop(0) - v = response.pop(0) - if isinstance(v, list): - v = parse_recursive_dict(v) - result[k] = v - return result +def pairs_to_dict_with_nativestr_keys(response): + return pairs_to_dict(response, decode_keys=True) -def parse_list_of_recursive_dicts(response): - if response is None: - return None - result = [] - for group in response: - result.append(parse_recursive_dict(group)) - return result +def parse_list_of_dicts(response): + return list(imap(pairs_to_dict_with_nativestr_keys, response)) def parse_xclaim(response): @@ -266,6 +257,15 @@ def parse_xclaim(response): return parse_stream_list(response) +def parse_xinfo_stream(response): + data = pairs_to_dict(response, decode_keys=True) + first = data['first-entry'] + data['first-entry'] = (first[0], pairs_to_dict(first[1])) + last = data['last-entry'] + data['last-entry'] = (last[0], pairs_to_dict(last[1])) + return data + + def parse_xread(response): if response is None: return [] @@ -273,33 +273,20 @@ def parse_xread(response): def parse_xpending(response, **options): - if isinstance(response, list): - if options.get('parse_detail', False): - return parse_range_xpending(response) - consumers = [] - for consumer_name, consumer_pending in response[3]: - consumers.append({ - 'name': consumer_name, - 'pending': consumer_pending - }) - return { - 'pending': response[0], - 'lower': response[1], - 'upper': response[2], - 'consumers': consumers - } + if options.get('parse_detail', False): + return parse_xpending_range(response) + consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []] + return { + 'pending': response[0], + 'min': response[1], + 'max': response[2], + 'consumers': consumers + } -def parse_range_xpending(response): - result = [] - for message in response: - result.append({ - 'message_id': message[0], - 'consumer': message[1], - 'time_since_delivered': message[2], - 'times_delivered': message[3] - }) - return result +def parse_xpending_range(response): + k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered') + return [dict(izip(k, r)) for r in response] def float_or_none(response): @@ -529,11 +516,11 @@ class StrictRedis(object): 'XCLAIM': parse_xclaim, 'XGROUP CREATE': bool_ok, 'XGROUP DELCONSUMER': int, - 'XGROUP DESTROY': int, + 'XGROUP DESTROY': bool, 'XGROUP SETID': bool_ok, - 'XINFO CONSUMERS': parse_list_of_recursive_dicts, - 'XINFO GROUPS': parse_list_of_recursive_dicts, - 'XINFO STREAM': parse_recursive_dict, + 'XINFO CONSUMERS': parse_list_of_dicts, + 'XINFO GROUPS': parse_list_of_dicts, + 'XINFO STREAM': parse_xinfo_stream, 'XPENDING': parse_xpending, 'ZSCAN': parse_zscan, } @@ -1757,6 +1744,15 @@ class StrictRedis(object): return self.execute_command('SUNIONSTORE', dest, *args) # STREAMS COMMANDS + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + def xadd(self, name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. @@ -1782,15 +1778,6 @@ class StrictRedis(object): pieces.extend(pair) return self.execute_command('XADD', name, *pieces) - def xack(self, name, groupname, *ids): - """ - Acknowledges the successful processing of one or more messages. - name: name of the stream. - groupname: name of the consumer group. - *ids: message ids to acknowlege. - """ - return self.execute_command('XACK', name, groupname, *ids) - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, justid=False): diff --git a/tests/test_commands.py b/tests/test_commands.py index 824fc44..4e11b55 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2,6 +2,7 @@ from __future__ import with_statement import binascii import datetime import pytest +import re import redis import time @@ -34,6 +35,13 @@ def redis_server_time(client): return datetime.datetime.fromtimestamp(timestamp) +def get_stream_message(client, stream, message_id): + "Fetch a stream message and format it as a (message_id, fields) pair" + response = client.xrange(stream, start=message_id, finish=message_id) + assert len(response) == 1 + return response[0] + + # RESPONSE CALLBACKS class TestResponseCallbacks(object): "Tests for the response callback system" @@ -1601,183 +1609,421 @@ class TestRedisCommands(object): (2.1909382939338684, 41.433790281840835)]] @skip_if_server_version_lt('5.0.0') - def test_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' + def test_xack(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + # xack on a stream that doesn't exist + assert r.xack(stream, group, '0-0') == 0 + + m1 = r.xadd(stream, {'one': 'one'}) + m2 = r.xadd(stream, {'two': 'two'}) + m3 = r.xadd(stream, {'three': 'three'}) + + # xack on a group that doesn't exist + assert r.xack(stream, group, m1) == 0 + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer, streams={stream: 0}) + # xack returns the number of ack'd elements + assert r.xack(stream, group, m1) == 1 + assert r.xack(stream, group, m2, m3) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xadd(self, r): + stream = 'stream' + message_id = r.xadd(stream, {'foo': 'bar'}) + assert re.match(rb'[0-9]+\-[0-9]+', message_id) - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + # explicit message id + message_id = b('9999999999999999999-0') + assert message_id == r.xadd(stream, {'foo': 'bar'}, id=message_id) + + # with maxlen, the list evicts the first message + r.xadd(stream, {'foo': 'bar'}, maxlen=2, approximate=False) + assert r.xlen(stream) == 2 @skip_if_server_version_lt('5.0.0') - def test_xclaim(self, sr): - stream_name = 'xclaim_test_stream' - group_name = 'xclaim_test_consumer_group' - sr.delete(stream_name) - - stamp = sr.xadd(stream_name, {"john": "wick"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, 'action_movie_consumer', - streams={stream_name: 0}) - assert sr.xinfo_consumers(stream_name, group_name)[0][ - b('name')] == b('action_movie_consumer') - assert sr.xclaim(stream_name, group_name, 'reeves_fan', - min_idle_time=0, message_ids=(stamp,))[0][0] == stamp - assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', - min_idle_time=0, message_ids=(stamp,), - justid=True) == [b(stamp), ] + def test_xclaim(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + + message_id = r.xadd(stream, {'john': 'wick'}) + message = get_stream_message(r, stream, message_id) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response == [] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: 0}) + + # claim the message as consumer2 + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response[0] == message + + # reclaim the message as consumer1, but use the justid argument + # which only returns message ids + assert r.xclaim(stream, group, consumer1, + min_idle_time=0, message_ids=(message_id,), + justid=True) == [message_id] @skip_if_server_version_lt('5.0.0') - def test_xdel(self, sr): - stream_name = 'xdel_test_stream' - sr.delete(stream_name) + def test_xdel(self, r): + stream = 'stream' - assert sr.xdel(stream_name, 1) == 0 + # deleting from an empty stream doesn't do anything + assert r.xdel(stream, 1) == 0 - sr.xadd(stream_name, {"foo": "bar"}, id=1) - assert sr.xdel(stream_name, 1) == 1 + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) - stamp = sr.xadd(stream_name, {"baz": "qaz"}) - assert sr.xdel(stream_name, 1, stamp) == 1 - assert sr.xdel(stream_name, 1, stamp, 42) == 0 + # xdel returns the number of deleted elements + assert r.xdel(stream, m1) == 1 + assert r.xdel(stream, m2, m3) == 2 @skip_if_server_version_lt('5.0.0') - def test_xgroup(self, sr): - stream_name = 'xgroup_test_stream' - sr.delete(stream_name) - group_name = 'xgroup_test_group' - message = {'name': 'boaty', 'other': 'mcboatface'} - b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} + def test_xgroup_create(self, r): + # tests xgroup_create and xinfo_groups + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) + + # no group is setup yet, no info to obtain + assert r.xinfo_groups(stream) == [] + + assert r.xgroup_create(stream, group, 0) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': b('0-0') + }] + assert r.xinfo_groups(stream) == expected - stamp1 = sr.xadd(stream_name, message) - assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + @skip_if_server_version_lt('5.0.0') + def test_xgroup_delconsumer(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) - assert sr.xinfo_groups(name=stream_name) == [] - assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') - assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) + # a consumer that hasn't yet read any messages doesn't do anything + assert r.xgroup_delconsumer(stream, group, consumer) == 0 - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b(stamp1) - assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b('0-0') + # read all messages from the group + r.xreadgroup(group, consumer, streams={stream: 0}) - consumer_name = 'captain_jack_sparrow' + # deleting the consumer should return 2 pending messages + assert r.xgroup_delconsumer(stream, group, consumer) == 2 - expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] - assert sr.xreadgroup(groupname=group_name, - consumername=consumer_name, - streams={stream_name: '0'}) == expected_value + @skip_if_server_version_lt('5.0.0') + def test_xgroup_destroy(self, r): + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 - sr.xgroup_delconsumer(stream_name, group_name, consumer_name) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 + # destroying a nonexistent group returns False + assert not r.xgroup_destroy(stream, group) - assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 + r.xgroup_create(stream, group, 0) + assert r.xgroup_destroy(stream, group) @skip_if_server_version_lt('5.0.0') - def test_xpending(self, sr): - stream_name = 'xpending_test_stream' - group_name = 'xpending_test_consumer_group' - consumer_name = 'marie' - sr.delete(stream_name) - - sr.xadd(stream_name, {"foo": "bar"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, consumer_name, - streams={stream_name: 0}) - response = sr.xpending(stream_name, group_name) - assert sorted(response.keys()) == ['consumers', 'lower', 'pending', - 'upper'] - - response = sr.xpending_range(stream_name, group_name, - consumername=consumer_name) - assert sorted(response[0].keys()) == ['consumer', 'message_id', - 'time_since_delivered', - 'times_delivered'] + def test_xgroup_setid(self, r): + stream = 'stream' + group = 'group' + message_id = r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + # advance the last_delivered_id to the message_id + r.xgroup_setid(stream, group, message_id) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': message_id + }] + assert r.xinfo_groups(stream) == expected @skip_if_server_version_lt('5.0.0') - def test_xrange(self, sr): - varname = 'xrange_test' - sr.delete(varname) - assert sr.xlen(varname) == 0 - stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) - assert sr.xlen(varname) == 1 - stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) - assert sr.xlen(varname) == 2 - assert stamp1 != stamp2 - - milli, offset = stamp2.decode('utf-8').split('-') - new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8') - stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id) - assert sr.xlen(varname) == 3 - assert stamp3 == new_id - stamp4 = sr.xadd(varname, {"foo": "baz"}) - assert sr.xlen(varname) == 4 + def test_xinfo_consumers(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer1, streams={stream: 0}) + r.xreadgroup(group, consumer2, streams={stream: 0}) + info = r.xinfo_consumers(stream, group) + assert len(info) == 2 + expected = [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 0}, + ] + + # we can't determine the idle time, so just make sure it's an int + assert isinstance(info[0].pop('idle'), int) + assert isinstance(info[1].pop('idle'), int) + assert info == expected + + @skip_if_server_version_lt('5.0.0') + def test_xinfo_stream(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + info = r.xinfo_stream(stream) + + assert info['length'] == 2 + assert info['first-entry'] == get_stream_message(r, stream, m1) + assert info['last-entry'] == get_stream_message(r, stream, m2) + + @skip_if_server_version_lt('5.0.0') + def test_xlen(self, r): + stream = 'stream' + assert r.xlen(stream) == 0 + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + assert r.xlen(stream) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xpending(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending on a group that has no consumers yet + expected = { + 'pending': 0, + 'min': None, + 'max': None, + 'consumers': [] + } + assert r.xpending(stream, group) == expected + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + expected = { + 'pending': 2, + 'min': m1, + 'max': m2, + 'consumers': [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 1}, + ] + } + assert r.xpending(stream, group) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xpending_range(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending range on a group that has no consumers yet + assert r.xpending_range(stream, group) == [] + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + response = r.xpending_range(stream, group) + assert len(response) == 2 + assert response[0]['message_id'] == m1 + assert response[0]['consumer'] == b(consumer1) + assert response[1]['message_id'] == m2 + assert response[1]['consumer'] == b(consumer2) + + @skip_if_server_version_lt('5.0.0') + def test_xrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) def get_ids(results): return [result[0] for result in results] - results = sr.xrange(varname, start=stamp1) - assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4] + results = r.xrange(stream, start=m1) + assert get_ids(results) == [m1, m2, m3, m4] - results = sr.xrange(varname, start=stamp2, finish=stamp3) - assert get_ids(results) == [stamp2, stamp3] + results = r.xrange(stream, start=m2, finish=m3) + assert get_ids(results) == [m2, m3] - results = sr.xrange(varname, finish=stamp3) - assert get_ids(results) == [stamp1, stamp2, stamp3] + results = r.xrange(stream, finish=m3) + assert get_ids(results) == [m1, m2, m3] - results = sr.xrange(varname, finish=stamp2, count=1) - assert get_ids(results) == [stamp1] + results = r.xrange(stream, finish=m2, count=1) + assert get_ids(results) == [m1] - results = sr.xrevrange(varname, start=stamp4) - assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] + @skip_if_server_version_lt('5.0.0') + def test_xread(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) - results = sr.xrevrange(varname, start=stamp3, finish=stamp2) - assert get_ids(results) == [stamp3, stamp2] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xread(streams={stream: 0}) == expected - results = sr.xrevrange(varname, finish=stamp3) - assert get_ids(results) == [stamp4, stamp3] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xread(streams={stream: 0}, count=1) == expected + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xread(streams={stream: m1}) == expected + + # xread starting at the last message returns an empty list + assert r.xread(streams={stream: m2}) == [] + + @skip_if_server_version_lt('5.0.0') + def test_xreadgroup(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xreadgroup(group, consumer, streams={stream: 0}) == expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xreadgroup(group, consumer, streams={stream: 0}, count=1) == \ + expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) - results = sr.xrevrange(varname, finish=stamp2, count=1) - assert get_ids(results) == [stamp4] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xreadgroup(group, consumer, streams={stream: m1}) == expected - assert sr.xlen(varname) == 4 + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + # xread starting at the last message returns an empty message list + expected = [ + [ + stream, + [] + ] + ] + assert r.xreadgroup(group, consumer, streams={stream: m2}) == expected @skip_if_server_version_lt('5.0.0') - def test_xread(self, sr): - varname = 'xread_test' - sr.delete(varname) - stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) - stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) - assert stamp1 != stamp2 + def test_xrevrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) - results = sr.xread(streams={varname: '$'}, count=10, block=10) - assert results == [] + def get_ids(results): + return [result[0] for result in results] + + results = r.xrevrange(stream, start=m4) + assert get_ids(results) == [m4, m3, m2, m1] - results = sr.xread(count=3, block=0, streams={varname: stamp1}) - assert results[0][1][0][0] == stamp2 + results = r.xrevrange(stream, start=m3, finish=m2) + assert get_ids(results) == [m3, m2] + + results = r.xrevrange(stream, finish=m3) + assert get_ids(results) == [m4, m3] + + results = r.xrevrange(stream, finish=m2, count=1) + assert get_ids(results) == [m4] @skip_if_server_version_lt('5.0.0') - def test_xtrim(self, sr): - stream_name = 'xtrim_test_stream' - sr.delete(stream_name) + def test_xtrim(self, r): + stream = 'stream' + + # trimming an empty key doesn't do anything + assert r.xtrim(stream, 1000) == 0 - assert sr.xtrim(stream_name, 1000) == 0 + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) - for i in range(300): - sr.xadd(stream_name, {"index": i}) + # trimming an amount large than the number of messages + # doesn't do anything + assert r.xtrim(stream, 5, approximate=False) == 0 - assert sr.xtrim(stream_name, 1000, approximate=False) == 0 - assert sr.xtrim(stream_name, 300) == 0 - assert sr.xtrim(stream_name, 299) == 0 - assert sr.xtrim(stream_name, 234) == 0 - assert sr.xtrim(stream_name, 234, approximate=False) == 66 + # 1 message is trimmed + assert r.xtrim(stream, 3, approximate=False) == 1 class TestStrictCommands(object): -- cgit v1.2.1 From eaf6b2d23ad3e9a46f141cc4fa0fa62240fadba9 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 22:45:02 -0700 Subject: rename start/finish and start/end args to min/max on all stream commands this is consistent with the rest of the library and is clearer terminology to the end user --- redis/client.py | 22 +++++++++++----------- tests/test_commands.py | 18 +++++++++--------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/redis/client.py b/redis/client.py index f8d13b7..2ab938a 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1918,7 +1918,7 @@ class StrictRedis(object): """ return self.execute_command('XPENDING', name, groupname) - def xpending_range(self, name, groupname, start='-', end='+', count=-1, + def xpending_range(self, name, groupname, min='-', max='+', count=-1, consumername=None): """ Returns information about pending messages, in a range. @@ -1933,22 +1933,22 @@ class StrictRedis(object): consumername: name of a consumer to filter by (optional). """ pieces = [name, groupname] - if start is not None or end is not None or count is not None: - if start is None or end is None or count is None: - raise RedisError("XPENDING must be provided with start, end " + if min is not None or max is not None or count is not None: + if min is None or max is None or count is None: + raise RedisError("XPENDING must be provided with min, max " "and count parameters, or none of them. ") if not isinstance(count, (int, long)) or count < -1: raise RedisError("XPENDING count must be a integer >= -1") - pieces.extend((start, end, str(count))) + pieces.extend((min, max, str(count))) if consumername is not None: - if start is None or end is None or count is None: + if min is None or max is None or count is None: raise RedisError("if XPENDING is provided with consumername," - " it must be provided with start, end and" + " it must be provided with min, max and" " count parameters") pieces.append(consumername) return self.execute_command('XPENDING', *pieces, parse_detail=True) - def xrange(self, name, start='-', finish='+', count=None): + def xrange(self, name, min='-', max='+', count=None): """ Read stream values within an interval. name: name of the stream. @@ -1959,7 +1959,7 @@ class StrictRedis(object): count: if set, only return this many items, beginning with the earliest available. """ - pieces = [start, finish] + pieces = [min, max] if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError('XRANGE count must be a positive integer') @@ -2026,7 +2026,7 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) - def xrevrange(self, name, start='+', finish='-', count=None): + def xrevrange(self, name, max='+', min='-', count=None): """ Read stream values within an interval, in reverse order. name: name of the stream @@ -2037,7 +2037,7 @@ class StrictRedis(object): count: if set, only return this many items, beginning with the latest available. """ - pieces = [start, finish] + pieces = [max, min] if count is not None: if not isinstance(count, (int, long)) or count < 1: raise RedisError('XREVRANGE count must be a positive integer') diff --git a/tests/test_commands.py b/tests/test_commands.py index 4e11b55..991d50a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -37,7 +37,7 @@ def redis_server_time(client): def get_stream_message(client, stream, message_id): "Fetch a stream message and format it as a (message_id, fields) pair" - response = client.xrange(stream, start=message_id, finish=message_id) + response = client.xrange(stream, min=message_id, max=message_id) assert len(response) == 1 return response[0] @@ -1866,16 +1866,16 @@ class TestRedisCommands(object): def get_ids(results): return [result[0] for result in results] - results = r.xrange(stream, start=m1) + results = r.xrange(stream, min=m1) assert get_ids(results) == [m1, m2, m3, m4] - results = r.xrange(stream, start=m2, finish=m3) + results = r.xrange(stream, min=m2, max=m3) assert get_ids(results) == [m2, m3] - results = r.xrange(stream, finish=m3) + results = r.xrange(stream, max=m3) assert get_ids(results) == [m1, m2, m3] - results = r.xrange(stream, finish=m2, count=1) + results = r.xrange(stream, max=m2, count=1) assert get_ids(results) == [m1] @skip_if_server_version_lt('5.0.0') @@ -1994,16 +1994,16 @@ class TestRedisCommands(object): def get_ids(results): return [result[0] for result in results] - results = r.xrevrange(stream, start=m4) + results = r.xrevrange(stream, max=m4) assert get_ids(results) == [m4, m3, m2, m1] - results = r.xrevrange(stream, start=m3, finish=m2) + results = r.xrevrange(stream, max=m3, min=m2) assert get_ids(results) == [m3, m2] - results = r.xrevrange(stream, finish=m3) + results = r.xrevrange(stream, min=m3) assert get_ids(results) == [m4, m3] - results = r.xrevrange(stream, finish=m2, count=1) + results = r.xrevrange(stream, min=m2, count=1) assert get_ids(results) == [m4] @skip_if_server_version_lt('5.0.0') -- cgit v1.2.1 From ad6d36257692f5f3a5874ba1cc004ad57f145d0e Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 22:50:35 -0700 Subject: explicit parsing of xclaim response with justid=True --- redis/client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/redis/client.py b/redis/client.py index 2ab938a..45f1ff0 100755 --- a/redis/client.py +++ b/redis/client.py @@ -251,8 +251,8 @@ def parse_list_of_dicts(response): return list(imap(pairs_to_dict_with_nativestr_keys, response)) -def parse_xclaim(response): - if all(isinstance(r, (basestring, bytes)) for r in response): +def parse_xclaim(response, **options): + if options.get('parse_justid', False): return response return parse_stream_list(response) @@ -1810,6 +1810,7 @@ class StrictRedis(object): raise RedisError("XCLAIM message_ids must be a non empty list or " "tuple of message IDs to claim") + kwargs = {} pieces = [name, groupname, consumername, str(min_idle_time)] pieces.extend(list(message_ids)) @@ -1834,7 +1835,8 @@ class StrictRedis(object): if not isinstance(justid, bool): raise RedisError("XCLAIM justid must be a boolean") pieces.append(Token.get_token('JUSTID')) - return self.execute_command('XCLAIM', *pieces) + kwargs['parse_justid'] = True + return self.execute_command('XCLAIM', *pieces, **kwargs) def xdel(self, name, *ids): """ -- cgit v1.2.1 From 84efaa73e71d83c8e2ff86c9e0d7fade851cf1e8 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 22:56:45 -0700 Subject: avoid calling pytest fixtures directly since that's frowned upon --- tests/test_encoding.py | 7 ++++--- tests/test_pubsub.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 8b5bf5a..4c8a36c 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -1,14 +1,15 @@ from __future__ import with_statement import pytest +import redis from redis._compat import unichr, u, unicode -from .conftest import r as _redis_client +from .conftest import _get_client class TestEncoding(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, decode_responses=True) + return _get_client(redis.Redis, request=request, decode_responses=True) def test_simple_encoding(self, r): unicode_string = unichr(3456) + u('abcd') + unichr(3421) @@ -34,7 +35,7 @@ class TestEncoding(object): class TestCommandsAndTokensArentEncoded(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, encoding='utf-16') + return _get_client(redis.Redis, request=request, encoding='utf-16') def test_basic_command(self, r): r.set('hello', 'world') diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index a240248..01a7129 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -6,7 +6,7 @@ import redis from redis.exceptions import ConnectionError from redis._compat import basestring, u, unichr, b -from .conftest import r as _redis_client +from .conftest import _get_client from .conftest import skip_if_server_version_lt @@ -316,7 +316,7 @@ class TestPubSubAutoDecoding(object): @pytest.fixture() def r(self, request): - return _redis_client(request=request, decode_responses=True) + return _get_client(redis.Redis, request=request, decode_responses=True) def test_channel_subscribe_unsubscribe(self, r): p = r.pubsub() -- cgit v1.2.1 From c22956dcb23845830b471b5c8bbdf675f3b9debb Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 23:03:12 -0700 Subject: guarentee consistency by mapping the keys/values with one call --- redis/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 45f1ff0..15be7b8 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1993,8 +1993,9 @@ class StrictRedis(object): if not isinstance(streams, dict) or len(streams) == 0: raise RedisError('XREAD streams must be a non empty dict') pieces.append(Token.get_token('STREAMS')) - pieces.extend(streams.keys()) - pieces.extend(streams.values()) + keys, values = izip(*iteritems(streams)) + pieces.extend(keys) + pieces.extend(values) return self.execute_command('XREAD', *pieces) def xreadgroup(self, groupname, consumername, streams, count=None, -- cgit v1.2.1 From 47dc6aec85dab10a2a1a6dfbb25f78062c7285d9 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 23:12:18 -0700 Subject: python 2 likes binary regex hints specified as 'br' rather than 'rb' --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 991d50a..78888fb 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1633,7 +1633,7 @@ class TestRedisCommands(object): def test_xadd(self, r): stream = 'stream' message_id = r.xadd(stream, {'foo': 'bar'}) - assert re.match(rb'[0-9]+\-[0-9]+', message_id) + assert re.match(br'[0-9]+\-[0-9]+', message_id) # explicit message id message_id = b('9999999999999999999-0') -- cgit v1.2.1 From c7dbaa142917faaf6075ad8baec9f10b3731794d Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Wed, 31 Oct 2018 23:19:59 -0700 Subject: python2 compat --- tests/test_commands.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 78888fb..3f6c5b1 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -7,7 +7,7 @@ import redis import time from redis._compat import (unichr, u, b, ascii_letters, iteritems, iterkeys, - itervalues) + itervalues, long) from redis.client import parse_info from redis import exceptions @@ -1774,8 +1774,8 @@ class TestRedisCommands(object): ] # we can't determine the idle time, so just make sure it's an int - assert isinstance(info[0].pop('idle'), int) - assert isinstance(info[1].pop('idle'), int) + assert isinstance(info[0].pop('idle'), (int, long)) + assert isinstance(info[1].pop('idle'), (int, long)) assert info == expected @skip_if_server_version_lt('5.0.0') -- cgit v1.2.1 From 00aa265b5b30c6d9181306c8de2bd21611815f58 Mon Sep 17 00:00:00 2001 From: Faheel Ahmad Date: Thu, 1 Nov 2018 12:28:57 +0530 Subject: Add missing space --- redis/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 45f1ff0..df67955 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2525,14 +2525,14 @@ class StrictRedis(object): def geohash(self, name, *values): """ Return the geo hash string for each item of ``values`` members of - the specified key identified by the ``name``argument. + the specified key identified by the ``name`` argument. """ return self.execute_command('GEOHASH', name, *values) def geopos(self, name, *values): """ Return the positions of each item of ``values`` as members of - the specified key identified by the ``name``argument. Each position + the specified key identified by the ``name`` argument. Each position is represented by the pairs lon and lat. """ return self.execute_command('GEOPOS', name, *values) -- cgit v1.2.1 From e1b13b3b5366a422b182bbb7ff2b30d479740a96 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Thu, 1 Nov 2018 07:15:09 -0700 Subject: fix test that breaks if other clients are connected to the redis server --- tests/test_commands.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 3f6c5b1..22aafd3 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -78,11 +78,10 @@ class TestRedisCommands(object): @skip_if_server_version_lt('2.6.9') def test_client_list_after_client_setname(self, r): - r.client_setname('cl=i=ent') + r.client_setname('redis_py_test') clients = r.client_list() - assert isinstance(clients[0], dict) - assert 'name' in clients[0] - assert clients[0]['name'] == 'cl=i=ent' + # we don't know which client ours will be + assert 'redis_py_test' in [c['name'] for c in clients] def test_config_get(self, r): data = r.config_get() -- cgit v1.2.1 From c2227960548169c0f0549750e0a1f7bd0a126177 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Thu, 1 Nov 2018 14:11:28 -0700 Subject: remove unncessary deletes at beginning of tests --- tests/test_commands.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 579a85e..1d86e93 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -978,7 +978,6 @@ class TestRedisCommands(object): @skip_if_server_version_lt('4.9.0') def test_zpopmax(self, r): - r.delete('a') r.zadd('a', a1=1, a2=2, a3=3) assert r.zpopmax('a') == [(b('a3'), 3)] @@ -988,7 +987,6 @@ class TestRedisCommands(object): @skip_if_server_version_lt('4.9.0') def test_zpopmin(self, r): - r.delete('a') r.zadd('a', a1=1, a2=2, a3=3) assert r.zpopmin('a') == [(b('a1'), 1)] @@ -998,9 +996,6 @@ class TestRedisCommands(object): @skip_if_server_version_lt('4.9.0') def test_bzpopmax(self, r): - r.delete('a') - r.delete('b') - r.delete('c') r.zadd('a', a1=1, a2=2) r.zadd('b', b1=10, b2=20) assert r.bzpopmax(['b', 'a'], timeout=1) == (b('b'), b('b2'), 20) @@ -1013,9 +1008,6 @@ class TestRedisCommands(object): @skip_if_server_version_lt('4.9.0') def test_bzpopmin(self, r): - r.delete('a') - r.delete('b') - r.delete('c') r.zadd('a', a1=1, a2=2) r.zadd('b', b1=10, b2=20) assert r.bzpopmin(['b', 'a'], timeout=1) == (b('b'), b('b1'), 10) -- cgit v1.2.1 From c8a7445186059ac24e7ba5d3343d69b3a8527f5b Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Thu, 1 Nov 2018 18:02:34 -0700 Subject: Remove reference to deprecated easy_install easy_install is deprecated and its use is discouraged by PyPA: https://setuptools.readthedocs.io/en/latest/easy_install.html > Warning: Easy Install is deprecated. Do not use it. Instead use pip. Follow upstream advice and only recommended supported tools. --- README.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index a53d8c4..e89014d 100644 --- a/README.rst +++ b/README.rst @@ -156,8 +156,7 @@ kind enough to create Python bindings. Using Hiredis can provide up to a performance increase is most noticeable when retrieving many pieces of data, such as from LRANGE or SMEMBERS operations. -Hiredis is available on PyPI, and can be installed via pip or easy_install -just like redis-py. +Hiredis is available on PyPI, and can be installed via pip just like redis-py. .. code-block:: bash -- cgit v1.2.1 From d8b1fa623e6aac09b5b10d793de9d13562d5570d Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Sat, 22 Sep 2018 21:29:01 -0700 Subject: Prefer https:// for URLs when available --- CHANGES | 6 +++--- README.rst | 12 ++++++------ redis/client.py | 2 +- redis/connection.py | 7 ++++--- setup.py | 2 +- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/CHANGES b/CHANGES index d048ea5..fb10389 100644 --- a/CHANGES +++ b/CHANGES @@ -190,7 +190,7 @@ for the report. * Connections now call socket.shutdown() prior to socket.close() to ensure communication ends immediately per the note at - http://docs.python.org/2/library/socket.html#socket.socket.close + https://docs.python.org/2/library/socket.html#socket.socket.close Thanks to David Martin for pointing this out. * Lock checks are now based on floats rather than ints. Thanks Vitja Makarov. @@ -224,11 +224,11 @@ * Prevent DISCARD from being called if MULTI wasn't also called. Thanks Pete Aykroyd. * SREM now returns an integer indicating the number of items removed from - the set. Thanks http://github.com/ronniekk. + the set. Thanks https://github.com/ronniekk. * Fixed a bug with BGSAVE and BGREWRITEAOF response callbacks with Python3. Thanks Nathan Wan. * Added CLIENT GETNAME and CLIENT SETNAME commands. - Thanks http://github.com/bitterb. + Thanks https://github.com/bitterb. * It's now possible to use len() on a pipeline instance to determine the number of commands that will be executed. Thanks Jon Parise. * Fixed a bug in INFO's parse routine with floating point numbers. Thanks diff --git a/README.rst b/README.rst index a53d8c4..d750026 100644 --- a/README.rst +++ b/README.rst @@ -4,7 +4,7 @@ redis-py The Python interface to the Redis key-value store. .. image:: https://secure.travis-ci.org/andymccurdy/redis-py.png?branch=master - :target: http://travis-ci.org/andymccurdy/redis-py + :target: https://travis-ci.org/andymccurdy/redis-py .. image:: https://readthedocs.org/projects/redis-py/badge/?version=latest&style=flat :target: https://redis-py.readthedocs.io/en/latest/ .. image:: https://badge.fury.io/py/redis.svg @@ -14,7 +14,7 @@ Installation ------------ redis-py requires a running Redis server. See `Redis's quickstart -`_ for installation instructions. +`_ for installation instructions. To install redis-py, simply: @@ -53,7 +53,7 @@ specified. API Reference ------------- -The `official Redis command documentation `_ does a +The `official Redis command documentation `_ does a great job of explaining each command in detail. redis-py exposes two client classes that implement these commands. The StrictRedis class attempts to adhere to the official command syntax. There are a few exceptions: @@ -614,7 +614,7 @@ execution. Sentinel support ^^^^^^^^^^^^^^^^ -redis-py can be used together with `Redis Sentinel `_ +redis-py can be used together with `Redis Sentinel `_ to discover Redis nodes. You need to have at least one Sentinel daemon running in order to use redis-py's Sentinel support. @@ -655,7 +655,7 @@ If no slaves can be connected to, a connection will be established with the master. See `Guidelines for Redis clients with support for Redis Sentinel -`_ to learn more about Redis Sentinel. +`_ to learn more about Redis Sentinel. Scan Iterators ^^^^^^^^^^^^^^ @@ -679,7 +679,7 @@ Author ^^^^^^ redis-py is developed and maintained by Andy McCurdy (sedrik@gmail.com). -It can be found here: http://github.com/andymccurdy/redis-py +It can be found here: https://github.com/andymccurdy/redis-py Special thanks to: diff --git a/redis/client.py b/redis/client.py index b6c0da3..ffa3938 100755 --- a/redis/client.py +++ b/redis/client.py @@ -533,7 +533,7 @@ class StrictRedis(object): """ Return a Redis client object configured from the given URL, which must use either `the ``redis://`` scheme - `_ for RESP + `_ for RESP connections or the ``unix://`` scheme for Unix domain sockets. For example:: diff --git a/redis/connection.py b/redis/connection.py index 1190260..9893191 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -799,11 +799,11 @@ class ConnectionPool(object): Three URL schemes are supported: - ```redis://`` - `_ creates a + `_ creates a normal TCP socket connection - ```rediss://`` - `_ creates a - SSL wrapped TCP socket connection + `_ creates + a SSL wrapped TCP socket connection - ``unix://`` creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function @@ -829,6 +829,7 @@ class ConnectionPool(object): True/False, Yes/No values to indicate state. Invalid types cause a ``UserWarning`` to be raised. In the case of conflicting arguments, querystring arguments always win. + """ url_string = url url = urlparse(url) diff --git a/setup.py b/setup.py index 0e8f07d..da0afc5 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ setup( version=__version__, description='Python client for Redis key-value store', long_description=long_description, - url='http://github.com/andymccurdy/redis-py', + url='https://github.com/andymccurdy/redis-py', author='Andy McCurdy', author_email='sedrik@gmail.com', maintainer='Andy McCurdy', -- cgit v1.2.1 From 6cc00db02aebcbb6584a67e40e1a42617e34a6cb Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Sat, 21 Oct 2017 13:04:19 -0700 Subject: Sync tox.ini configuration with supported Python versions Per Travis CI configuration and trove classifiers, Python 3.2 is not supported. Remove it from tox.ini. --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index ac086ed..ab7cb7c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 1.8 -envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle +envlist = {py26,py27,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle [testenv] deps = -- cgit v1.2.1 From f0858228ae0dfd040ed2f701a6db35945f614f0a Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Thu, 1 Nov 2018 21:19:03 -0700 Subject: Remove duplicate import of errno --- redis/_compat.py | 1 - 1 file changed, 1 deletion(-) diff --git a/redis/_compat.py b/redis/_compat.py index 32063e7..de856f3 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -13,7 +13,6 @@ if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and # Adapted from https://bugs.python.org/review/23863/patch/14532/54418 import socket import time - import errno from select import select as _select -- cgit v1.2.1 From ff3bbdf903f10fe48bc36b43e4b682e89ad86295 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Thu, 1 Nov 2018 21:48:20 -0700 Subject: added MKSTREAM option to xgroup_create --- redis/client.py | 7 +++++-- tests/test_commands.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/redis/client.py b/redis/client.py index 0cb6fca..e095455 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1848,14 +1848,17 @@ class StrictRedis(object): """ return self.execute_command('XDEL', name, *ids) - def xgroup_create(self, name, groupname, id): + def xgroup_create(self, name, groupname, id='$', mkstream=False): """ Create a new consumer group associated with a stream. name: name of the stream. groupname: name of the consumer group. id: ID of the last item in the stream to consider already delivered. """ - return self.execute_command('XGROUP CREATE', name, groupname, id) + pieces = ['XGROUP CREATE', name, groupname, id] + if mkstream: + pieces.append('MKSTREAM') + return self.execute_command(*pieces) def xgroup_delconsumer(self, name, groupname, consumername): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index 1d86e93..6394a7a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1749,6 +1749,28 @@ class TestRedisCommands(object): }] assert r.xinfo_groups(stream) == expected + @skip_if_server_version_lt('5.0.0') + def test_xgroup_create_mkstream(self, r): + # tests xgroup_create and xinfo_groups + stream = 'stream' + group = 'group' + + # an error is raised if a group is created on a stream that + # doesn't already exist + with pytest.raises(exceptions.ResponseError): + r.xgroup_create(stream, group, 0) + + # however, with mkstream=True, the underlying stream is created + # automatically + assert r.xgroup_create(stream, group, 0, mkstream=True) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': b('0-0') + }] + assert r.xinfo_groups(stream) == expected + @skip_if_server_version_lt('5.0.0') def test_xgroup_delconsumer(self, r): stream = 'stream' -- cgit v1.2.1 From e0032fc62634926ff17127e73327b03162f1d2f8 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Sat, 3 Nov 2018 21:12:49 +0200 Subject: Adds SWAPDB No tests were added - consistent with basic operations such as FLUSHDB Signed-off-by: Itamar Haber --- redis/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/redis/client.py b/redis/client.py index 160cfb6..ae4bbd3 100755 --- a/redis/client.py +++ b/redis/client.py @@ -439,7 +439,7 @@ class StrictRedis(object): string_keys_to_dict('ZSCORE ZINCRBY', float_or_none), string_keys_to_dict( 'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE RENAME ' - 'SAVE SELECT SHUTDOWN SLAVEOF WATCH UNWATCH', + 'SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH', bool_ok ), string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None), @@ -817,6 +817,10 @@ class StrictRedis(object): "Delete all keys in the current database" return self.execute_command('FLUSHDB') + def swapdb(self, first, second): + "Swap two databases" + return self.execute_command('SWAPDB', first, second) + def info(self, section=None): """ Returns a dictionary containing information about the Redis server -- cgit v1.2.1 From f0f0192356ab45479101b6b9098cda10d58f24ff Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Sun, 4 Nov 2018 01:09:37 -0700 Subject: hook for graceful command failure, even in pipelines allow commands that expect 1 or more keys to fail gracefully when 0 keys are provided --- redis/client.py | 31 ++++++++++++++++++++++--------- tests/test_commands.py | 1 + tests/test_pipeline.py | 14 ++++++++++++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/redis/client.py b/redis/client.py index a597d3a..e1f1103 100755 --- a/redis/client.py +++ b/redis/client.py @@ -26,6 +26,7 @@ from redis.exceptions import ( ) SYM_EMPTY = b('') +EMPTY_ERROR = 'EMPTY_ERROR' def list_or_args(keys, args): @@ -757,7 +758,12 @@ class StrictRedis(object): def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" - response = connection.read_response() + try: + response = connection.read_response() + except ResponseError: + if EMPTY_ERROR in options: + return options[EMPTY_ERROR] + raise if command_name in self.response_callbacks: return self.response_callbacks[command_name](response, **options) return response @@ -1120,7 +1126,10 @@ class StrictRedis(object): Returns a list of values ordered identically to ``keys`` """ args = list_or_args(keys, args) - return self.execute_command('MGET', *args) + options = {} + if not args: + options[EMPTY_ERROR] = [] + return self.execute_command('MGET', *args, **options) def mset(self, *args, **kwargs): """ @@ -3214,7 +3223,8 @@ class BasePipeline(object): def _execute_transaction(self, connection, commands, raise_on_error): cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})]) - all_cmds = connection.pack_commands([args for args, _ in cmds]) + all_cmds = connection.pack_commands([args for args, options in cmds + if EMPTY_ERROR not in options]) connection.send_packed_command(all_cmds) errors = [] @@ -3229,12 +3239,15 @@ class BasePipeline(object): # and all the other commands for i, command in enumerate(commands): - try: - self.parse_response(connection, '_') - except ResponseError: - ex = sys.exc_info()[1] - self.annotate_exception(ex, i + 1, command[0]) - errors.append((i, ex)) + if EMPTY_ERROR in command[1]: + errors.append((i, command[1][EMPTY_ERROR])) + else: + try: + self.parse_response(connection, '_') + except ResponseError: + ex = sys.exc_info()[1] + self.annotate_exception(ex, i + 1, command[0]) + errors.append((i, ex)) # parse the EXEC. try: diff --git a/tests/test_commands.py b/tests/test_commands.py index 6394a7a..b6636b0 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -427,6 +427,7 @@ class TestRedisCommands(object): assert set(r.keys(pattern='test*')) == keys def test_mget(self, r): + assert r.mget([]) == [] assert r.mget(['a', 'b']) == [None, None] r['a'] = '1' r['b'] = '2' diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 46fc994..8de08d2 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -114,6 +114,20 @@ class TestPipeline(object): assert pipe.set('z', 'zzz').execute() == [True] assert r['z'] == b('zzz') + def test_command_with_on_error_option_returns_default_value(self, r): + """ + Commands with custom ON_ERROR functionality return their default + values in the pipeline no matter the raise_on_error preference + """ + for error_switch in (True, False): + with r.pipeline() as pipe: + pipe.set('a', 1).mget([]).set('c', 3) + result = pipe.execute(raise_on_error=error_switch) + + assert result[0] + assert result[1] == [] + assert result[2] + def test_parse_error_raised(self, r): with r.pipeline() as pipe: # the zrem is invalid because we don't pass any keys to it -- cgit v1.2.1 From e31f8fbfb673f6b35406992ff3cbb36fd4a50d63 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Sun, 4 Nov 2018 01:37:14 -0700 Subject: add test for non-transaction pipelines as well --- tests/test_pipeline.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8de08d2..1f3947e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -114,9 +114,9 @@ class TestPipeline(object): assert pipe.set('z', 'zzz').execute() == [True] assert r['z'] == b('zzz') - def test_command_with_on_error_option_returns_default_value(self, r): + def test_transaction_with_empty_error_command(self, r): """ - Commands with custom ON_ERROR functionality return their default + Commands with custom EMPTY_ERROR functionality return their default values in the pipeline no matter the raise_on_error preference """ for error_switch in (True, False): @@ -128,6 +128,20 @@ class TestPipeline(object): assert result[1] == [] assert result[2] + def test_pipeline_with_empty_error_command(self, r): + """ + Commands with custom EMPTY_ERROR functionality return their default + values in the pipeline no matter the raise_on_error preference + """ + for error_switch in (True, False): + with r.pipeline(transaction=False) as pipe: + pipe.set('a', 1).mget([]).set('c', 3) + result = pipe.execute(raise_on_error=error_switch) + + assert result[0] + assert result[1] == [] + assert result[2] + def test_parse_error_raised(self, r): with r.pipeline() as pipe: # the zrem is invalid because we don't pass any keys to it -- cgit v1.2.1 From f1169b5fbae225032cdca69ed801f880e3ab83df Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Mon, 5 Nov 2018 12:07:14 -0800 Subject: rename empty_errow -> empty_response as the value is the actual response --- redis/client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/redis/client.py b/redis/client.py index e1f1103..13aa689 100755 --- a/redis/client.py +++ b/redis/client.py @@ -26,7 +26,7 @@ from redis.exceptions import ( ) SYM_EMPTY = b('') -EMPTY_ERROR = 'EMPTY_ERROR' +EMPTY_RESPONSE = 'EMPTY_RESPONSE' def list_or_args(keys, args): @@ -761,8 +761,8 @@ class StrictRedis(object): try: response = connection.read_response() except ResponseError: - if EMPTY_ERROR in options: - return options[EMPTY_ERROR] + if EMPTY_RESPONSE in options: + return options[EMPTY_RESPONSE] raise if command_name in self.response_callbacks: return self.response_callbacks[command_name](response, **options) @@ -1128,7 +1128,7 @@ class StrictRedis(object): args = list_or_args(keys, args) options = {} if not args: - options[EMPTY_ERROR] = [] + options[EMPTY_RESPONSE] = [] return self.execute_command('MGET', *args, **options) def mset(self, *args, **kwargs): @@ -3224,7 +3224,7 @@ class BasePipeline(object): def _execute_transaction(self, connection, commands, raise_on_error): cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})]) all_cmds = connection.pack_commands([args for args, options in cmds - if EMPTY_ERROR not in options]) + if EMPTY_RESPONSE not in options]) connection.send_packed_command(all_cmds) errors = [] @@ -3239,8 +3239,8 @@ class BasePipeline(object): # and all the other commands for i, command in enumerate(commands): - if EMPTY_ERROR in command[1]: - errors.append((i, command[1][EMPTY_ERROR])) + if EMPTY_RESPONSE in command[1]: + errors.append((i, command[1][EMPTY_RESPONSE])) else: try: self.parse_response(connection, '_') -- cgit v1.2.1