summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2018-10-31 07:30:43 -0700
committerAndy McCurdy <andy@andymccurdy.com>2018-10-31 07:30:43 -0700
commit21c56b372bffe9a38d0a57dd6b3faa6a88ef83bb (patch)
tree3ee472f49348b459aebf2e60c43eb8e3ac07af70
parent0d6c5f28ef87c83df5540abc358252cae3d2060e (diff)
parenta32a8e630c25a2a2e8b637ac7af80ba7df048f23 (diff)
downloadredis-py-21c56b372bffe9a38d0a57dd6b3faa6a88ef83bb.tar.gz
Merge branch 'pr/1040'
-rw-r--r--.gitignore1
-rw-r--r--.travis.yml13
-rw-r--r--benchmarks/basic_operations.py1
-rw-r--r--benchmarks/command_packer_benchmark.py8
-rw-r--r--build_tools/.bash_profile1
-rwxr-xr-xbuild_tools/bootstrap.sh (renamed from vagrant/bootstrap.sh)0
-rwxr-xr-xbuild_tools/build_redis.sh (renamed from vagrant/build_redis.sh)2
-rwxr-xr-xbuild_tools/install_redis.sh (renamed from vagrant/install_redis.sh)2
-rwxr-xr-xbuild_tools/install_sentinel.sh (renamed from vagrant/install_sentinel.sh)2
-rw-r--r--build_tools/redis-configs/001-master (renamed from vagrant/redis-configs/001-master)2
-rw-r--r--build_tools/redis-configs/002-slave (renamed from vagrant/redis-configs/002-slave)2
-rwxr-xr-xbuild_tools/redis_init_script (renamed from vagrant/redis_init_script)6
-rwxr-xr-xbuild_tools/redis_vars.sh (renamed from vagrant/redis_vars.sh)4
-rw-r--r--build_tools/sentinel-configs/001-1 (renamed from vagrant/sentinel-configs/001-1)0
-rw-r--r--build_tools/sentinel-configs/002-2 (renamed from vagrant/sentinel-configs/002-2)0
-rw-r--r--build_tools/sentinel-configs/003-3 (renamed from vagrant/sentinel-configs/003-3)0
-rwxr-xr-xbuild_tools/sentinel_init_script (renamed from vagrant/sentinel_init_script)6
-rw-r--r--redis/_compat.py2
-rwxr-xr-xredis/client.py410
-rwxr-xr-xredis/connection.py10
-rw-r--r--setup.cfg2
-rw-r--r--tests/test_commands.py189
-rw-r--r--tox.ini10
-rw-r--r--vagrant/.bash_profile1
-rw-r--r--vagrant/Vagrantfile10
25 files changed, 641 insertions, 43 deletions
diff --git a/.gitignore b/.gitignore
index 4736334..ab39968 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ vagrant/.vagrant
.python-version
.cache
.eggs
+.idea \ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index fcbdbb9..4c4e951 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,19 +7,20 @@ python:
- "3.3"
- "2.7"
- "2.6"
-services:
- - redis-server
+before_install:
+ - wget http://download.redis.io/releases/redis-5.0.0.tar.gz && mkdir redis_install && tar -xvzf redis-5.0.0.tar.gz -C redis_install && cd redis_install/redis-5.0.0 && make && src/redis-server --daemonize yes && cd ../..
+ - redis-cli info
env:
- TEST_HIREDIS=0
- TEST_HIREDIS=1
install:
- pip install -e .
- - "if [[ $TEST_PEP8 == '1' ]]; then pip install pep8; fi"
+ - "if [[ $TEST_PYCODESTYLE == '1' ]]; then pip install pycodestyle; fi"
- "if [[ $TEST_HIREDIS == '1' ]]; then pip install hiredis; fi"
-script: "if [[ $TEST_PEP8 == '1' ]]; then pep8 --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg .; else python setup.py test; fi"
+script: "if [[ $TEST_PYCODESTYLE == '1' ]]; then pycodestyle --repeat --show-source --exclude=.venv,.tox,dist,docs,build,*.egg,redis_install .; else python setup.py test; fi"
matrix:
include:
- python: "2.7"
- env: TEST_PEP8=1
+ env: TEST_PYCODESTYLE=1
- python: "3.6"
- env: TEST_PEP8=1
+ env: TEST_PYCODESTYLE=1
diff --git a/benchmarks/basic_operations.py b/benchmarks/basic_operations.py
index 2f1e9f4..c50a610 100644
--- a/benchmarks/basic_operations.py
+++ b/benchmarks/basic_operations.py
@@ -195,5 +195,6 @@ def hmset(conn, num, pipeline_size, data_size):
if pipeline_size > 1:
conn.execute()
+
if __name__ == '__main__':
run()
diff --git a/benchmarks/command_packer_benchmark.py b/benchmarks/command_packer_benchmark.py
index 49f48f2..9eb1853 100644
--- a/benchmarks/command_packer_benchmark.py
+++ b/benchmarks/command_packer_benchmark.py
@@ -22,9 +22,9 @@ class StringJoiningConnection(Connection):
_errno, errmsg = e.args
raise ConnectionError("Error %s while writing to socket. %s." %
(_errno, errmsg))
- except:
+ except Exception as e:
self.disconnect()
- raise
+ raise e
def pack_command(self, *args):
"Pack a series of arguments into a value Redis command"
@@ -54,9 +54,9 @@ class ListJoiningConnection(Connection):
_errno, errmsg = e.args
raise ConnectionError("Error %s while writing to socket. %s." %
(_errno, errmsg))
- except:
+ except Exception as e:
self.disconnect()
- raise
+ raise e
def pack_command(self, *args):
output = []
diff --git a/build_tools/.bash_profile b/build_tools/.bash_profile
new file mode 100644
index 0000000..b023cf7
--- /dev/null
+++ b/build_tools/.bash_profile
@@ -0,0 +1 @@
+PATH=$PATH:/var/lib/redis/bin
diff --git a/vagrant/bootstrap.sh b/build_tools/bootstrap.sh
index a5a0d2c..a5a0d2c 100755
--- a/vagrant/bootstrap.sh
+++ b/build_tools/bootstrap.sh
diff --git a/vagrant/build_redis.sh b/build_tools/build_redis.sh
index 728e617..379c6cc 100755
--- a/vagrant/build_redis.sh
+++ b/build_tools/build_redis.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
-source /home/vagrant/redis-py/vagrant/redis_vars.sh
+source /home/vagrant/redis-py/build_tools/redis_vars.sh
pushd /home/vagrant
diff --git a/vagrant/install_redis.sh b/build_tools/install_redis.sh
index bb5f1d2..fd53a1c 100755
--- a/vagrant/install_redis.sh
+++ b/build_tools/install_redis.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
-source /home/vagrant/redis-py/vagrant/redis_vars.sh
+source /home/vagrant/redis-py/build_tools/redis_vars.sh
for filename in `ls $VAGRANT_REDIS_CONF_DIR`; do
# cuts the order prefix off of the filename, e.g. 001-master -> master
diff --git a/vagrant/install_sentinel.sh b/build_tools/install_sentinel.sh
index 58cd808..0597208 100755
--- a/vagrant/install_sentinel.sh
+++ b/build_tools/install_sentinel.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
-source /home/vagrant/redis-py/vagrant/redis_vars.sh
+source /home/vagrant/redis-py/build_tools/redis_vars.sh
for filename in `ls $VAGRANT_SENTINEL_CONF_DIR`; do
# cuts the order prefix off of the filename, e.g. 001-master -> master
diff --git a/vagrant/redis-configs/001-master b/build_tools/redis-configs/001-master
index f04f23d..8591f1a 100644
--- a/vagrant/redis-configs/001-master
+++ b/build_tools/redis-configs/001-master
@@ -5,4 +5,4 @@ daemonize yes
unixsocket /tmp/redis_master.sock
unixsocketperm 777
dbfilename master.rdb
-dir /home/vagrant/redis/backups
+dir /var/lib/redis/backups
diff --git a/vagrant/redis-configs/002-slave b/build_tools/redis-configs/002-slave
index 5d302fe..13eb77e 100644
--- a/vagrant/redis-configs/002-slave
+++ b/build_tools/redis-configs/002-slave
@@ -5,6 +5,6 @@ daemonize yes
unixsocket /tmp/redis-slave.sock
unixsocketperm 777
dbfilename slave.rdb
-dir /home/vagrant/redis/backups
+dir /var/lib/redis/backups
slaveof 127.0.0.1 6379
diff --git a/vagrant/redis_init_script b/build_tools/redis_init_script
index e8bfa08..04cb2db 100755
--- a/vagrant/redis_init_script
+++ b/build_tools/redis_init_script
@@ -12,10 +12,10 @@
REDISPORT={{ PORT }}
PIDFILE=/var/run/{{ PROCESS_NAME }}.pid
-CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf
+CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf
-EXEC=/home/vagrant/redis/bin/redis-server
-CLIEXEC=/home/vagrant/redis/bin/redis-cli
+EXEC=/var/lib/redis/bin/redis-server
+CLIEXEC=/var/lib/redis/bin/redis-cli
case "$1" in
start)
diff --git a/vagrant/redis_vars.sh b/build_tools/redis_vars.sh
index 1ec6295..c52dd4c 100755
--- a/vagrant/redis_vars.sh
+++ b/build_tools/redis_vars.sh
@@ -1,13 +1,13 @@
#!/usr/bin/env bash
-VAGRANT_DIR=/home/vagrant/redis-py/vagrant
+VAGRANT_DIR=/home/vagrant/redis-py/build_tools
VAGRANT_REDIS_CONF_DIR=$VAGRANT_DIR/redis-configs
VAGRANT_SENTINEL_CONF_DIR=$VAGRANT_DIR/sentinel-configs
REDIS_VERSION=3.2.0
REDIS_DOWNLOAD_DIR=/home/vagrant/redis-downloads
REDIS_PACKAGE=redis-$REDIS_VERSION.tar.gz
REDIS_BUILD_DIR=$REDIS_DOWNLOAD_DIR/redis-$REDIS_VERSION
-REDIS_DIR=/home/vagrant/redis
+REDIS_DIR=/var/lib/redis
REDIS_BIN_DIR=$REDIS_DIR/bin
REDIS_CONF_DIR=$REDIS_DIR/conf
REDIS_SAVE_DIR=$REDIS_DIR/backups
diff --git a/vagrant/sentinel-configs/001-1 b/build_tools/sentinel-configs/001-1
index eccc3d1..eccc3d1 100644
--- a/vagrant/sentinel-configs/001-1
+++ b/build_tools/sentinel-configs/001-1
diff --git a/vagrant/sentinel-configs/002-2 b/build_tools/sentinel-configs/002-2
index 0cd2801..0cd2801 100644
--- a/vagrant/sentinel-configs/002-2
+++ b/build_tools/sentinel-configs/002-2
diff --git a/vagrant/sentinel-configs/003-3 b/build_tools/sentinel-configs/003-3
index c7f4fcd..c7f4fcd 100644
--- a/vagrant/sentinel-configs/003-3
+++ b/build_tools/sentinel-configs/003-3
diff --git a/vagrant/sentinel_init_script b/build_tools/sentinel_init_script
index ea93537..1d94804 100755
--- a/vagrant/sentinel_init_script
+++ b/build_tools/sentinel_init_script
@@ -12,10 +12,10 @@
SENTINELPORT={{ PORT }}
PIDFILE=/var/run/{{ PROCESS_NAME }}.pid
-CONF=/home/vagrant/redis/conf/{{ PROCESS_NAME }}.conf
+CONF=/var/lib/redis/conf/{{ PROCESS_NAME }}.conf
-EXEC=/home/vagrant/redis/bin/redis-sentinel
-CLIEXEC=/home/vagrant/redis/bin/redis-cli
+EXEC=/var/lib/redis/bin/redis-sentinel
+CLIEXEC=/var/lib/redis/bin/redis-cli
case "$1" in
start)
diff --git a/redis/_compat.py b/redis/_compat.py
index 307f3cc..32063e7 100644
--- a/redis/_compat.py
+++ b/redis/_compat.py
@@ -4,7 +4,7 @@ import sys
try:
InterruptedError = InterruptedError
-except:
+except NameError:
InterruptedError = OSError
# For Python older than 3.5, retry EINTR.
diff --git a/redis/client.py b/redis/client.py
index 79e94d0..6108087 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -195,7 +195,7 @@ def pairs_to_dict_typed(response, type_info):
if key in type_info:
try:
value = type_info[key](value)
- except:
+ except Exception:
# if for some reason the value can't be coerced, just use
# the string value
pass
@@ -232,6 +232,76 @@ def int_or_none(response):
return int(response)
+def stream_list(response):
+ if response is None:
+ return None
+ return [(r[0], pairs_to_dict(r[1])) for r in response]
+
+
+def parse_recursive_dict(response):
+ if response is None:
+ return None
+ result = {}
+ while response:
+ k = response.pop(0)
+ v = response.pop(0)
+ if isinstance(v, list):
+ v = parse_recursive_dict(v)
+ result[k] = v
+ return result
+
+
+def parse_list_of_recursive_dicts(response):
+ if response is None:
+ return None
+ result = []
+ for group in response:
+ result.append(parse_recursive_dict(group))
+ return result
+
+
+def parse_xclaim(response):
+ if all(isinstance(r, (basestring, bytes)) for r in response):
+ return response
+ return stream_list(response)
+
+
+def parse_xread(response):
+ if response is None:
+ return []
+ return [[nativestr(r[0]), stream_list(r[1])] for r in response]
+
+
+def parse_xpending(response, **options):
+ if isinstance(response, list):
+ if options.get('parse_detail', False):
+ return parse_range_xpending(response)
+ consumers = []
+ for consumer_name, consumer_pending in response[3]:
+ consumers.append({
+ 'name': consumer_name,
+ 'pending': consumer_pending
+ })
+ return {
+ 'pending': response[0],
+ 'lower': response[1],
+ 'upper': response[2],
+ 'consumers': consumers
+ }
+
+
+def parse_range_xpending(response):
+ result = []
+ for message in response:
+ result.append({
+ 'message_id': message[0],
+ 'consumer': message[1],
+ 'time_since_delivered': message[2],
+ 'times_delivered': message[3]
+ })
+ return result
+
+
def float_or_none(response):
if response is None:
return None
@@ -366,7 +436,26 @@ class StrictRedis(object):
'LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD SCARD SDIFFSTORE '
'SETBIT SETRANGE SINTERSTORE SREM STRLEN SUNIONSTORE ZADD ZCARD '
'ZLEXCOUNT ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE '
- 'GEOADD',
+ 'GEOADD XLEN',
+ int
+ ),
+ string_keys_to_dict('XREVRANGE XRANGE', stream_list),
+ string_keys_to_dict('XPENDING', parse_xpending),
+ string_keys_to_dict('XREAD XREADGROUP', parse_xread),
+ {
+ 'XGROUP CREATE': bool_ok,
+ 'XGROUP DESTROY': int,
+ 'XGROUP SETID': bool_ok,
+ 'XGROUP DELCONSUMER': int
+ },
+ {
+ 'XINFO STREAM': parse_recursive_dict,
+ 'XINFO CONSUMERS': parse_list_of_recursive_dicts,
+ 'XINFO GROUPS': parse_list_of_recursive_dicts
+ },
+ string_keys_to_dict('XCLAIM', parse_xclaim),
+ string_keys_to_dict(
+ 'XACK XDEL XTRIM',
int
),
string_keys_to_dict(
@@ -1675,6 +1764,323 @@ class StrictRedis(object):
args = list_or_args(keys, args)
return self.execute_command('SUNIONSTORE', dest, *args)
+ # STREAMS COMMANDS
+ def xadd(self, _name, fields, id='*', maxlen=None, approximate=True):
+ """
+ Add to a stream.
+ _name: name of the stream (not using 'name' as this would
+ prevent 'name' used in the kwargs
+ fields: dict of field/value pairs to insert into the stream
+ id: Location to insert this record. By default it is appended.
+ maxlen: truncate old stream members beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+
+ """
+ pieces = []
+ if maxlen is not None:
+ if not isinstance(maxlen, (int, long)) or maxlen < 1:
+ raise RedisError('XADD maxlen must be a positive integer')
+ pieces.append(Token.get_token('MAXLEN'))
+ if approximate:
+ pieces.append(Token.get_token('~'))
+ pieces.append(str(maxlen))
+ pieces.append(id)
+ if not isinstance(fields, dict) or len(fields) == 0:
+ raise RedisError('XADD fields must be a non-empty dict')
+ for pair in iteritems(fields):
+ pieces.extend(pair)
+ return self.execute_command('XADD', _name, *pieces)
+
+ def xrange(self, name, start='-', finish='+', count=None):
+ """
+ Read stream values within an interval.
+ name: name of the stream.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XRANGE count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+
+ return self.execute_command('XRANGE', name, *pieces)
+
+ def xrevrange(self, name, start='+', finish='-', count=None):
+ """
+ Read stream values within an interval, in reverse order.
+ name: name of the stream
+ start: first stream ID. defaults to '+',
+ meaning the latest available.
+ finish: last stream ID. defaults to '-',
+ meaning the earliest available.
+ count: if set, only return this many items, beginning with the
+ latest available.
+ """
+ pieces = [start, finish]
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XREVRANGE count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+
+ return self.execute_command('XREVRANGE', name, *pieces)
+
+ def xlen(self, name):
+ """
+ Returns the number of elements in a given stream.
+ """
+ return self.execute_command('XLEN', name)
+
+ def xread(self, streams, count=None, block=None):
+ """
+ Block and monitor multiple streams for new data.
+ streams: a dict of stream names to stream IDs, where
+ IDs indicate the last ID already seen.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ block: number of milliseconds to wait, if nothing already present.
+ """
+ pieces = []
+ if block is not None:
+ if not isinstance(block, (int, long)) or block < 0:
+ raise RedisError('XREAD block must be a non-negative integer')
+ pieces.append(Token.get_token('BLOCK'))
+ pieces.append(str(block))
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError('XREAD count must be a positive integer')
+ pieces.append(Token.get_token('COUNT'))
+ pieces.append(str(count))
+ if not isinstance(streams, dict) or len(streams) == 0:
+ raise RedisError('XREAD streams must be a non empty dict')
+ pieces.append(Token.get_token('STREAMS'))
+ pieces.extend(streams.keys())
+ pieces.extend(streams.values())
+ return self.execute_command('XREAD', *pieces)
+
+ def xgroup_create(self, name, groupname, id):
+ """
+ Create a new consumer group associated with a stream.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ id: ID of the last item in the stream to consider already delivered.
+ """
+ return self.execute_command('XGROUP CREATE', name, groupname, id)
+
+ def xgroup_destroy(self, name, groupname):
+ """
+ Destroy a consumer group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XGROUP DESTROY', name, groupname)
+
+ def xgroup_setid(self, name, groupname, id):
+ """
+ Set the consumer group last delivered ID to something else.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ id: ID of the last item in the stream to consider already delivered.
+ """
+ return self.execute_command('XGROUP SETID', name, groupname, id)
+
+ def xgroup_delconsumer(self, name, groupname, consumername):
+ """
+ Remove a specific consumer from a consumer group.
+ Returns the number of pending messages that the consumer had before it
+ was deleted.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of consumer to delete
+ """
+ return self.execute_command('XGROUP DELCONSUMER', name, groupname,
+ consumername)
+
+ def xinfo_stream(self, name):
+ """
+ Returns general information about the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO STREAM', name)
+
+ def xinfo_consumers(self, name, groupname):
+ """
+ Returns general information about the consumers in the group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
+
+ def xack(self, name, groupname, *ids):
+ """
+ Acknowledges the successful processing of one or more messages.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
+ """
+ return self.execute_command('XACK', name, groupname, *ids)
+
+ def xdel(self, name, *ids):
+ """
+ Deletes one or more messages from a stream.
+ name: name of the stream.
+ *ids: message ids to delete.
+ """
+ return self.execute_command('XDEL', name, *ids)
+
+ def xtrim(self, name, maxlen, approximate=True):
+ """
+ Trims old messages from a stream.
+ name: name of the stream.
+ maxlen: truncate old stream messages beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+ """
+ pieces = [Token.get_token('MAXLEN')]
+ if approximate:
+ pieces.append(Token.get_token('~'))
+ pieces.append(maxlen)
+ return self.execute_command('XTRIM', name, *pieces)
+
+ def xreadgroup(self, groupname, consumername, streams, count=None,
+ block=None):
+ """
+ Read from a stream via a consumer group.
+ groupname: name of the consumer group.
+ consumername: name of the requesting consumer.
+ streams: a dict of stream names to stream IDs, where
+ IDs indicate the last ID already seen.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ block: number of milliseconds to wait, if nothing already present.
+ """
+ pieces = [Token.get_token('GROUP'), groupname, consumername]
+ if count is not None:
+ if not isinstance(count, (int, long)) or count < 1:
+ raise RedisError("XREADGROUP count must be a positive integer")
+ pieces.append(Token.get_token("COUNT"))
+ pieces.append(str(count))
+ if block is not None:
+ if not isinstance(block, (int, long)) or block < 0:
+ raise RedisError("XREADGROUP block must be a non-negative "
+ "integer")
+ pieces.append(Token.get_token("BLOCK"))
+ pieces.append(str(block))
+ if not isinstance(streams, dict) or len(streams) == 0:
+ raise RedisError('XREADGROUP streams must be a non empty dict')
+ pieces.append(Token.get_token('STREAMS'))
+ pieces.extend(streams.keys())
+ pieces.extend(streams.values())
+ return self.execute_command('XREADGROUP', *pieces)
+
+ def xpending(self, name, groupname):
+ """
+ Returns information about pending messages of a group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XPENDING', name, groupname)
+
+ def xpending_range(self, name, groupname, start='-', end='+', count=-1,
+ consumername=None):
+ """
+ Returns information about pending messages, in a range.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
+ consumername: name of a consumer to filter by (optional).
+ """
+ pieces = [name, groupname]
+ if start is not None or end is not None or count is not None:
+ if start is None or end is None or count is None:
+ raise RedisError("XPENDING must be provided with start, end "
+ "and count parameters, or none of them. ")
+ if not isinstance(count, (int, long)) or count < -1:
+ raise RedisError("XPENDING count must be a integer >= -1")
+ pieces.extend((start, end, str(count)))
+ if consumername is not None:
+ if start is None or end is None or count is None:
+ raise RedisError("if XPENDING is provided with consumername,"
+ " it must be provided with start, end and"
+ " count parameters")
+ pieces.append(consumername)
+ return self.execute_command('XPENDING', *pieces, parse_detail=True)
+
+ def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
+ idle=None, time=None, retrycount=None, force=False,
+ justid=False):
+ """
+ Changes the ownership of a pending message.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ consumername: name of a consumer that claims the message.
+ min_idle_time: filter messages that were idle less than this amount of
+ milliseconds
+ message_ids: non-empty list or tuple of message IDs to claim
+ idle: optional. Set the idle time (last time it was delivered) of the
+ message in ms
+ time: optional integer. This is the same as idle but instead of a
+ relative amount of milliseconds, it sets the idle time to a specific
+ Unix time (in milliseconds).
+ retrycount: optional integer. set the retry counter to the specified
+ value. This counter is incremented every time a message is delivered
+ again.
+ force: optional boolean, false by default. Creates the pending message
+ entry in the PEL even if certain specified IDs are not already in the
+ PEL assigned to a different client.
+ justid: optional boolean, false by default. Return just an array of IDs
+ of messages successfully claimed, without returning the actual message
+ """
+ if not isinstance(min_idle_time, (int, long)) or min_idle_time < 0:
+ raise RedisError("XCLAIM min_idle_time must be a non negative "
+ "integer")
+ if not isinstance(message_ids, (list, tuple)) or not message_ids:
+ raise RedisError("XCLAIM message_ids must be a non empty list or "
+ "tuple of message IDs to claim")
+
+ pieces = [name, groupname, consumername, str(min_idle_time)]
+ pieces.extend(list(message_ids))
+
+ if idle is not None:
+ if not isinstance(idle, (int, long)):
+ raise RedisError("XCLAIM idle must be an integer")
+ pieces.extend((Token.get_token('IDLE'), str(idle)))
+ if time is not None:
+ if not isinstance(time, (int, long)):
+ raise RedisError("XCLAIM time must be an integer")
+ pieces.extend((Token.get_token('TIME'), str(time)))
+ if retrycount is not None:
+ if not isinstance(retrycount, (int, long)):
+ raise RedisError("XCLAIM retrycount must be an integer")
+ pieces.extend((Token.get_token('RETRYCOUNT'), str(retrycount)))
+
+ if force:
+ if not isinstance(force, bool):
+ raise RedisError("XCLAIM force must be a boolean")
+ pieces.append(Token.get_token('FORCE'))
+ if justid:
+ if not isinstance(justid, bool):
+ raise RedisError("XCLAIM justid must be a boolean")
+ pieces.append(Token.get_token('JUSTID'))
+ return self.execute_command('XCLAIM', *pieces)
+
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""
diff --git a/redis/connection.py b/redis/connection.py
index 6ad467a..1190260 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -236,7 +236,7 @@ class SocketBuffer(object):
try:
self.purge()
self._buffer.close()
- except:
+ except Exception:
# issue #633 suggests the purge/close somehow raised a
# BadFileDescriptor error. Perhaps the client ran out of
# memory or something else? It's probably OK to ignore
@@ -602,9 +602,9 @@ class Connection(object):
errmsg = e.args[1]
raise ConnectionError("Error %s while writing to socket. %s." %
(errno, errmsg))
- except:
+ except Exception as e:
self.disconnect()
- raise
+ raise e
def send_command(self, *args):
"Pack and send a command to the Redis server"
@@ -623,9 +623,9 @@ class Connection(object):
"Read the response from a previously sent command"
try:
response = self._parser.read_response()
- except:
+ except Exception as e:
self.disconnect()
- raise
+ raise e
if isinstance(response, ResponseError):
raise response
return response
diff --git a/setup.cfg b/setup.cfg
index cdae7c4..089b37c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,4 @@
-[pep8]
+[pycodestyle]
show-source = 1
exclude = .venv,.tox,dist,docs,build,*.egg
diff --git a/tests/test_commands.py b/tests/test_commands.py
index b9b9b66..9635732 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1603,6 +1603,195 @@ class TestRedisCommands(object):
class TestStrictCommands(object):
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xrange(self, sr):
+ varname = 'xrange_test'
+ sr.delete(varname)
+ assert sr.xlen(varname) == 0
+ stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4)
+ assert sr.xlen(varname) == 1
+ stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"})
+ assert sr.xlen(varname) == 2
+ assert stamp1 != stamp2
+
+ milli, offset = stamp2.decode('utf-8').split('-')
+ new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8')
+ stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id)
+ assert sr.xlen(varname) == 3
+ assert stamp3 == new_id
+ stamp4 = sr.xadd(varname, {"foo": "baz"})
+ assert sr.xlen(varname) == 4
+
+ def get_ids(results):
+ return [result[0] for result in results]
+
+ results = sr.xrange(varname, start=stamp1)
+ assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4]
+
+ results = sr.xrange(varname, start=stamp2, finish=stamp3)
+ assert get_ids(results) == [stamp2, stamp3]
+
+ results = sr.xrange(varname, finish=stamp3)
+ assert get_ids(results) == [stamp1, stamp2, stamp3]
+
+ results = sr.xrange(varname, finish=stamp2, count=1)
+ assert get_ids(results) == [stamp1]
+
+ results = sr.xrevrange(varname, start=stamp4)
+ assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1]
+
+ results = sr.xrevrange(varname, start=stamp3, finish=stamp2)
+ assert get_ids(results) == [stamp3, stamp2]
+
+ results = sr.xrevrange(varname, finish=stamp3)
+ assert get_ids(results) == [stamp4, stamp3]
+
+ results = sr.xrevrange(varname, finish=stamp2, count=1)
+ assert get_ids(results) == [stamp4]
+
+ assert sr.xlen(varname) == 4
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xread(self, sr):
+ varname = 'xread_test'
+ sr.delete(varname)
+ stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4)
+ stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"})
+ assert stamp1 != stamp2
+
+ results = sr.xread(streams={varname: '$'}, count=10, block=10)
+ assert results == []
+
+ results = sr.xread(count=3, block=0, streams={varname: stamp1})
+ assert results[0][1][0][0] == stamp2
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xgroup(self, sr):
+ stream_name = 'xgroup_test_stream'
+ sr.delete(stream_name)
+ group_name = 'xgroup_test_group'
+ message = {'name': 'boaty', 'other': 'mcboatface'}
+ b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')}
+
+ stamp1 = sr.xadd(stream_name, message)
+ assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')]
+
+ assert sr.xinfo_groups(name=stream_name) == []
+ assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
+ assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name)
+
+ with pytest.raises(redis.ResponseError):
+ sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0')
+ with pytest.raises(redis.ResponseError):
+ sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][
+ b('last-delivered-id')] == b(stamp1)
+ assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][
+ b('last-delivered-id')] == b('0-0')
+
+ consumer_name = 'captain_jack_sparrow'
+
+ expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]]
+ assert sr.xreadgroup(groupname=group_name,
+ consumername=consumer_name,
+ streams={stream_name: '0'}) == expected_value
+
+ assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1
+ sr.xgroup_delconsumer(stream_name, group_name, consumer_name)
+ assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0
+
+ assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xack(self, sr):
+ stream_name = 'xack_test_stream'
+ sr.delete(stream_name)
+ group_name = 'xack_test_group'
+
+ assert sr.xack(stream_name, group_name, 0) == 0
+ assert sr.xack(stream_name, group_name, '1-1') == 0
+ assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xdel(self, sr):
+ stream_name = 'xdel_test_stream'
+ sr.delete(stream_name)
+
+ assert sr.xdel(stream_name, 1) == 0
+
+ sr.xadd(stream_name, {"foo": "bar"}, id=1)
+ assert sr.xdel(stream_name, 1) == 1
+
+ stamp = sr.xadd(stream_name, {"baz": "qaz"})
+ assert sr.xdel(stream_name, 1, stamp) == 1
+ assert sr.xdel(stream_name, 1, stamp, 42) == 0
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xtrim(self, sr):
+ stream_name = 'xtrim_test_stream'
+ sr.delete(stream_name)
+
+ assert sr.xtrim(stream_name, 1000) == 0
+
+ for i in range(300):
+ sr.xadd(stream_name, {"index": i})
+
+ assert sr.xtrim(stream_name, 1000, approximate=False) == 0
+ assert sr.xtrim(stream_name, 300) == 0
+ assert sr.xtrim(stream_name, 299) == 0
+ assert sr.xtrim(stream_name, 234) == 0
+ assert sr.xtrim(stream_name, 234, approximate=False) == 66
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xack(self, sr):
+ stream_name = 'xack_test_stream'
+ sr.delete(stream_name)
+ group_name = 'xack_test_group'
+
+ assert sr.xack(stream_name, group_name, 0) == 0
+ assert sr.xack(stream_name, group_name, '1-1') == 0
+ assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xclaim(self, sr):
+ stream_name = 'xclaim_test_stream'
+ group_name = 'xclaim_test_consumer_group'
+ sr.delete(stream_name)
+
+ stamp = sr.xadd(stream_name, {"john": "wick"})
+ sr.xgroup_create(stream_name, group_name, id='0')
+ sr.xreadgroup(group_name, 'action_movie_consumer',
+ streams={stream_name: 0})
+ assert sr.xinfo_consumers(stream_name, group_name)[0][
+ b('name')] == b('action_movie_consumer')
+ assert sr.xclaim(stream_name, group_name, 'reeves_fan',
+ min_idle_time=0, message_ids=(stamp,))[0][0] == stamp
+ assert sr.xclaim(stream_name, group_name, 'action_movie_consumer',
+ min_idle_time=0, message_ids=(stamp,),
+ justid=True) == [b(stamp), ]
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_strict_xpending(self, sr):
+ stream_name = 'xpending_test_stream'
+ group_name = 'xpending_test_consumer_group'
+ consumer_name = 'marie'
+ sr.delete(stream_name)
+
+ sr.xadd(stream_name, {"foo": "bar"})
+ sr.xgroup_create(stream_name, group_name, id='0')
+ sr.xreadgroup(group_name, consumer_name,
+ streams={stream_name: 0})
+ response = sr.xpending(stream_name, group_name)
+ assert sorted(response.keys()) == ['consumers', 'lower', 'pending',
+ 'upper']
+
+ response = sr.xpending_range(stream_name, group_name,
+ consumername=consumer_name)
+ assert sorted(response[0].keys()) == ['consumer', 'message_id',
+ 'time_since_delivered',
+ 'times_delivered']
+
def test_strict_zadd(self, sr):
sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0)
assert sr.zrange('a', 0, -1, withscores=True) == \
diff --git a/tox.ini b/tox.ini
index a0945d2..ac086ed 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
[tox]
minversion = 1.8
-envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pep8
+envlist = {py26,py27,py32,py33,py34,py35,py36}-{plain,hiredis}, pycodestyle
[testenv]
deps =
@@ -9,9 +9,9 @@ deps =
hiredis: hiredis >= 0.1.3
commands = py.test {posargs}
-[testenv:pep8]
-basepython = python2.6
-deps = pep8
-commands = pep8
+[testenv:pycodestyle]
+basepython = python3.6
+deps = pycodestyle
+commands = pycodestyle
skipsdist = true
skip_install = true
diff --git a/vagrant/.bash_profile b/vagrant/.bash_profile
deleted file mode 100644
index e3d9bca..0000000
--- a/vagrant/.bash_profile
+++ /dev/null
@@ -1 +0,0 @@
-PATH=$PATH:/home/vagrant/redis/bin
diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile
index 7465ccd..3ee7aee 100644
--- a/vagrant/Vagrantfile
+++ b/vagrant/Vagrantfile
@@ -12,11 +12,11 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.synced_folder "../", "/home/vagrant/redis-py"
# install the redis server
- config.vm.provision :shell, :path => "bootstrap.sh"
- config.vm.provision :shell, :path => "build_redis.sh"
- config.vm.provision :shell, :path => "install_redis.sh"
- config.vm.provision :shell, :path => "install_sentinel.sh"
- config.vm.provision :file, :source => ".bash_profile", :destination => "/home/vagrant/.bash_profile"
+ config.vm.provision :shell, :path => "../build_tools/bootstrap.sh"
+ config.vm.provision :shell, :path => "../build_tools/build_redis.sh"
+ config.vm.provision :shell, :path => "../build_tools/install_redis.sh"
+ config.vm.provision :shell, :path => "../build_tools/install_sentinel.sh"
+ config.vm.provision :file, :source => "../build_tools/.bash_profile", :destination => "/home/vagrant/.bash_profile"
# setup forwarded ports
config.vm.network "forwarded_port", guest: 6379, host: 6379