summaryrefslogtreecommitdiff
path: root/tests/test_asyncio/test_pipeline.py
diff options
context:
space:
mode:
authorAndrew Chen Wang <60190294+Andrew-Chen-Wang@users.noreply.github.com>2022-02-22 05:29:55 -0500
committerGitHub <noreply@github.com>2022-02-22 12:29:55 +0200
commitd56baeb683fc1935cfa343fa2eeb0fa9bd955283 (patch)
tree47357a74bf1d1428cfbcf0d8b2c781f1f971cf77 /tests/test_asyncio/test_pipeline.py
parente3c989d93e914e6502bd5a72f15ded49a135c5be (diff)
downloadredis-py-d56baeb683fc1935cfa343fa2eeb0fa9bd955283.tar.gz
Add Async Support (#1899)
Co-authored-by: Chayim I. Kirshen <c@kirshen.com> Co-authored-by: dvora-h <dvora.heller@redis.com>
Diffstat (limited to 'tests/test_asyncio/test_pipeline.py')
-rw-r--r--tests/test_asyncio/test_pipeline.py409
1 files changed, 409 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_pipeline.py b/tests/test_asyncio/test_pipeline.py
new file mode 100644
index 0000000..5bb1a8a
--- /dev/null
+++ b/tests/test_asyncio/test_pipeline.py
@@ -0,0 +1,409 @@
+import pytest
+
+import redis
+from tests.conftest import skip_if_server_version_lt
+
+from .conftest import wait_for_command
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.mark.onlynoncluster
+class TestPipeline:
+ async def test_pipeline_is_true(self, r):
+ """Ensure pipeline instances are not false-y"""
+ async with r.pipeline() as pipe:
+ assert pipe
+
+ async def test_pipeline(self, r):
+ async with r.pipeline() as pipe:
+ (
+ pipe.set("a", "a1")
+ .get("a")
+ .zadd("z", {"z1": 1})
+ .zadd("z", {"z2": 4})
+ .zincrby("z", 1, "z1")
+ .zrange("z", 0, 5, withscores=True)
+ )
+ assert await pipe.execute() == [
+ True,
+ b"a1",
+ True,
+ True,
+ 2.0,
+ [(b"z1", 2.0), (b"z2", 4)],
+ ]
+
+ async def test_pipeline_memoryview(self, r):
+ async with r.pipeline() as pipe:
+ (pipe.set("a", memoryview(b"a1")).get("a"))
+ assert await pipe.execute() == [
+ True,
+ b"a1",
+ ]
+
+ async def test_pipeline_length(self, r):
+ async with r.pipeline() as pipe:
+ # Initially empty.
+ assert len(pipe) == 0
+
+ # Fill 'er up!
+ pipe.set("a", "a1").set("b", "b1").set("c", "c1")
+ assert len(pipe) == 3
+
+ # Execute calls reset(), so empty once again.
+ await pipe.execute()
+ assert len(pipe) == 0
+
+ @pytest.mark.onlynoncluster
+ async def test_pipeline_no_transaction(self, r):
+ async with r.pipeline(transaction=False) as pipe:
+ pipe.set("a", "a1").set("b", "b1").set("c", "c1")
+ assert await pipe.execute() == [True, True, True]
+ assert await r.get("a") == b"a1"
+ assert await r.get("b") == b"b1"
+ assert await r.get("c") == b"c1"
+
+ async def test_pipeline_no_transaction_watch(self, r):
+ await r.set("a", 0)
+
+ async with r.pipeline(transaction=False) as pipe:
+ await pipe.watch("a")
+ a = await pipe.get("a")
+
+ pipe.multi()
+ pipe.set("a", int(a) + 1)
+ assert await pipe.execute() == [True]
+
+ async def test_pipeline_no_transaction_watch_failure(self, r):
+ await r.set("a", 0)
+
+ async with r.pipeline(transaction=False) as pipe:
+ await pipe.watch("a")
+ a = await pipe.get("a")
+
+ await r.set("a", "bad")
+
+ pipe.multi()
+ pipe.set("a", int(a) + 1)
+
+ with pytest.raises(redis.WatchError):
+ await pipe.execute()
+
+ assert await r.get("a") == b"bad"
+
+ async def test_exec_error_in_response(self, r):
+ """
+ an invalid pipeline command at exec time adds the exception instance
+ to the list of returned values
+ """
+ await r.set("c", "a")
+ async with r.pipeline() as pipe:
+ pipe.set("a", 1).set("b", 2).lpush("c", 3).set("d", 4)
+ result = await pipe.execute(raise_on_error=False)
+
+ assert result[0]
+ assert await r.get("a") == b"1"
+ assert result[1]
+ assert await r.get("b") == b"2"
+
+ # we can't lpush to a key that's a string value, so this should
+ # be a ResponseError exception
+ assert isinstance(result[2], redis.ResponseError)
+ assert await r.get("c") == b"a"
+
+ # since this isn't a transaction, the other commands after the
+ # error are still executed
+ assert result[3]
+ assert await r.get("d") == b"4"
+
+ # make sure the pipe was restored to a working state
+ assert await pipe.set("z", "zzz").execute() == [True]
+ assert await r.get("z") == b"zzz"
+
+ async def test_exec_error_raised(self, r):
+ await r.set("c", "a")
+ async with r.pipeline() as pipe:
+ pipe.set("a", 1).set("b", 2).lpush("c", 3).set("d", 4)
+ with pytest.raises(redis.ResponseError) as ex:
+ await pipe.execute()
+ assert str(ex.value).startswith(
+ "Command # 3 (LPUSH c 3) of " "pipeline caused error: "
+ )
+
+ # make sure the pipe was restored to a working state
+ assert await pipe.set("z", "zzz").execute() == [True]
+ assert await r.get("z") == b"zzz"
+
+ @pytest.mark.onlynoncluster
+ async def test_transaction_with_empty_error_command(self, r):
+ """
+ Commands with custom EMPTY_ERROR functionality return their default
+ values in the pipeline no matter the raise_on_error preference
+ """
+ for error_switch in (True, False):
+ async with r.pipeline() as pipe:
+ pipe.set("a", 1).mget([]).set("c", 3)
+ result = await pipe.execute(raise_on_error=error_switch)
+
+ assert result[0]
+ assert result[1] == []
+ assert result[2]
+
+ @pytest.mark.onlynoncluster
+ async def test_pipeline_with_empty_error_command(self, r):
+ """
+ Commands with custom EMPTY_ERROR functionality return their default
+ values in the pipeline no matter the raise_on_error preference
+ """
+ for error_switch in (True, False):
+ async with r.pipeline(transaction=False) as pipe:
+ pipe.set("a", 1).mget([]).set("c", 3)
+ result = await pipe.execute(raise_on_error=error_switch)
+
+ assert result[0]
+ assert result[1] == []
+ assert result[2]
+
+ async def test_parse_error_raised(self, r):
+ async with r.pipeline() as pipe:
+ # the zrem is invalid because we don't pass any keys to it
+ pipe.set("a", 1).zrem("b").set("b", 2)
+ with pytest.raises(redis.ResponseError) as ex:
+ await pipe.execute()
+
+ assert str(ex.value).startswith(
+ "Command # 2 (ZREM b) of " "pipeline caused error: "
+ )
+
+ # make sure the pipe was restored to a working state
+ assert await pipe.set("z", "zzz").execute() == [True]
+ assert await r.get("z") == b"zzz"
+
+ @pytest.mark.onlynoncluster
+ async def test_parse_error_raised_transaction(self, r):
+ async with r.pipeline() as pipe:
+ pipe.multi()
+ # the zrem is invalid because we don't pass any keys to it
+ pipe.set("a", 1).zrem("b").set("b", 2)
+ with pytest.raises(redis.ResponseError) as ex:
+ await pipe.execute()
+
+ assert str(ex.value).startswith(
+ "Command # 2 (ZREM b) of " "pipeline caused error: "
+ )
+
+ # make sure the pipe was restored to a working state
+ assert await pipe.set("z", "zzz").execute() == [True]
+ assert await r.get("z") == b"zzz"
+
+ @pytest.mark.onlynoncluster
+ async def test_watch_succeed(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+
+ async with r.pipeline() as pipe:
+ await pipe.watch("a", "b")
+ assert pipe.watching
+ a_value = await pipe.get("a")
+ b_value = await pipe.get("b")
+ assert a_value == b"1"
+ assert b_value == b"2"
+ pipe.multi()
+
+ pipe.set("c", 3)
+ assert await pipe.execute() == [True]
+ assert not pipe.watching
+
+ @pytest.mark.onlynoncluster
+ async def test_watch_failure(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+
+ async with r.pipeline() as pipe:
+ await pipe.watch("a", "b")
+ await r.set("b", 3)
+ pipe.multi()
+ pipe.get("a")
+ with pytest.raises(redis.WatchError):
+ await pipe.execute()
+
+ assert not pipe.watching
+
+ @pytest.mark.onlynoncluster
+ async def test_watch_failure_in_empty_transaction(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+
+ async with r.pipeline() as pipe:
+ await pipe.watch("a", "b")
+ await r.set("b", 3)
+ pipe.multi()
+ with pytest.raises(redis.WatchError):
+ await pipe.execute()
+
+ assert not pipe.watching
+
+ @pytest.mark.onlynoncluster
+ async def test_unwatch(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+
+ async with r.pipeline() as pipe:
+ await pipe.watch("a", "b")
+ await r.set("b", 3)
+ await pipe.unwatch()
+ assert not pipe.watching
+ pipe.get("a")
+ assert await pipe.execute() == [b"1"]
+
+ @pytest.mark.onlynoncluster
+ async def test_watch_exec_no_unwatch(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+
+ async with r.monitor() as m:
+ async with r.pipeline() as pipe:
+ await pipe.watch("a", "b")
+ assert pipe.watching
+ a_value = await pipe.get("a")
+ b_value = await pipe.get("b")
+ assert a_value == b"1"
+ assert b_value == b"2"
+ pipe.multi()
+ pipe.set("c", 3)
+ assert await pipe.execute() == [True]
+ assert not pipe.watching
+
+ unwatch_command = await wait_for_command(r, m, "UNWATCH")
+ assert unwatch_command is None, "should not send UNWATCH"
+
+ @pytest.mark.onlynoncluster
+ async def test_watch_reset_unwatch(self, r):
+ await r.set("a", 1)
+
+ async with r.monitor() as m:
+ async with r.pipeline() as pipe:
+ await pipe.watch("a")
+ assert pipe.watching
+ await pipe.reset()
+ assert not pipe.watching
+
+ unwatch_command = await wait_for_command(r, m, "UNWATCH")
+ assert unwatch_command is not None
+ assert unwatch_command["command"] == "UNWATCH"
+
+ @pytest.mark.onlynoncluster
+ async def test_transaction_callable(self, r):
+ await r.set("a", 1)
+ await r.set("b", 2)
+ has_run = []
+
+ async def my_transaction(pipe):
+ a_value = await pipe.get("a")
+ assert a_value in (b"1", b"2")
+ b_value = await pipe.get("b")
+ assert b_value == b"2"
+
+ # silly run-once code... incr's "a" so WatchError should be raised
+ # forcing this all to run again. this should incr "a" once to "2"
+ if not has_run:
+ await r.incr("a")
+ has_run.append("it has")
+
+ pipe.multi()
+ pipe.set("c", int(a_value) + int(b_value))
+
+ result = await r.transaction(my_transaction, "a", "b")
+ assert result == [True]
+ assert await r.get("c") == b"4"
+
+ @pytest.mark.onlynoncluster
+ async def test_transaction_callable_returns_value_from_callable(self, r):
+ async def callback(pipe):
+ # No need to do anything here since we only want the return value
+ return "a"
+
+ res = await r.transaction(callback, "my-key", value_from_callable=True)
+ assert res == "a"
+
+ async def test_exec_error_in_no_transaction_pipeline(self, r):
+ await r.set("a", 1)
+ async with r.pipeline(transaction=False) as pipe:
+ pipe.llen("a")
+ pipe.expire("a", 100)
+
+ with pytest.raises(redis.ResponseError) as ex:
+ await pipe.execute()
+
+ assert str(ex.value).startswith(
+ "Command # 1 (LLEN a) of " "pipeline caused error: "
+ )
+
+ assert await r.get("a") == b"1"
+
+ async def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):
+ key = chr(3456) + "abcd" + chr(3421)
+ await r.set(key, 1)
+ async with r.pipeline(transaction=False) as pipe:
+ pipe.llen(key)
+ pipe.expire(key, 100)
+
+ with pytest.raises(redis.ResponseError) as ex:
+ await pipe.execute()
+
+ expected = f"Command # 1 (LLEN {key}) of pipeline caused error: "
+ assert str(ex.value).startswith(expected)
+
+ assert await r.get(key) == b"1"
+
+ async def test_pipeline_with_bitfield(self, r):
+ async with r.pipeline() as pipe:
+ pipe.set("a", "1")
+ bf = pipe.bitfield("b")
+ pipe2 = (
+ bf.set("u8", 8, 255)
+ .get("u8", 0)
+ .get("u4", 8) # 1111
+ .get("u4", 12) # 1111
+ .get("u4", 13) # 1110
+ .execute()
+ )
+ pipe.get("a")
+ response = await pipe.execute()
+
+ assert pipe == pipe2
+ assert response == [True, [0, 0, 15, 15, 14], b"1"]
+
+ async def test_pipeline_get(self, r):
+ await r.set("a", "a1")
+ async with r.pipeline() as pipe:
+ await pipe.get("a")
+ assert await pipe.execute() == [b"a1"]
+
+ @pytest.mark.onlynoncluster
+ @skip_if_server_version_lt("2.0.0")
+ async def test_pipeline_discard(self, r):
+
+ # empty pipeline should raise an error
+ async with r.pipeline() as pipe:
+ pipe.set("key", "someval")
+ await pipe.discard()
+ with pytest.raises(redis.exceptions.ResponseError):
+ await pipe.execute()
+
+ # setting a pipeline and discarding should do the same
+ async with r.pipeline() as pipe:
+ pipe.set("key", "someval")
+ pipe.set("someotherkey", "val")
+ response = await pipe.execute()
+ pipe.set("key", "another value!")
+ await pipe.discard()
+ pipe.set("key", "another vae!")
+ with pytest.raises(redis.exceptions.ResponseError):
+ await pipe.execute()
+
+ pipe.set("foo", "bar")
+ response = await pipe.execute()
+ assert response[0]
+ assert await r.get("foo") == b"bar"