From 733754e13b52bce6687d5a88c1075c963d034722 Mon Sep 17 00:00:00 2001 From: Alexey Popravka Date: Wed, 19 Dec 2018 15:50:12 +0200 Subject: Add failing tests to show difference between protocol parsers on_disconnect implementation/behavior (related to #1085). When hiredis is installed and HiredisParser is used (implicitly), connection can not be securily shared between process forks. --- tests/test_multiprocessing.py | 127 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 tests/test_multiprocessing.py diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py new file mode 100644 index 0000000..8af1459 --- /dev/null +++ b/tests/test_multiprocessing.py @@ -0,0 +1,127 @@ +import pytest +import multiprocessing +import contextlib + +from redis.connection import Connection, ConnectionPool + + +@contextlib.contextmanager +def exit_callback(callback, *args): + try: + yield + finally: + callback(*args) + + +class TestMultiprocessing(object): + # Test connection sharing between forks. + # See issue #1085 for details. + + def test_connection(self): + conn = Connection() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + conn.disconnect() + + proc = multiprocessing.Process(target=target, args=(conn,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + # Check that connection is still alive after fork process has exited. + conn.send_command('ping') + assert conn.read_response() == b'PONG' + + def test_close_connection_in_main(self): + conn = Connection() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(conn, ev): + ev.wait() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + ev = multiprocessing.Event() + proc = multiprocessing.Process(target=target, args=(conn, ev)) + proc.start() + + conn.disconnect() + ev.set() + + proc.join(3) + assert proc.exitcode is 0 + + @pytest.mark.parametrize('max_connections', [1, 2, None]) + def test_pool(self, max_connections): + pool = ConnectionPool.from_url('redis://localhost', + max_connections=max_connections) + + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(pool): + with exit_callback(pool.disconnect): + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + proc = multiprocessing.Process(target=target, args=(pool,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + # Check that connection is still alive after fork process has exited. + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + conn.send_command('ping') + assert conn.read_response() == b'PONG' + + @pytest.mark.parametrize('max_connections', [1, 2, None]) + def test_close_pool_in_main(self, max_connections): + pool = ConnectionPool.from_url('redis://localhost', + max_connections=max_connections) + + conn = pool.get_connection('ping') + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + def target(pool, disconnect_event): + conn = pool.get_connection('ping') + with exit_callback(pool.release, conn): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + disconnect_event.wait() + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' + + ev = multiprocessing.Event() + + proc = multiprocessing.Process(target=target, args=(pool, ev)) + proc.start() + + pool.disconnect() + ev.set() + proc.join(3) + assert proc.exitcode is 0 + + def test_redis(self, r): + assert r.ping() is True + + def target(redis): + assert redis.ping() is True + del redis + + proc = multiprocessing.Process(target=target, args=(r,)) + proc.start() + proc.join(3) + assert proc.exitcode is 0 + + assert r.ping() is True -- cgit v1.2.1 From 6020f43264209b2430a01fc3098d74bb142942a7 Mon Sep 17 00:00:00 2001 From: Alexey Popravka Date: Thu, 3 Jan 2019 13:13:52 +0200 Subject: update test to expect errors --- tests/test_multiprocessing.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index 8af1459..dae35bc 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -3,6 +3,7 @@ import multiprocessing import contextlib from redis.connection import Connection, ConnectionPool +from redis.exceptions import ConnectionError @contextlib.contextmanager @@ -33,8 +34,9 @@ class TestMultiprocessing(object): assert proc.exitcode is 0 # Check that connection is still alive after fork process has exited. - conn.send_command('ping') - assert conn.read_response() == b'PONG' + with pytest.raises(ConnectionError): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' def test_close_connection_in_main(self): conn = Connection() @@ -54,7 +56,7 @@ class TestMultiprocessing(object): ev.set() proc.join(3) - assert proc.exitcode is 0 + assert proc.exitcode is 1 @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_pool(self, max_connections): @@ -81,8 +83,9 @@ class TestMultiprocessing(object): # Check that connection is still alive after fork process has exited. conn = pool.get_connection('ping') with exit_callback(pool.release, conn): - conn.send_command('ping') - assert conn.read_response() == b'PONG' + with pytest.raises(ConnectionError): + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_close_pool_in_main(self, max_connections): -- cgit v1.2.1 From a8bf82fc9edc0040062e5b3ee4c3074f67caaea1 Mon Sep 17 00:00:00 2001 From: Alexey Popravka Date: Thu, 3 Jan 2019 13:15:36 +0200 Subject: =?UTF-8?q?Make=20PythonParser's=20on=5Fdisconnect=20consistent=20?= =?UTF-8?q?with=20Hiredisparser=20and=20Connection=20=E2=80=94=20do=20not?= =?UTF-8?q?=20close=20socket=20on=20disconnect.=20Resolves=20#1085?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis/connection.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index ea06241..7575b76 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -276,9 +276,7 @@ class PythonParser(BaseParser): def on_disconnect(self): "Called when the socket disconnects" - if self._sock is not None: - self._sock.close() - self._sock = None + self._sock = None if self._buffer is not None: self._buffer.close() self._buffer = None -- cgit v1.2.1 From 4e1e74809235edc19e03edb79c97c80a3e4e9eca Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Thu, 31 Jan 2019 19:30:41 -0800 Subject: Improve how connection pools operate in forked/child proceeses. Sometimes a process with an active connection to Redis forks and creates child processes taht also want to talk to Redis. Prior to this change there were a number of potential conflicts that could cause this to fail. Retrieving a connection from the pool and releasing a connection back to the pool check the current proceeses PID. If it's different than the PID that created the pool, reset() is called to get a fresh set of connections for the current process. However in doing so, pool.disconnect() was caused which closes the file descriptors that the parent may still be using. Further when the available_connections and in_use_connections lists are reset, all of those connections inherited from the parent are GC'd and the connection's `__del__` was called, which also closed the socket and file descriptor. This change prevents pool.disconnect() from being called when a pid is changed. It also removes the `__del__` destructor from connections. Neither of these are necessary or practical. Child processes still reset() their copy of the pool when first accessed causing their own connections to be created. `ConnectionPool.disconnect()` now checks the current process ID so that a child or parent can't disconnect the other's connections. Additionally, `Connection.disconnect()` now checks the current process ID and only calls `socket.shutdown()` if `disconnect()` is called by the same process that created the connection. This allows for a child process that inherited a connection to call `Connection.disconnect()` and not shutdown the parent's copy of the socket. Fixes #863 Fixes #784 Fixes #732 Fixes #1085 Fixes #504 --- redis/connection.py | 12 +++----- tests/test_multiprocessing.py | 66 +++++++++++++++++++++++++++++-------------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index c81e4c1..ee0b92a 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -471,12 +471,6 @@ class Connection(object): def __repr__(self): return self.description_format % self._description_args - def __del__(self): - try: - self.disconnect() - except Exception: - pass - def register_connect_callback(self, callback): self._connect_callbacks.append(callback) @@ -580,7 +574,8 @@ class Connection(object): if self._sock is None: return try: - self._sock.shutdown(socket.SHUT_RDWR) + if os.getpid() == self.pid: + self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass @@ -973,7 +968,6 @@ class ConnectionPool(object): # another thread already did the work while we waited # on the lock. return - self.disconnect() self.reset() def get_connection(self, command_name, *keys, **options): @@ -1012,6 +1006,7 @@ class ConnectionPool(object): def disconnect(self): "Disconnects all connections in the pool" + self._checkpid() all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: @@ -1133,5 +1128,6 @@ class BlockingConnectionPool(ConnectionPool): def disconnect(self): "Disconnects all connections in the pool." + self._checkpid() for connection in self._connections: connection.disconnect() diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index dae35bc..bb31a06 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -18,13 +18,17 @@ class TestMultiprocessing(object): # Test connection sharing between forks. # See issue #1085 for details. - def test_connection(self): + def test_close_connection_in_child(self): + """ + A connection owned by a parent and closed by a child doesn't + destroy the file descriptors so a parent can still use it. + """ conn = Connection() - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(conn): - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' conn.disconnect() @@ -33,20 +37,29 @@ class TestMultiprocessing(object): proc.join(3) assert proc.exitcode is 0 - # Check that connection is still alive after fork process has exited. - with pytest.raises(ConnectionError): - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + # The connection was created in the parent but disconnected in the + # child. The child called socket.close() but did not call + # socket.shutdown() because it wasn't the "owning" process. + # Therefore the connection still works in the parent. + conn.send_command('ping') + assert conn.read_response() == b'PONG' - def test_close_connection_in_main(self): + def test_close_connection_in_parent(self): + """ + A connection owned by a parent is unusable by a child if the parent + (the owning process) closes the connection. + """ conn = Connection() - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(conn, ev): ev.wait() - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + # the parent closed the connection. because it also created the + # connection, the connection is shutdown and the child + # cannot use it. + with pytest.raises(ConnectionError): + conn.send_command('ping') ev = multiprocessing.Event() proc = multiprocessing.Process(target=target, args=(conn, ev)) @@ -56,21 +69,27 @@ class TestMultiprocessing(object): ev.set() proc.join(3) - assert proc.exitcode is 1 + assert proc.exitcode is 0 @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_pool(self, max_connections): + """ + A child will create its own connections when using a pool created + by a parent. + """ pool = ConnectionPool.from_url('redis://localhost', max_connections=max_connections) conn = pool.get_connection('ping') + main_conn_pid = conn.pid with exit_callback(pool.release, conn): - assert conn.send_command('ping') is None + conn.send_command('ping') assert conn.read_response() == b'PONG' def target(pool): with exit_callback(pool.disconnect): conn = pool.get_connection('ping') + assert conn.pid != main_conn_pid with exit_callback(pool.release, conn): assert conn.send_command('ping') is None assert conn.read_response() == b'PONG' @@ -80,15 +99,19 @@ class TestMultiprocessing(object): proc.join(3) assert proc.exitcode is 0 - # Check that connection is still alive after fork process has exited. + # Check that connection is still alive after fork process has exited + # and disconnected the connections in its pool conn = pool.get_connection('ping') with exit_callback(pool.release, conn): - with pytest.raises(ConnectionError): - assert conn.send_command('ping') is None - assert conn.read_response() == b'PONG' + assert conn.send_command('ping') is None + assert conn.read_response() == b'PONG' @pytest.mark.parametrize('max_connections', [1, 2, None]) def test_close_pool_in_main(self, max_connections): + """ + A child process that uses the same pool as its parent isn't affected + when the parent disconnects all connections within the pool. + """ pool = ConnectionPool.from_url('redis://localhost', max_connections=max_connections) @@ -115,12 +138,13 @@ class TestMultiprocessing(object): proc.join(3) assert proc.exitcode is 0 - def test_redis(self, r): + def test_redis_client(self, r): + "A redis client created in a parent can also be used in a child" assert r.ping() is True - def target(redis): - assert redis.ping() is True - del redis + def target(client): + assert client.ping() is True + del client proc = multiprocessing.Process(target=target, args=(r,)) proc.start() -- cgit v1.2.1