diff options
author | Alexey Popravka <a.popravka@smartweb.com.ua> | 2018-12-19 15:50:12 +0200 |
---|---|---|
committer | Alexey Popravka <a.popravka@smartweb.com.ua> | 2019-01-03 13:06:56 +0200 |
commit | 733754e13b52bce6687d5a88c1075c963d034722 (patch) | |
tree | 0c607bcc6db22b4befcbb1fa736530687d4c601a /tests/test_multiprocessing.py | |
parent | a3cfded93afa2a65908f05ac251b18d77fa84dd2 (diff) | |
download | redis-py-733754e13b52bce6687d5a88c1075c963d034722.tar.gz |
Add failing tests to show difference between protocol parsers on_disconnect
implementation/behavior (related to #1085).
When hiredis is installed and HiredisParser is used (implicitly),
connection can not be securily shared between process forks.
Diffstat (limited to 'tests/test_multiprocessing.py')
-rw-r--r-- | tests/test_multiprocessing.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py new file mode 100644 index 0000000..8af1459 --- /dev/null +++ b/tests/test_multiprocessing.py @@ -0,0 +1,127 @@ +import pytest +import multiprocessing +import contextlib + +from redis.connection import Connection, ConnectionPool + + +@contextlib.contextmanager +def exit_callback(callback, *args): + try: + yield + finally: + callback(*args) + + +class TestMultiprocessing(object): + # Test connection sharing between forks. + # See issue #1085 for details. + + def test_connection(self): + conn = Connection() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + conn.disconnect() + + proc = multiprocessing.Process(target=target, args=(conn,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + # Check that connection is still alive after fork process has exited. + conn.send_command('ping') + assert conn.read_response() == b'PONG' + + def test_close_connection_in_main(self): + conn = Connection() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(conn, ev): + ev.wait() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + ev = multiprocessing.Event() + proc = multiprocessing.Process(target=target, args=(conn, ev)) + proc.start() + + conn.disconnect() + ev.set() + + proc.join(3) + assert proc.exitcode is 0 + + @pytest.mark.parametrize('max_connections', [1, 2, None]) + def test_pool(self, max_connections): + pool = ConnectionPool.from_url('redis://localhost', + max_connections=max_connections) + + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(pool): + with exit_callback(pool.disconnect): + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + proc = multiprocessing.Process(target=target, args=(pool,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + # Check that connection is still alive after fork process has exited. + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + conn.send_command('ping') + assert conn.read_response() == b'PONG' + + @pytest.mark.parametrize('max_connections', [1, 2, None]) + def test_close_pool_in_main(self, max_connections): + pool = ConnectionPool.from_url('redis://localhost', + max_connections=max_connections) + + conn = pool.get_connection('ping') + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(pool, disconnect_event): + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + disconnect_event.wait() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + ev = multiprocessing.Event() + + proc = multiprocessing.Process(target=target, args=(pool, ev)) + proc.start() + + pool.disconnect() + ev.set() + proc.join(3) + assert proc.exitcode is 0 + + def test_redis(self, r): + assert r.ping() is True + + def target(redis): + assert redis.ping() is True + del redis + + proc = multiprocessing.Process(target=target, args=(r,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + assert r.ping() is True |