summaryrefslogtreecommitdiff
path: root/tests/test_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pipeline.py')
-rw-r--r--tests/test_pipeline.py289
1 files changed, 147 insertions, 142 deletions
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index a87ed71..0518893 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -1,7 +1,8 @@
import pytest
import redis
-from .conftest import wait_for_command, skip_if_server_version_lt
+
+from .conftest import skip_if_server_version_lt, wait_for_command
class TestPipeline:
@@ -12,31 +13,30 @@ class TestPipeline:
def test_pipeline(self, r):
with r.pipeline() as pipe:
- (pipe.set('a', 'a1')
- .get('a')
- .zadd('z', {'z1': 1})
- .zadd('z', {'z2': 4})
- .zincrby('z', 1, 'z1')
- .zrange('z', 0, 5, withscores=True))
- assert pipe.execute() == \
- [
- True,
- b'a1',
- True,
- True,
- 2.0,
- [(b'z1', 2.0), (b'z2', 4)],
- ]
+ (
+ pipe.set("a", "a1")
+ .get("a")
+ .zadd("z", {"z1": 1})
+ .zadd("z", {"z2": 4})
+ .zincrby("z", 1, "z1")
+ .zrange("z", 0, 5, withscores=True)
+ )
+ assert pipe.execute() == [
+ True,
+ b"a1",
+ True,
+ True,
+ 2.0,
+ [(b"z1", 2.0), (b"z2", 4)],
+ ]
def test_pipeline_memoryview(self, r):
with r.pipeline() as pipe:
- (pipe.set('a', memoryview(b'a1'))
- .get('a'))
- assert pipe.execute() == \
- [
- True,
- b'a1',
- ]
+ (pipe.set("a", memoryview(b"a1")).get("a"))
+ assert pipe.execute() == [
+ True,
+ b"a1",
+ ]
def test_pipeline_length(self, r):
with r.pipeline() as pipe:
@@ -44,7 +44,7 @@ class TestPipeline:
assert len(pipe) == 0
# Fill 'er up!
- pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
+ pipe.set("a", "a1").set("b", "b1").set("c", "c1")
assert len(pipe) == 3
# Execute calls reset(), so empty once again.
@@ -53,83 +53,84 @@ class TestPipeline:
def test_pipeline_no_transaction(self, r):
with r.pipeline(transaction=False) as pipe:
- pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
+ pipe.set("a", "a1").set("b", "b1").set("c", "c1")
assert pipe.execute() == [True, True, True]
- assert r['a'] == b'a1'
- assert r['b'] == b'b1'
- assert r['c'] == b'c1'
+ assert r["a"] == b"a1"
+ assert r["b"] == b"b1"
+ assert r["c"] == b"c1"
@pytest.mark.onlynoncluster
def test_pipeline_no_transaction_watch(self, r):
- r['a'] = 0
+ r["a"] = 0
with r.pipeline(transaction=False) as pipe:
- pipe.watch('a')
- a = pipe.get('a')
+ pipe.watch("a")
+ a = pipe.get("a")
pipe.multi()
- pipe.set('a', int(a) + 1)
+ pipe.set("a", int(a) + 1)
assert pipe.execute() == [True]
@pytest.mark.onlynoncluster
def test_pipeline_no_transaction_watch_failure(self, r):
- r['a'] = 0
+ r["a"] = 0
with r.pipeline(transaction=False) as pipe:
- pipe.watch('a')
- a = pipe.get('a')
+ pipe.watch("a")
+ a = pipe.get("a")
- r['a'] = 'bad'
+ r["a"] = "bad"
pipe.multi()
- pipe.set('a', int(a) + 1)
+ pipe.set("a", int(a) + 1)
with pytest.raises(redis.WatchError):
pipe.execute()
- assert r['a'] == b'bad'
+ assert r["a"] == b"bad"
def test_exec_error_in_response(self, r):
"""
an invalid pipeline command at exec time adds the exception instance
to the list of returned values
"""
- r['c'] = 'a'
+ r["c"] = "a"
with r.pipeline() as pipe:
- pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
+ pipe.set("a", 1).set("b", 2).lpush("c", 3).set("d", 4)
result = pipe.execute(raise_on_error=False)
assert result[0]
- assert r['a'] == b'1'
+ assert r["a"] == b"1"
assert result[1]
- assert r['b'] == b'2'
+ assert r["b"] == b"2"
# we can't lpush to a key that's a string value, so this should
# be a ResponseError exception
assert isinstance(result[2], redis.ResponseError)
- assert r['c'] == b'a'
+ assert r["c"] == b"a"
# since this isn't a transaction, the other commands after the
# error are still executed
assert result[3]
- assert r['d'] == b'4'
+ assert r["d"] == b"4"
# make sure the pipe was restored to a working state
- assert pipe.set('z', 'zzz').execute() == [True]
- assert r['z'] == b'zzz'
+ assert pipe.set("z", "zzz").execute() == [True]
+ assert r["z"] == b"zzz"
def test_exec_error_raised(self, r):
- r['c'] = 'a'
+ r["c"] = "a"
with r.pipeline() as pipe:
- pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
+ pipe.set("a", 1).set("b", 2).lpush("c", 3).set("d", 4)
with pytest.raises(redis.ResponseError) as ex:
pipe.execute()
- assert str(ex.value).startswith('Command # 3 (LPUSH c 3) of '
- 'pipeline caused error: ')
+ assert str(ex.value).startswith(
+ "Command # 3 (LPUSH c 3) of " "pipeline caused error: "
+ )
# make sure the pipe was restored to a working state
- assert pipe.set('z', 'zzz').execute() == [True]
- assert r['z'] == b'zzz'
+ assert pipe.set("z", "zzz").execute() == [True]
+ assert r["z"] == b"zzz"
@pytest.mark.onlynoncluster
def test_transaction_with_empty_error_command(self, r):
@@ -139,7 +140,7 @@ class TestPipeline:
"""
for error_switch in (True, False):
with r.pipeline() as pipe:
- pipe.set('a', 1).mget([]).set('c', 3)
+ pipe.set("a", 1).mget([]).set("c", 3)
result = pipe.execute(raise_on_error=error_switch)
assert result[0]
@@ -154,7 +155,7 @@ class TestPipeline:
"""
for error_switch in (True, False):
with r.pipeline(transaction=False) as pipe:
- pipe.set('a', 1).mget([]).set('c', 3)
+ pipe.set("a", 1).mget([]).set("c", 3)
result = pipe.execute(raise_on_error=error_switch)
assert result[0]
@@ -164,61 +165,63 @@ class TestPipeline:
def test_parse_error_raised(self, r):
with r.pipeline() as pipe:
# the zrem is invalid because we don't pass any keys to it
- pipe.set('a', 1).zrem('b').set('b', 2)
+ pipe.set("a", 1).zrem("b").set("b", 2)
with pytest.raises(redis.ResponseError) as ex:
pipe.execute()
- assert str(ex.value).startswith('Command # 2 (ZREM b) of '
- 'pipeline caused error: ')
+ assert str(ex.value).startswith(
+ "Command # 2 (ZREM b) of " "pipeline caused error: "
+ )
# make sure the pipe was restored to a working state
- assert pipe.set('z', 'zzz').execute() == [True]
- assert r['z'] == b'zzz'
+ assert pipe.set("z", "zzz").execute() == [True]
+ assert r["z"] == b"zzz"
@pytest.mark.onlynoncluster
def test_parse_error_raised_transaction(self, r):
with r.pipeline() as pipe:
pipe.multi()
# the zrem is invalid because we don't pass any keys to it
- pipe.set('a', 1).zrem('b').set('b', 2)
+ pipe.set("a", 1).zrem("b").set("b", 2)
with pytest.raises(redis.ResponseError) as ex:
pipe.execute()
- assert str(ex.value).startswith('Command # 2 (ZREM b) of '
- 'pipeline caused error: ')
+ assert str(ex.value).startswith(
+ "Command # 2 (ZREM b) of " "pipeline caused error: "
+ )
# make sure the pipe was restored to a working state
- assert pipe.set('z', 'zzz').execute() == [True]
- assert r['z'] == b'zzz'
+ assert pipe.set("z", "zzz").execute() == [True]
+ assert r["z"] == b"zzz"
@pytest.mark.onlynoncluster
def test_watch_succeed(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
with r.pipeline() as pipe:
- pipe.watch('a', 'b')
+ pipe.watch("a", "b")
assert pipe.watching
- a_value = pipe.get('a')
- b_value = pipe.get('b')
- assert a_value == b'1'
- assert b_value == b'2'
+ a_value = pipe.get("a")
+ b_value = pipe.get("b")
+ assert a_value == b"1"
+ assert b_value == b"2"
pipe.multi()
- pipe.set('c', 3)
+ pipe.set("c", 3)
assert pipe.execute() == [True]
assert not pipe.watching
@pytest.mark.onlynoncluster
def test_watch_failure(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
with r.pipeline() as pipe:
- pipe.watch('a', 'b')
- r['b'] = 3
+ pipe.watch("a", "b")
+ r["b"] = 3
pipe.multi()
- pipe.get('a')
+ pipe.get("a")
with pytest.raises(redis.WatchError):
pipe.execute()
@@ -226,12 +229,12 @@ class TestPipeline:
@pytest.mark.onlynoncluster
def test_watch_failure_in_empty_transaction(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
with r.pipeline() as pipe:
- pipe.watch('a', 'b')
- r['b'] = 3
+ pipe.watch("a", "b")
+ r["b"] = 3
pipe.multi()
with pytest.raises(redis.WatchError):
pipe.execute()
@@ -240,103 +243,104 @@ class TestPipeline:
@pytest.mark.onlynoncluster
def test_unwatch(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
with r.pipeline() as pipe:
- pipe.watch('a', 'b')
- r['b'] = 3
+ pipe.watch("a", "b")
+ r["b"] = 3
pipe.unwatch()
assert not pipe.watching
- pipe.get('a')
- assert pipe.execute() == [b'1']
+ pipe.get("a")
+ assert pipe.execute() == [b"1"]
@pytest.mark.onlynoncluster
def test_watch_exec_no_unwatch(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
with r.monitor() as m:
with r.pipeline() as pipe:
- pipe.watch('a', 'b')
+ pipe.watch("a", "b")
assert pipe.watching
- a_value = pipe.get('a')
- b_value = pipe.get('b')
- assert a_value == b'1'
- assert b_value == b'2'
+ a_value = pipe.get("a")
+ b_value = pipe.get("b")
+ assert a_value == b"1"
+ assert b_value == b"2"
pipe.multi()
- pipe.set('c', 3)
+ pipe.set("c", 3)
assert pipe.execute() == [True]
assert not pipe.watching
- unwatch_command = wait_for_command(r, m, 'UNWATCH')
+ unwatch_command = wait_for_command(r, m, "UNWATCH")
assert unwatch_command is None, "should not send UNWATCH"
@pytest.mark.onlynoncluster
def test_watch_reset_unwatch(self, r):
- r['a'] = 1
+ r["a"] = 1
with r.monitor() as m:
with r.pipeline() as pipe:
- pipe.watch('a')
+ pipe.watch("a")
assert pipe.watching
pipe.reset()
assert not pipe.watching
- unwatch_command = wait_for_command(r, m, 'UNWATCH')
+ unwatch_command = wait_for_command(r, m, "UNWATCH")
assert unwatch_command is not None
- assert unwatch_command['command'] == 'UNWATCH'
+ assert unwatch_command["command"] == "UNWATCH"
@pytest.mark.onlynoncluster
def test_transaction_callable(self, r):
- r['a'] = 1
- r['b'] = 2
+ r["a"] = 1
+ r["b"] = 2
has_run = []
def my_transaction(pipe):
- a_value = pipe.get('a')
- assert a_value in (b'1', b'2')
- b_value = pipe.get('b')
- assert b_value == b'2'
+ a_value = pipe.get("a")
+ assert a_value in (b"1", b"2")
+ b_value = pipe.get("b")
+ assert b_value == b"2"
# silly run-once code... incr's "a" so WatchError should be raised
# forcing this all to run again. this should incr "a" once to "2"
if not has_run:
- r.incr('a')
- has_run.append('it has')
+ r.incr("a")
+ has_run.append("it has")
pipe.multi()
- pipe.set('c', int(a_value) + int(b_value))
+ pipe.set("c", int(a_value) + int(b_value))
- result = r.transaction(my_transaction, 'a', 'b')
+ result = r.transaction(my_transaction, "a", "b")
assert result == [True]
- assert r['c'] == b'4'
+ assert r["c"] == b"4"
@pytest.mark.onlynoncluster
def test_transaction_callable_returns_value_from_callable(self, r):
def callback(pipe):
# No need to do anything here since we only want the return value
- return 'a'
+ return "a"
- res = r.transaction(callback, 'my-key', value_from_callable=True)
- assert res == 'a'
+ res = r.transaction(callback, "my-key", value_from_callable=True)
+ assert res == "a"
def test_exec_error_in_no_transaction_pipeline(self, r):
- r['a'] = 1
+ r["a"] = 1
with r.pipeline(transaction=False) as pipe:
- pipe.llen('a')
- pipe.expire('a', 100)
+ pipe.llen("a")
+ pipe.expire("a", 100)
with pytest.raises(redis.ResponseError) as ex:
pipe.execute()
- assert str(ex.value).startswith('Command # 1 (LLEN a) of '
- 'pipeline caused error: ')
+ assert str(ex.value).startswith(
+ "Command # 1 (LLEN a) of " "pipeline caused error: "
+ )
- assert r['a'] == b'1'
+ assert r["a"] == b"1"
def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):
- key = chr(3456) + 'abcd' + chr(3421)
+ key = chr(3456) + "abcd" + chr(3421)
r[key] = 1
with r.pipeline(transaction=False) as pipe:
pipe.llen(key)
@@ -345,51 +349,52 @@ class TestPipeline:
with pytest.raises(redis.ResponseError) as ex:
pipe.execute()
- expected = f'Command # 1 (LLEN {key}) of pipeline caused error: '
+ expected = f"Command # 1 (LLEN {key}) of pipeline caused error: "
assert str(ex.value).startswith(expected)
- assert r[key] == b'1'
+ assert r[key] == b"1"
def test_pipeline_with_bitfield(self, r):
with r.pipeline() as pipe:
- pipe.set('a', '1')
- bf = pipe.bitfield('b')
- pipe2 = (bf
- .set('u8', 8, 255)
- .get('u8', 0)
- .get('u4', 8) # 1111
- .get('u4', 12) # 1111
- .get('u4', 13) # 1110
- .execute())
- pipe.get('a')
+ pipe.set("a", "1")
+ bf = pipe.bitfield("b")
+ pipe2 = (
+ bf.set("u8", 8, 255)
+ .get("u8", 0)
+ .get("u4", 8) # 1111
+ .get("u4", 12) # 1111
+ .get("u4", 13) # 1110
+ .execute()
+ )
+ pipe.get("a")
response = pipe.execute()
assert pipe == pipe2
- assert response == [True, [0, 0, 15, 15, 14], b'1']
+ assert response == [True, [0, 0, 15, 15, 14], b"1"]
@pytest.mark.onlynoncluster
- @skip_if_server_version_lt('2.0.0')
+ @skip_if_server_version_lt("2.0.0")
def test_pipeline_discard(self, r):
# empty pipeline should raise an error
with r.pipeline() as pipe:
- pipe.set('key', 'someval')
+ pipe.set("key", "someval")
pipe.discard()
with pytest.raises(redis.exceptions.ResponseError):
pipe.execute()
# setting a pipeline and discarding should do the same
with r.pipeline() as pipe:
- pipe.set('key', 'someval')
- pipe.set('someotherkey', 'val')
+ pipe.set("key", "someval")
+ pipe.set("someotherkey", "val")
response = pipe.execute()
- pipe.set('key', 'another value!')
+ pipe.set("key", "another value!")
pipe.discard()
- pipe.set('key', 'another vae!')
+ pipe.set("key", "another vae!")
with pytest.raises(redis.exceptions.ResponseError):
pipe.execute()
- pipe.set('foo', 'bar')
+ pipe.set("foo", "bar")
response = pipe.execute()
assert response[0]
- assert r.get('foo') == b'bar'
+ assert r.get("foo") == b"bar"