summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis/client.py15
-rw-r--r--tests/test_connection_pool.py31
2 files changed, 40 insertions, 6 deletions
diff --git a/redis/client.py b/redis/client.py
index 91807b1..136aade 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -2257,12 +2257,15 @@ class BasePipeline(object):
except ConnectionError:
conn.disconnect()
# if we're not already watching, we can safely retry the command
- # assuming it was a connection timeout
- if not self.watching:
- conn.send_command(*args)
- return self.parse_response(conn, command_name, **options)
- self.reset()
- raise
+ try:
+ if not self.watching:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+ except ConnectionError:
+ # the retry failed so cleanup.
+ conn.disconnect()
+ self.reset()
+ raise
def pipeline_execute_command(self, *args, **options):
"""
diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py
index 10e11ab..b4b6268 100644
--- a/tests/test_connection_pool.py
+++ b/tests/test_connection_pool.py
@@ -187,3 +187,34 @@ class TestConnection(object):
pool = r.connection_pool
assert len(pool._available_connections) == 1
assert not pool._available_connections[0]._sock
+
+ @skip_if_server_version_lt('2.8.8')
+ def test_busy_loading_from_pipeline_immediate_command(self, r):
+ """
+ BusyLoadingErrors should raise from Pipelines that execute a
+ command immediately, like WATCH does.
+ """
+ pipe = r.pipeline()
+ with pytest.raises(redis.BusyLoadingError):
+ pipe.immediate_execute_command('DEBUG', 'ERROR',
+ 'LOADING fake message')
+ pool = r.connection_pool
+ assert not pipe.connection
+ assert len(pool._available_connections) == 1
+ assert not pool._available_connections[0]._sock
+
+ # TODO: This fails on hiredis. need to think about this
+ # @skip_if_server_version_lt('2.8.8')
+ # def test_busy_loading_from_pipeline(self, r):
+ # """
+ # BusyLoadingErrors should be raised from a pipeline execution
+ # regardless of the raise_on_error flag.
+ # """
+ # pipe = r.pipeline()
+ # pipe.execute_command('DEBUG', 'ERROR', 'LOADING fake message')
+ # with pytest.raises(redis.BusyLoadingError):
+ # pipe.execute()
+ # pool = r.connection_pool
+ # assert not pipe.connection
+ # assert len(pool._available_connections) == 1
+ # assert not pool._available_connections[0]._sock