author     Matthew Russotto <matthew.russotto@10gen.com>    2020-01-27 11:13:14 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-01-28 22:51:06 +0000
commit     681631228dbaa98e00d1ea6d98c00662ef293a2b (patch)
tree       087ad02de3606d57317764bc34d70ff23aa1cfd4 /src/mongo/db
parent     f2ea3303ab1116992442102c03371b9e50de789b (diff)
download   mongo-681631228dbaa98e00d1ea6d98c00662ef293a2b.tar.gz
SERVER-45503 Arrange events so cloner onCompletion is never run with lock held
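The fix splits cloner startup in two: runOnExecutorEvent() registers the cloner's work against an unsignalled TaskExecutor event and returns the result Future together with that event, and the caller signals the event only after it has released its own mutex, so the onCompletion continuation can never fire while the lock is held. A minimal standalone sketch of the same idea using only the standard library; the Event class and startGated() are invented names for this illustration and are not MongoDB code:

    #include <condition_variable>
    #include <functional>
    #include <future>
    #include <memory>
    #include <mutex>
    #include <utility>

    // Tiny stand-in for TaskExecutor::EventHandle: work waiting on it runs only
    // after signal() has been called.
    class Event {
    public:
        void signal() {
            std::lock_guard<std::mutex> lk(_m);
            _signalled = true;
            _cv.notify_all();
        }
        void wait() {
            std::unique_lock<std::mutex> lk(_m);
            _cv.wait(lk, [&] { return _signalled; });
        }

    private:
        std::mutex _m;
        std::condition_variable _cv;
        bool _signalled = false;
    };

    // Register 'work' so it runs only once the returned Event is signalled; the
    // future completes when the work finishes (compare makeEvent() + onEvent()).
    std::pair<std::future<void>, std::shared_ptr<Event>> startGated(std::function<void()> work) {
        auto event = std::make_shared<Event>();
        auto fut = std::async(std::launch::async, [event, work = std::move(work)] {
            event->wait();  // nothing happens until the caller signals
            work();
        });
        return {std::move(fut), event};
    }

    int main() {
        std::mutex callerMutex;
        auto [fut, event] = startGated([] { /* cloner work goes here */ });
        {
            std::lock_guard<std::mutex> lk(callerMutex);
            // Wire up continuations and bookkeeping under the lock; the gated work
            // cannot start (and therefore cannot complete) yet.
        }
        // Only after the lock is dropped is the work allowed to begin, so a
        // completion handler that needs callerMutex can never run while we hold it.
        event->signal();
        fut.wait();
    }

Signalling before the lock is released would allow the gated work, and therefore any completion handler that needs the same mutex, to run while the caller still holds it.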
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/base_cloner.cpp     18
-rw-r--r--  src/mongo/db/repl/base_cloner.h       15
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp  60
3 files changed, 57 insertions, 36 deletions
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 40a17c02a0c..68411039509 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -258,7 +258,8 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage*
}
}
-Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) {
+std::pair<Future<void>, TaskExecutor::EventHandle> BaseCloner::runOnExecutorEvent(
+ TaskExecutor* executor) {
{
stdx::lock_guard<Latch> lk(_mutex);
invariant(!_active && !_startedAsync);
@@ -286,11 +287,18 @@ Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) {
return status;
});
};
- auto cbhStatus = executor->scheduleWork(callback);
- if (!cbhStatus.isOK()) {
- _promise.setError(cbhStatus.getStatus());
+ TaskExecutor::EventHandle event;
+ auto statusEvent = executor->makeEvent();
+ if (!statusEvent.isOK()) {
+ _promise.setError(statusEvent.getStatus());
+ } else {
+ event = statusEvent.getValue();
+ auto cbhStatus = executor->onEvent(event, callback);
+ if (!cbhStatus.isOK()) {
+ _promise.setError(cbhStatus.getStatus());
+ }
}
- return std::move(pf.future);
+ return std::make_pair(std::move(pf.future), event);
}
diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h
index 7efdd9e640b..38079eeebac 100644
--- a/src/mongo/db/repl/base_cloner.h
+++ b/src/mongo/db/repl/base_cloner.h
@@ -64,10 +64,15 @@ public:
Status run();
/**
- * Executes the run() method asynchronously on the given taskExecutor, returning the result
- * as a Future.
+ * Executes the run() method asynchronously on the given taskExecutor when the event is
+ * signalled, returning the result as a Future.
+ *
+ * If the executor is valid, the Future is guaranteed to not be ready until the event is
+ * signalled. If the executor is not valid (e.g. shutting down), the future will be
+ * ready immediately after the call and the EventHandle will be invalid.
*/
- Future<void> runOnExecutor(executor::TaskExecutor* executor);
+ std::pair<Future<void>, executor::TaskExecutor::EventHandle> runOnExecutorEvent(
+ executor::TaskExecutor* executor);
/**
* For unit testing, allow stopping after any given stage.
@@ -258,10 +263,10 @@ private:
// invariant checking.
bool _active = false; // (M)
Status _status = Status::OK(); // (M)
- // _startedAsync indicates the cloner is being run on some executor using runOnExecutor(),
+ // _startedAsync indicates the cloner is being run on some executor using runOnExecutorEvent(),
// and is used only for invariant checking.
bool _startedAsync = false; // (M)
- // _promise corresponds to the Future returned by runOnExecutor(). When not running
+ // _promise corresponds to the Future returned by runOnExecutorEvent(). When not running
// asynchronously, this is a null promise.
Promise<void> _promise; // (S)
// _stopAfterStage is used for unit testing and causes the cloner to exit after a given
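The new header comment above promises that the returned Future is not ready until the event is signalled, and that on a failed registration (e.g. executor shutdown) the Future is ready immediately and the EventHandle is invalid. A compact standalone sketch of that contract, with std::promise/std::future standing in for MongoDB's Future; GatedStart and startGatedWork are invented names, and the "signal" callable runs the work inline only to keep the sketch short (the real API runs it on the executor):

    #include <chrono>
    #include <exception>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <stdexcept>

    struct GatedStart {
        std::future<int> result;
        std::function<void()> signal;  // empty function == "invalid event handle"
    };

    GatedStart startGatedWork(bool registrationFails) {
        auto prom = std::make_shared<std::promise<int>>();
        GatedStart out{prom->get_future(), {}};
        if (registrationFails) {
            // Mirrors _promise.setError(...): the future is ready at once with the
            // error and no usable handle is handed back.
            prom->set_exception(std::make_exception_ptr(std::runtime_error("shutting down")));
            return out;
        }
        // On success the work is parked behind the signal.
        out.signal = [prom] { prom->set_value(7); };
        return out;
    }

    int main() {
        using namespace std::chrono_literals;
        std::cout << std::boolalpha;

        auto ok = startGatedWork(false);
        std::cout << (ok.result.wait_for(0s) == std::future_status::ready) << '\n';  // false
        ok.signal();                           // analogous to signalEvent(event)
        std::cout << ok.result.get() << '\n';  // 7

        auto failed = startGatedWork(true);
        std::cout << (failed.result.wait_for(0s) == std::future_status::ready) << '\n';  // true
        std::cout << static_cast<bool>(failed.signal) << '\n';  // false: no event handle
    }

A caller can therefore use "is the future already ready?" as the test for "registration failed", which is exactly how the initial_syncer.cpp change below distinguishes the two cases before deciding whether to attach a continuation and signal the event.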
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 0c9bda496d3..6914434ccfb 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1075,35 +1075,43 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
LOG(2) << "Starting AllDatabaseCloner: " << _initialSyncState->allDatabaseCloner->toString();
- _initialSyncState->allDatabaseClonerFuture =
- _initialSyncState->allDatabaseCloner->runOnExecutor(_clonerExec)
- .onCompletion([this, onCompletionGuard](Status status) mutable {
- // The completion guard must run on the main executor. This only makes a difference
- // for unit tests, but we always schedule it that way to avoid special casing test
- // code.
- stdx::unique_lock<Latch> lock(_mutex);
- auto exec_status = _exec->scheduleWork(
- [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
- _allDatabaseClonerCallback(status, onCompletionGuard);
- });
- if (!exec_status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(
- lock, exec_status.getStatus());
- // In the shutdown case, it is possible the completion guard will be run
- // from this thread (since the lambda holding another copy didn't schedule).
- // If it does, we will self-deadlock if we're holding the lock, so release it.
- lock.unlock();
- }
- // In unit tests, this reset ensures the completion guard does not run during the
- // destruction of the lambda (which occurs on the wrong executor), except in the
- // shutdown case.
- onCompletionGuard.reset();
- });
-
- if (!status.isOK()) {
+ auto [startClonerFuture, startCloner] =
+ _initialSyncState->allDatabaseCloner->runOnExecutorEvent(_clonerExec);
+ // runOnExecutorEvent ensures the future is not ready unless an error has occurred.
+ if (startClonerFuture.isReady()) {
+ status = startClonerFuture.getNoThrow();
+ invariant(!status.isOK());
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
+ _initialSyncState->allDatabaseClonerFuture =
+ std::move(startClonerFuture).onCompletion([this, onCompletionGuard](Status status) mutable {
+ // The completion guard must run on the main executor, and never inline. In unit tests,
+ // without the executor call, it would run on the wrong executor. In both production
+ // and in unit tests, if the cloner finishes very quickly, the callback could run
+ // in-line and result in self-deadlock.
+ stdx::unique_lock<Latch> lock(_mutex);
+ auto exec_status = _exec->scheduleWork(
+ [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
+ _allDatabaseClonerCallback(status, onCompletionGuard);
+ });
+ if (!exec_status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+ exec_status.getStatus());
+ // In the shutdown case, it is possible the completion guard will be run
+ // from this thread (since the lambda holding another copy didn't schedule).
+ // If it does, we will self-deadlock if we're holding the lock, so release it.
+ lock.unlock();
+ }
+ // In unit tests, this reset ensures the completion guard does not run during the
+ // destruction of the lambda (which occurs on the wrong executor), except in the
+ // shutdown case.
+ onCompletionGuard.reset();
+ });
+ lock.unlock();
+ // Start (and therefore finish) the cloners outside the lock. This ensures onCompletion
+ // is not run with the mutex held, which would result in self-deadlock.
+ _clonerExec->signalEvent(startCloner);
}
void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
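The comments in the new continuation above make two points: the completion guard must run on the main executor (never inline on the cloner's thread), and the continuation drops its own copy of the guard (onCompletionGuard.reset()) so that the last reference, and therefore the guard's work, is released from the scheduled task rather than from the lambda's destruction on the wrong executor. A standalone analog of that pattern; MainExecutor and the other names here are invented for the sketch and are not MongoDB types:

    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>

    // Minimal single-threaded executor: scheduled tasks always run on its own
    // thread, never inline on the scheduling thread.
    class MainExecutor {
    public:
        MainExecutor() : _worker([this] { run(); }) {}
        ~MainExecutor() {
            {
                std::lock_guard<std::mutex> lk(_m);
                _done = true;
            }
            _cv.notify_all();
            _worker.join();
        }
        void schedule(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lk(_m);
                _tasks.push_back(std::move(task));
            }
            _cv.notify_all();
        }

    private:
        void run() {
            std::unique_lock<std::mutex> lk(_m);
            while (true) {
                _cv.wait(lk, [&] { return _done || !_tasks.empty(); });
                if (_tasks.empty())
                    return;  // only exits once the queue has drained after _done
                auto task = std::move(_tasks.front());
                _tasks.pop_front();
                lk.unlock();
                task();  // runs on the executor thread
                lk.lock();
            }
        }
        std::mutex _m;
        std::condition_variable _cv;
        std::deque<std::function<void()>> _tasks;
        bool _done = false;
        std::thread _worker;
    };

    int main() {
        MainExecutor exec;
        // Stand-in for the completion guard: its work must happen on the executor
        // thread, not on whichever thread the cloner happened to finish on.
        auto guard = std::make_shared<int>(42);

        auto clonerContinuation = [&exec, guard]() mutable {
            exec.schedule([guard] {
                std::cout << "completion handling for guard " << *guard
                          << " on the main executor thread\n";
            });
            // Drop this copy now so the last reference lives inside the scheduled
            // task, mirroring onCompletionGuard.reset() in the diff above.
            guard.reset();
        };

        clonerContinuation();  // pretend the cloner just finished on its own thread
    }                          // ~MainExecutor drains the queue and joins

The same shape appears in the diff: the cloner-side lambda only calls _exec->scheduleWork() and then resets onCompletionGuard, leaving the real completion handling to the main executor.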