diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_asyncio/test_cluster.py | 17 | ||||
-rw-r--r-- | tests/test_asyncio/test_connection.py | 21 | ||||
-rw-r--r-- | tests/test_asyncio/test_cwe_404.py | 146 | ||||
-rw-r--r-- | tests/test_asyncio/test_pubsub.py | 3 |
4 files changed, 149 insertions, 38 deletions
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 0857c05..13e5e26 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -340,23 +340,6 @@ class TestRedisClusterObj: rc = RedisCluster.from_url("rediss://localhost:16379") assert rc.connection_kwargs["connection_class"] is SSLConnection - async def test_asynckills(self, r) -> None: - - await r.set("foo", "foo") - await r.set("bar", "bar") - - t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(1) - t.cancel() - try: - await t - except asyncio.CancelledError: - pytest.fail("connection is left open with unread response") - - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - async def test_max_connections( self, create_redis: Callable[..., RedisCluster] ) -> None: diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index d3b6285..e2d77fc 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -44,27 +44,6 @@ async def test_invalid_response(create_redis): await r.connection.disconnect() -async def test_asynckills(): - - for b in [True, False]: - r = Redis(single_connection_client=b) - - await r.set("foo", "foo") - await r.set("bar", "bar") - - t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(1) - t.cancel() - try: - await t - except asyncio.CancelledError: - pytest.fail("connection left open with unread response") - - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - - @pytest.mark.onlynoncluster async def test_single_connection(): """Test that concurrent requests on a single client are synchronised.""" diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py new file mode 100644 index 0000000..6683440 --- /dev/null +++ b/tests/test_asyncio/test_cwe_404.py @@ -0,0 +1,146 @@ +import asyncio +import sys + +import pytest + +from redis.asyncio import Redis +from redis.asyncio.cluster import RedisCluster + + +async def pipe( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name="" +): + while True: + data = await reader.read(1000) + if not data: + break + await asyncio.sleep(delay) + writer.write(data) + await writer.drain() + + +class DelayProxy: + def __init__(self, addr, redis_addr, delay: float): + self.addr = addr + self.redis_addr = redis_addr + self.delay = delay + + async def start(self): + self.server = await asyncio.start_server(self.handle, *self.addr) + self.ROUTINE = asyncio.create_task(self.server.serve_forever()) + + async def handle(self, reader, writer): + # establish connection to redis + redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) + pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:")) + pipe2 = asyncio.create_task( + pipe(redis_reader, writer, self.delay, "from redis:") + ) + await asyncio.gather(pipe1, pipe2) + + async def stop(self): + # clean up enough so that we can reuse the looper + self.ROUTINE.cancel() + loop = self.server.get_loop() + await loop.shutdown_asyncgens() + + +@pytest.mark.onlynoncluster +@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) +async def test_standalone(delay): + + # create a tcp socket proxy that relays data to Redis and back, + # inserting 0.1 seconds of delay + dp = DelayProxy( + addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 + ) + await dp.start() + + for b in [True, False]: + # note that we connect to proxy, rather than to Redis directly + async with Redis(host="localhost", port=5380, single_connection_client=b) as r: + + await r.set("foo", "foo") + await r.set("bar", "bar") + + t = asyncio.create_task(r.get("foo")) + await asyncio.sleep(delay) + t.cancel() + try: + await t + sys.stderr.write("try again, we did not cancel the task in time\n") + except asyncio.CancelledError: + sys.stderr.write( + "canceled task, connection is left open with unread response\n" + ) + + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" + + await dp.stop() + + +@pytest.mark.onlynoncluster +@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) +async def test_standalone_pipeline(delay): + dp = DelayProxy( + addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 + ) + await dp.start() + async with Redis(host="localhost", port=5380) as r: + await r.set("foo", "foo") + await r.set("bar", "bar") + + pipe = r.pipeline() + + pipe2 = r.pipeline() + pipe2.get("bar") + pipe2.ping() + pipe2.get("foo") + + t = asyncio.create_task(pipe.get("foo").execute()) + await asyncio.sleep(delay) + t.cancel() + + pipe.get("bar") + pipe.ping() + pipe.get("foo") + pipe.reset() + + assert await pipe.execute() is None + + # validating that the pipeline can be used as it could previously + pipe.get("bar") + pipe.ping() + pipe.get("foo") + assert await pipe.execute() == [b"bar", True, b"foo"] + assert await pipe2.execute() == [b"bar", True, b"foo"] + + await dp.stop() + + +@pytest.mark.onlycluster +async def test_cluster(request): + + dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1) + await dp.start() + + r = RedisCluster.from_url("redis://localhost:5381") + await r.initialize() + await r.set("foo", "foo") + await r.set("bar", "bar") + + t = asyncio.create_task(r.get("foo")) + await asyncio.sleep(0.050) + t.cancel() + try: + await t + except asyncio.CancelledError: + pytest.fail("connection is left open with unread response") + + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" + + await dp.stop() diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index 8f3817a..ba70782 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -973,6 +973,9 @@ class TestBaseException: # the timeout on the read should not cause disconnect assert pubsub.connection.is_connected + @pytest.mark.skipif( + sys.version_info < (3, 8), reason="requires python 3.8 or higher" + ) async def test_base_exception(self, r: redis.Redis): """ Manually trigger a BaseException inside the parser's .read_response method |