summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Popravka <a.popravka@smartweb.com.ua>2018-12-19 15:50:12 +0200
committerAlexey Popravka <a.popravka@smartweb.com.ua>2019-01-03 13:06:56 +0200
commit733754e13b52bce6687d5a88c1075c963d034722 (patch)
tree0c607bcc6db22b4befcbb1fa736530687d4c601a
parenta3cfded93afa2a65908f05ac251b18d77fa84dd2 (diff)
downloadredis-py-733754e13b52bce6687d5a88c1075c963d034722.tar.gz
Add failing tests to show difference between protocol parsers on_disconnect
implementation/behavior (related to #1085). When hiredis is installed and HiredisParser is used (implicitly), connection can not be securily shared between process forks.
-rw-r--r--tests/test_multiprocessing.py127
1 files changed, 127 insertions, 0 deletions
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
new file mode 100644
index 0000000..8af1459
--- /dev/null
+++ b/tests/test_multiprocessing.py
@@ -0,0 +1,127 @@
+import pytest
+import multiprocessing
+import contextlib
+
+from redis.connection import Connection, ConnectionPool
+
+
+@contextlib.contextmanager
+def exit_callback(callback, *args):
+ try:
+ yield
+ finally:
+ callback(*args)
+
+
+class TestMultiprocessing(object):
+ # Test connection sharing between forks.
+ # See issue #1085 for details.
+
+ def test_connection(self):
+ conn = Connection()
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ def target(conn):
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+ conn.disconnect()
+
+ proc = multiprocessing.Process(target=target, args=(conn,))
+ proc.start()
+ proc.join(3)
+ assert proc.exitcode is 0
+
+ # Check that connection is still alive after fork process has exited.
+ conn.send_command('ping')
+ assert conn.read_response() == b'PONG'
+
+ def test_close_connection_in_main(self):
+ conn = Connection()
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ def target(conn, ev):
+ ev.wait()
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ ev = multiprocessing.Event()
+ proc = multiprocessing.Process(target=target, args=(conn, ev))
+ proc.start()
+
+ conn.disconnect()
+ ev.set()
+
+ proc.join(3)
+ assert proc.exitcode is 0
+
+ @pytest.mark.parametrize('max_connections', [1, 2, None])
+ def test_pool(self, max_connections):
+ pool = ConnectionPool.from_url('redis://localhost',
+ max_connections=max_connections)
+
+ conn = pool.get_connection('ping')
+ with exit_callback(pool.release, conn):
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ def target(pool):
+ with exit_callback(pool.disconnect):
+ conn = pool.get_connection('ping')
+ with exit_callback(pool.release, conn):
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ proc = multiprocessing.Process(target=target, args=(pool,))
+ proc.start()
+ proc.join(3)
+ assert proc.exitcode is 0
+
+ # Check that connection is still alive after fork process has exited.
+ conn = pool.get_connection('ping')
+ with exit_callback(pool.release, conn):
+ conn.send_command('ping')
+ assert conn.read_response() == b'PONG'
+
+ @pytest.mark.parametrize('max_connections', [1, 2, None])
+ def test_close_pool_in_main(self, max_connections):
+ pool = ConnectionPool.from_url('redis://localhost',
+ max_connections=max_connections)
+
+ conn = pool.get_connection('ping')
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ def target(pool, disconnect_event):
+ conn = pool.get_connection('ping')
+ with exit_callback(pool.release, conn):
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+ disconnect_event.wait()
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
+
+ ev = multiprocessing.Event()
+
+ proc = multiprocessing.Process(target=target, args=(pool, ev))
+ proc.start()
+
+ pool.disconnect()
+ ev.set()
+ proc.join(3)
+ assert proc.exitcode is 0
+
+ def test_redis(self, r):
+ assert r.ping() is True
+
+ def target(redis):
+ assert redis.ping() is True
+ del redis
+
+ proc = multiprocessing.Process(target=target, args=(r,))
+ proc.start()
+ proc.join(3)
+ assert proc.exitcode is 0
+
+ assert r.ping() is True