summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> 2022-09-12 15:59:48 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-09-15 18:08:16 +0000
commit 8f1e14b4361f3b1478f6bd618af8f9a82a8de3b8 (patch)
tree 92df6291078fea8bfcda8df0a836fb602d932ed5
parent f0aa22b7dd7277b5d2546574e6d9ac4fd27cc7b4 (diff)
download mongo-8f1e14b4361f3b1478f6bd618af8f9a82a8de3b8.tar.gz
SERVER-68680 Avoid deadlocks when recursively invoking `opportunisticRead/Write`
(cherry picked from commit e884fbf3ac392ca01d080f68beb96328c7ae4c60)
-rw-r--r-- src/mongo/transport/session_asio.cpp 5
-rw-r--r-- src/mongo/transport/session_asio.h 5
-rw-r--r-- src/mongo/transport/transport_layer_asio_test.cpp 25
3 files changed, 33 insertions, 2 deletions
diff --git a/src/mongo/transport/session_asio.cpp b/src/mongo/transport/session_asio.cpp
index 74e4f42c64d..ad3f934dd8f 100644
--- a/src/mongo/transport/session_asio.cpp
+++ b/src/mongo/transport/session_asio.cpp
@@ -46,6 +46,7 @@ namespace mongo::transport {
MONGO_FAIL_POINT_DEFINE(transportLayerASIOshortOpportunisticReadWrite);
MONGO_FAIL_POINT_DEFINE(transportLayerASIOSessionPauseBeforeSetSocketOption);
MONGO_FAIL_POINT_DEFINE(transportLayerASIOBlockBeforeOpportunisticRead);
+MONGO_FAIL_POINT_DEFINE(transportLayerASIOBlockBeforeAddSession);
namespace {
@@ -583,6 +584,7 @@ Future<void> TransportLayerASIO::ASIOSession::opportunisticRead(
return makeCanceledStatus();
if (auto networkingBaton = baton ? baton->networking() : nullptr;
networkingBaton && networkingBaton->canWait()) {
+ transportLayerASIOBlockBeforeAddSession.pauseWhileSet();
return networkingBaton->addSession(*this, NetworkingBaton::Type::In)
.onError([](Status error) {
if (ErrorCodes::isShutdownError(error)) {
@@ -658,9 +660,10 @@ Future<void> TransportLayerASIO::ASIOSession::opportunisticWrite(Stream& stream,
return makeCanceledStatus();
if (auto networkingBaton = baton ? baton->networking() : nullptr;
networkingBaton && networkingBaton->canWait()) {
+ transportLayerASIOBlockBeforeAddSession.pauseWhileSet();
return networkingBaton->addSession(*this, NetworkingBaton::Type::Out)
.onError([](Status error) {
- if (ErrorCodes::isCancellationError(error)) {
+ if (ErrorCodes::isShutdownError(error)) {
// If the baton has detached, it will cancel its polling. We catch that
// error here and return Status::OK so that we invoke
// opportunisticWrite() again and switch to asio::async_write() below.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 024fac29a95..6bee5566b98 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -315,8 +315,11 @@ private:
* operation.
* - Holding the mutex while canceling asynchronous operations guarantees no operation can start
* while cancellation is in progress.
+ *
+ * Opportunistic read and write are implemented as recursive future continuations, so we may
+ * recursively acquire the mutex when the future is readied inline.
*/
- stdx::mutex _asyncOpMutex; // NOLINT
+ stdx::recursive_mutex _asyncOpMutex; // NOLINT
};
} // namespace mongo::transport
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 05e4bf52bdd..2a5c4677254 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -1071,6 +1071,31 @@ TEST_F(BatonASIOLinuxTest, CancelAsyncOperationsInterruptsOngoingOperations) {
ErrorCodes::CallbackCanceled);
}
+TEST_F(BatonASIOLinuxTest, AsyncOpsMakeProgressWhenSessionAddedToDetachedBaton) {
+ Notification<void> ready;
+ auto opCtx = client().makeOperationContext();
+
+ JoinThread thread([&] {
+ auto session = client().session();
+ auto baton = opCtx->getBaton();
+ ready.get();
+ session->asyncSourceMessage(baton).ignoreValue().getAsync([session](Status) {
+ // Capturing `session` is necessary as parts of this continuation run at fixture
+ // destruction.
+ });
+ });
+
+ FailPointEnableBlock fp("transportLayerASIOBlockBeforeAddSession");
+ ready.set();
+ waitForTimesEntered(fp, 1);
+
+ // Destroying the `opCtx` results in detaching the baton. At this point, the thread running
+ // `asyncSourceMessage` has acquired the mutex that orders asynchronous operations (i.e.,
+ // `asyncOpMutex`) and is blocked by `fp`. Once we return from this function, that thread is
+ // unblocked and will run `Baton::addSession` on a detached baton.
+ opCtx.reset();
+}
+
#endif // __linux__
} // namespace