summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis/client.py70
-rw-r--r--tests/pipeline.py30
2 files changed, 87 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py
index d0f7516..f5a3447 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -225,7 +225,7 @@ class Redis(threading.local):
self.connection.disconnect()
return self._execute_command(command_name, command, **options)
- def _parse_response(self, command_name):
+ def _parse_response(self, command_name, catch_errors):
conn = self.connection
response = conn.read().strip()
if not response:
@@ -261,13 +261,27 @@ class Redis(threading.local):
length = int(response)
if length == -1:
return None
- return [self._parse_response(command_name) for i in range(length)]
+ if not catch_errors:
+ return [self._parse_response(command_name, catch_errors)
+ for i in range(length)]
+ else:
+ # for pipelines, we need to read everything, including response errors.
+ # otherwise we'd completely mess up the receive buffer
+ data = []
+ for i in range(length):
+ try:
+ data.append(
+ self._parse_response(command_name, catch_errors)
+ )
+ except Exception, e:
+ data.append(e)
+ return data
raise InvalidResponse("Unknown response type for: %s" % command_name)
- def parse_response(self, command_name, **options):
+ def parse_response(self, command_name, catch_errors=False, **options):
"Parses a response from the Redis server"
- response = self._parse_response(command_name)
+ response = self._parse_response(command_name, catch_errors)
if command_name in self.RESPONSE_CALLBACKS:
return self.RESPONSE_CALLBACKS[command_name](response, **options)
return response
@@ -886,16 +900,27 @@ class Pipeline(Redis):
in one transmission. This is convenient for batch processing, such as
saving all the values in a list to Redis.
- Note that pipelining does *not* guarantee all the commands will be executed
- together atomically, nor does it guarantee any transactional consistency.
- If the third command in the batch fails, the first two will still have been
- executed and "committed"
+ All commands executed within a pipeline are wrapped with MULTI and EXEC
+ calls. This guarantees all commands executed in the pipeline will be
+ executed atomically.
+
+ Any command raising an exception does *not* halt the execution of
+ subsequent commands in the pipeline. Instead, the exception is caught
+ and its instance is placed into the response list returned by execute().
+ Code iterating over the response list should be able to deal with an
+ instance of an exception as a potential value. In general, these will be
+ ResponseError exceptions, such as those raised when issuing a command
+ on a key of a different datatype.
"""
def __init__(self, connection, charset, errors):
self.connection = connection
- self.command_stack = []
self.encoding = charset
self.errors = errors
+ self.reset()
+
+ def reset(self):
+ self.command_stack = []
+ self.format_inline('MULTI')
def execute_command(self, command_name, command, **options):
"""
@@ -921,15 +946,34 @@ class Pipeline(Redis):
return self
def _execute(self, commands):
- for _, command, options in commands:
+ for name, command, options in commands:
self.connection.send(command, self)
- return [self.parse_response(name, **options)
- for name, _, options in commands]
+ # we only care about the last item in the response, which should be
+ # the EXEC command
+ for i in range(len(commands)-1):
+ _ = self.parse_response('_')
+ # tell the response parse to catch errors and return them as
+ # part of the response
+ response = self.parse_response('_', catch_errors=True)
+ # don't return the results of the MULTI or EXEC command
+ commands = [(c[0], c[2]) for c in commands[1:-1]]
+ if len(response) != len(commands):
+ raise ResponseError("Wrong number of response items from "
+ "pipline execution")
+ # Run any callbacks for the commands run in the pipeline
+ data = []
+ for r, cmd in zip(response, commands):
+ if not isinstance(r, Exception):
+ if cmd[0] in self.RESPONSE_CALLBACKS:
+ r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1])
+ data.append(r)
+ return data
def execute(self):
"Execute all the commands in the current pipeline"
+ self.format_inline('EXEC')
stack = self.command_stack
- self.command_stack = []
+ self.reset()
try:
return self._execute(stack)
except ConnectionError:
diff --git a/tests/pipeline.py b/tests/pipeline.py
index cd130d1..da85e55 100644
--- a/tests/pipeline.py
+++ b/tests/pipeline.py
@@ -23,3 +23,33 @@ class PipelineTestCase(unittest.TestCase):
[('z1', 2.0), ('z2', 4)]
]
)
+
+ def test_invalid_command_in_pipeline(self):
+ # all commands but the invalid one should be excuted correctly
+ self.client['c'] = 'a'
+ pipe = self.client.pipeline()
+ pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
+ result = pipe.execute()
+
+ self.assertEquals(result[0], True)
+ self.assertEquals(self.client['a'], '1')
+ self.assertEquals(result[1], True)
+ self.assertEquals(self.client['b'], '2')
+ # we can't lpush to a key that's a string value, so this should
+ # be a ResponseError exception
+ self.assert_(isinstance(result[2], redis.ResponseError))
+ self.assertEquals(self.client['c'], 'a')
+ self.assertEquals(result[3], True)
+ self.assertEquals(self.client['d'], '4')
+
+ # make sure the pipe was restored to a working state
+ self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
+ self.assertEquals(self.client['z'], 'zzz')
+
+ def test_pipe_cannot_select(self):
+ pipe = self.client.pipeline()
+ self.assertRaises(redis.RedisError,
+ pipe.select, 'localhost', 6379, db=9)
+
+
+