diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2020-01-27 11:13:14 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-01-28 22:51:06 +0000 |
commit | 681631228dbaa98e00d1ea6d98c00662ef293a2b (patch) | |
tree | 087ad02de3606d57317764bc34d70ff23aa1cfd4 /src/mongo/db | |
parent | f2ea3303ab1116992442102c03371b9e50de789b (diff) | |
download | mongo-681631228dbaa98e00d1ea6d98c00662ef293a2b.tar.gz |
SERVER-45503 Arrange events so cloner onCompletion is never run with lock held
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/base_cloner.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner.h | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 60 |
3 files changed, 57 insertions(+), 36 deletions(-)
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 40a17c02a0c..68411039509 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -258,7 +258,8 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* } } -Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) { +std::pair<Future<void>, TaskExecutor::EventHandle> BaseCloner::runOnExecutorEvent( + TaskExecutor* executor) { { stdx::lock_guard<Latch> lk(_mutex); invariant(!_active && !_startedAsync); @@ -286,11 +287,18 @@ Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) { return status; }); }; - auto cbhStatus = executor->scheduleWork(callback); - if (!cbhStatus.isOK()) { - _promise.setError(cbhStatus.getStatus()); + TaskExecutor::EventHandle event; + auto statusEvent = executor->makeEvent(); + if (!statusEvent.isOK()) { + _promise.setError(statusEvent.getStatus()); + } else { + event = statusEvent.getValue(); + auto cbhStatus = executor->onEvent(event, callback); + if (!cbhStatus.isOK()) { + _promise.setError(cbhStatus.getStatus()); + } } - return std::move(pf.future); + return std::make_pair(std::move(pf.future), event); } diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index 7efdd9e640b..38079eeebac 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -64,10 +64,15 @@ public: Status run(); /** - * Executes the run() method asychronously on the given taskExecutor, returning the result - * as a Future. + * Executes the run() method asychronously on the given taskExecutor when the event is + * signalled, returning the result as a Future. + * + * If the executor is valid, the Future is guaranteed to not be ready until the event is + * signalled. If the executor is not valid (e.g. shutting down), the future will be + * ready immediately after the call and the EventHandle will be invalid. 
*/ - Future<void> runOnExecutor(executor::TaskExecutor* executor); + std::pair<Future<void>, executor::TaskExecutor::EventHandle> runOnExecutorEvent( + executor::TaskExecutor* executor); /** * For unit testing, allow stopping after any given stage. @@ -258,10 +263,10 @@ private: // invariant checking. bool _active = false; // (M) Status _status = Status::OK(); // (M) - // _startedAsync indicates the cloner is being run on some executor using runOnExecutor(), + // _startedAsync indicates the cloner is being run on some executor using runOnExecutorEvent(), // and is used only for invariant checking. bool _startedAsync = false; // (M) - // _promise corresponds to the Future returned by runOnExecutor(). When not running + // _promise corresponds to the Future returned by runOnExecutorEvent(). When not running // asynchronously, this is a null promise. Promise<void> _promise; // (S) // _stopAfterStage is used for unit testing and causes the cloner to exit after a given diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 0c9bda496d3..6914434ccfb 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1075,35 +1075,43 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> LOG(2) << "Starting AllDatabaseCloner: " << _initialSyncState->allDatabaseCloner->toString(); - _initialSyncState->allDatabaseClonerFuture = - _initialSyncState->allDatabaseCloner->runOnExecutor(_clonerExec) - .onCompletion([this, onCompletionGuard](Status status) mutable { - // The completion guard must run on the main executor. This only makes a difference - // for unit tests, but we always schedule it that way to avoid special casing test - // code. 
- stdx::unique_lock<Latch> lock(_mutex); - auto exec_status = _exec->scheduleWork( - [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) { - _allDatabaseClonerCallback(status, onCompletionGuard); - }); - if (!exec_status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock( - lock, exec_status.getStatus()); - // In the shutdown case, it is possible the completion guard will be run - // from this thread (since the lambda holding another copy didn't schedule). - // If it does, we will self-deadlock if we're holding the lock, so release it. - lock.unlock(); - } - // In unit tests, this reset ensures the completion guard does not run during the - // destruction of the lambda (which occurs on the wrong executor), except in the - // shutdown case. - onCompletionGuard.reset(); - }); - - if (!status.isOK()) { + auto [startClonerFuture, startCloner] = + _initialSyncState->allDatabaseCloner->runOnExecutorEvent(_clonerExec); + // runOnExecutorEvent ensures the future is not ready unless an error has occurred. + if (startClonerFuture.isReady()) { + status = startClonerFuture.getNoThrow(); + invariant(!status.isOK()); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } + _initialSyncState->allDatabaseClonerFuture = + std::move(startClonerFuture).onCompletion([this, onCompletionGuard](Status status) mutable { + // The completion guard must run on the main executor, and never inline. In unit tests, + // without the executor call, it would run on the wrong executor. In both production + // and in unit tests, if the cloner finishes very quickly, the callback could run + // in-line and result in self-deadlock. 
+ stdx::unique_lock<Latch> lock(_mutex); + auto exec_status = _exec->scheduleWork( + [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) { + _allDatabaseClonerCallback(status, onCompletionGuard); + }); + if (!exec_status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, + exec_status.getStatus()); + // In the shutdown case, it is possible the completion guard will be run + // from this thread (since the lambda holding another copy didn't schedule). + // If it does, we will self-deadlock if we're holding the lock, so release it. + lock.unlock(); + } + // In unit tests, this reset ensures the completion guard does not run during the + // destruction of the lambda (which occurs on the wrong executor), except in the + // shutdown case. + onCompletionGuard.reset(); + }); + lock.unlock(); + // Start (and therefore finish) the cloners outside the lock. This ensures onCompletion + // is not run with the mutex held, which would result in self-deadlock. + _clonerExec->signalEvent(startCloner); } void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus, |