summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-01-31 19:30:41 -0800
committerAndy McCurdy <andy@andymccurdy.com>2019-01-31 19:30:41 -0800
commit4e1e74809235edc19e03edb79c97c80a3e4e9eca (patch)
treeac55201f0922cffffa9c0454160cb0d99c8eb8bf
parente24e9770eb9e27453b52c433366cd8a033640cb4 (diff)
downloadredis-py-4e1e74809235edc19e03edb79c97c80a3e4e9eca.tar.gz
Improve how connection pools operate in forked/child proceeses.
Sometimes a process with an active connection to Redis forks and creates child processes taht also want to talk to Redis. Prior to this change there were a number of potential conflicts that could cause this to fail. Retrieving a connection from the pool and releasing a connection back to the pool check the current proceeses PID. If it's different than the PID that created the pool, reset() is called to get a fresh set of connections for the current process. However in doing so, pool.disconnect() was caused which closes the file descriptors that the parent may still be using. Further when the available_connections and in_use_connections lists are reset, all of those connections inherited from the parent are GC'd and the connection's `__del__` was called, which also closed the socket and file descriptor. This change prevents pool.disconnect() from being called when a pid is changed. It also removes the `__del__` destructor from connections. Neither of these are necessary or practical. Child processes still reset() their copy of the pool when first accessed causing their own connections to be created. `ConnectionPool.disconnect()` now checks the current process ID so that a child or parent can't disconnect the other's connections. Additionally, `Connection.disconnect()` now checks the current process ID and only calls `socket.shutdown()` if `disconnect()` is called by the same process that created the connection. This allows for a child process that inherited a connection to call `Connection.disconnect()` and not shutdown the parent's copy of the socket. Fixes #863 Fixes #784 Fixes #732 Fixes #1085 Fixes #504
-rwxr-xr-xredis/connection.py12
-rw-r--r--tests/test_multiprocessing.py66
2 files changed, 49 insertions, 29 deletions
diff --git a/redis/connection.py b/redis/connection.py
index c81e4c1..ee0b92a 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -471,12 +471,6 @@ class Connection(object):
def __repr__(self):
return self.description_format % self._description_args
- def __del__(self):
- try:
- self.disconnect()
- except Exception:
- pass
-
def register_connect_callback(self, callback):
self._connect_callbacks.append(callback)
@@ -580,7 +574,8 @@ class Connection(object):
if self._sock is None:
return
try:
- self._sock.shutdown(socket.SHUT_RDWR)
+ if os.getpid() == self.pid:
+ self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error:
pass
@@ -973,7 +968,6 @@ class ConnectionPool(object):
# another thread already did the work while we waited
# on the lock.
return
- self.disconnect()
self.reset()
def get_connection(self, command_name, *keys, **options):
@@ -1012,6 +1006,7 @@ class ConnectionPool(object):
def disconnect(self):
"Disconnects all connections in the pool"
+ self._checkpid()
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
@@ -1133,5 +1128,6 @@ class BlockingConnectionPool(ConnectionPool):
def disconnect(self):
"Disconnects all connections in the pool."
+ self._checkpid()
for connection in self._connections:
connection.disconnect()
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
index dae35bc..bb31a06 100644
--- a/tests/test_multiprocessing.py
+++ b/tests/test_multiprocessing.py
@@ -18,13 +18,17 @@ class TestMultiprocessing(object):
# Test connection sharing between forks.
# See issue #1085 for details.
- def test_connection(self):
+ def test_close_connection_in_child(self):
+ """
+ A connection owned by a parent and closed by a child doesn't
+ destroy the file descriptors so a parent can still use it.
+ """
conn = Connection()
- assert conn.send_command('ping') is None
+ conn.send_command('ping')
assert conn.read_response() == b'PONG'
def target(conn):
- assert conn.send_command('ping') is None
+ conn.send_command('ping')
assert conn.read_response() == b'PONG'
conn.disconnect()
@@ -33,20 +37,29 @@ class TestMultiprocessing(object):
proc.join(3)
assert proc.exitcode is 0
- # Check that connection is still alive after fork process has exited.
- with pytest.raises(ConnectionError):
- assert conn.send_command('ping') is None
- assert conn.read_response() == b'PONG'
+ # The connection was created in the parent but disconnected in the
+ # child. The child called socket.close() but did not call
+ # socket.shutdown() because it wasn't the "owning" process.
+ # Therefore the connection still works in the parent.
+ conn.send_command('ping')
+ assert conn.read_response() == b'PONG'
- def test_close_connection_in_main(self):
+ def test_close_connection_in_parent(self):
+ """
+ A connection owned by a parent is unusable by a child if the parent
+ (the owning process) closes the connection.
+ """
conn = Connection()
- assert conn.send_command('ping') is None
+ conn.send_command('ping')
assert conn.read_response() == b'PONG'
def target(conn, ev):
ev.wait()
- assert conn.send_command('ping') is None
- assert conn.read_response() == b'PONG'
+ # the parent closed the connection. because it also created the
+ # connection, the connection is shutdown and the child
+ # cannot use it.
+ with pytest.raises(ConnectionError):
+ conn.send_command('ping')
ev = multiprocessing.Event()
proc = multiprocessing.Process(target=target, args=(conn, ev))
@@ -56,21 +69,27 @@ class TestMultiprocessing(object):
ev.set()
proc.join(3)
- assert proc.exitcode is 1
+ assert proc.exitcode is 0
@pytest.mark.parametrize('max_connections', [1, 2, None])
def test_pool(self, max_connections):
+ """
+ A child will create its own connections when using a pool created
+ by a parent.
+ """
pool = ConnectionPool.from_url('redis://localhost',
max_connections=max_connections)
conn = pool.get_connection('ping')
+ main_conn_pid = conn.pid
with exit_callback(pool.release, conn):
- assert conn.send_command('ping') is None
+ conn.send_command('ping')
assert conn.read_response() == b'PONG'
def target(pool):
with exit_callback(pool.disconnect):
conn = pool.get_connection('ping')
+ assert conn.pid != main_conn_pid
with exit_callback(pool.release, conn):
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
@@ -80,15 +99,19 @@ class TestMultiprocessing(object):
proc.join(3)
assert proc.exitcode is 0
- # Check that connection is still alive after fork process has exited.
+ # Check that connection is still alive after fork process has exited
+ # and disconnected the connections in its pool
conn = pool.get_connection('ping')
with exit_callback(pool.release, conn):
- with pytest.raises(ConnectionError):
- assert conn.send_command('ping') is None
- assert conn.read_response() == b'PONG'
+ assert conn.send_command('ping') is None
+ assert conn.read_response() == b'PONG'
@pytest.mark.parametrize('max_connections', [1, 2, None])
def test_close_pool_in_main(self, max_connections):
+ """
+ A child process that uses the same pool as its parent isn't affected
+ when the parent disconnects all connections within the pool.
+ """
pool = ConnectionPool.from_url('redis://localhost',
max_connections=max_connections)
@@ -115,12 +138,13 @@ class TestMultiprocessing(object):
proc.join(3)
assert proc.exitcode is 0
- def test_redis(self, r):
+ def test_redis_client(self, r):
+ "A redis client created in a parent can also be used in a child"
assert r.ping() is True
- def target(redis):
- assert redis.ping() is True
- del redis
+ def target(client):
+ assert client.ping() is True
+ del client
proc = multiprocessing.Process(target=target, args=(r,))
proc.start()