author      Ben Caimano <ben.caimano@10gen.com>    2019-03-05 13:40:25 -0500
committer   Ben Caimano <ben.caimano@10gen.com>    2019-04-05 14:28:40 -0400
commit      a08bac3e29cd9ec3d030623894928e80c2f572dd (patch)
tree        a82304a07e67909a8d8a40ec08959f4f1c8ccace
parent      b1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff)
download    mongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
-rw-r--r--  src/mongo/base/error_codes.err                          |   4
-rw-r--r--  src/mongo/db/concurrency/deferred_writer.cpp            |   6
-rw-r--r--  src/mongo/db/index_builds_coordinator_mongod.cpp        |  34
-rw-r--r--  src/mongo/db/repl/oplog_test.cpp                        |  20
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                         |  16
-rw-r--r--  src/mongo/db/repl/task_runner.cpp                       |   5
-rw-r--r--  src/mongo/db/s/catalog_cache_loader_mock.cpp            |   6
-rw-r--r--  src/mongo/db/s/chunk_splitter.cpp                       |   9
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp                     |   8
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp    | 109
-rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.h   |  20
-rw-r--r--  src/mongo/db/session_catalog_mongod.cpp                 |  31
-rw-r--r--  src/mongo/dbtests/threadedtests.cpp                     |   5
-rw-r--r--  src/mongo/executor/connection_pool.cpp                  |   9
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.cpp    |  11
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.h      |   2
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp             |  16
-rw-r--r--  src/mongo/executor/network_interface_tl.h               |   2
-rw-r--r--  src/mongo/executor/thread_pool_mock.cpp                 | 125
-rw-r--r--  src/mongo/executor/thread_pool_mock.h                   |   6
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp        |  28
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.cpp      |  12
-rw-r--r--  src/mongo/transport/service_executor_adaptive.cpp       |   3
-rw-r--r--  src/mongo/transport/service_executor_test.cpp           |   4
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp            |   4
-rw-r--r--  src/mongo/util/concurrency/thread_pool.cpp              |  24
-rw-r--r--  src/mongo/util/concurrency/thread_pool.h                |   2
-rw-r--r--  src/mongo/util/concurrency/thread_pool_interface.h      |   7
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test.cpp         |  27
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test_common.cpp  |  23
-rw-r--r--  src/mongo/util/keyed_executor_test.cpp                  |  34
-rw-r--r--  src/mongo/util/out_of_line_executor.h                   |  29
32 files changed, 366 insertions, 275 deletions
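
The shape of the change, condensed from the interface hunks below: ThreadPoolInterface::schedule() used to return a Status and take a unique_function<void()>; it now returns void, takes a unique_function<void(Status)>, and reports any scheduling failure by invoking the task with a non-OK Status (possibly inline, during or after shutdown). The following is a minimal, self-contained sketch of that calling convention only; ToyStatus and ToyPool are invented stand-ins, not MongoDB types.

    #include <functional>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for mongo::Status: an empty error string means OK.
    struct ToyStatus {
        std::string error;
        bool isOK() const { return error.empty(); }
    };

    // New-style task: always receives the scheduling outcome.
    using Task = std::function<void(ToyStatus)>;

    class ToyPool {
    public:
        // Old shape: Status schedule(std::function<void()>).
        // New shape: accept the task unconditionally and deliver a non-OK
        // status to it instead of returning one to the caller.
        void schedule(Task task) {
            if (_inShutdown) {
                task(ToyStatus{"ShutdownInProgress"});  // run inline with the error
                return;
            }
            _tasks.push_back(std::move(task));
        }

        void drain() {  // pretend worker loop
            for (auto& t : _tasks) t(ToyStatus{});
            _tasks.clear();
        }

        void shutdown() { _inShutdown = true; }

    private:
        bool _inShutdown = false;
        std::vector<Task> _tasks;
    };

    int main() {
        ToyPool pool;
        pool.schedule([](ToyStatus s) {
            if (!s.isOK()) return;  // every task must check its status
            std::cout << "ran out of line\n";
        });
        pool.drain();
        pool.shutdown();
        pool.schedule([](ToyStatus s) {
            std::cout << "scheduled after shutdown: " << s.error << "\n";
        });
        return 0;
    }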
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 4fa6ca8929b..fe682f8eb62 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -343,6 +343,10 @@ error_class("WriteConcernError", ["WriteConcernFailed",
"UnsatisfiableWriteConcern"])
error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"])
+# isCancelationError() includes all codes that, when passed to a function as its parameter,
+# indicate that the function cannot execute as normal and must abort its intended work.
+error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled"])
+
#TODO SERVER-28679 add checksum failure.
error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
"TooManyDocumentSequences"])
diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp
index 3e609009931..2dbda1013c4 100644
--- a/src/mongo/db/concurrency/deferred_writer.cpp
+++ b/src/mongo/db/concurrency/deferred_writer.cpp
@@ -178,7 +178,11 @@ bool DeferredWriter::insertDocument(BSONObj obj) {
// Add the object to the buffer.
_numBytes += obj.objsize();
- fassert(40588, _pool->schedule([this, obj] { _worker(InsertStatement(obj.getOwned())); }));
+ _pool->schedule([this, obj](auto status) {
+ fassert(40588, status);
+
+ _worker(InsertStatement(obj.getOwned()));
+ });
return true;
}
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index cf783ab1676..3bd3ac802ec 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -154,7 +154,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
opDesc = curOp->opDescription().getOwned();
}
- Status status = _threadPool.schedule([
+ _threadPool.schedule([
this,
buildUUID,
deadline,
@@ -162,8 +162,23 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
writesAreReplicated,
shouldNotConflictWithSecondaryBatchApplication,
logicalOp,
- opDesc
- ]() noexcept {
+ opDesc,
+ replState
+ ](auto status) noexcept {
+ // Clean up the index build if we failed to schedule it.
+ if (!status.isOK()) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // Unregister the index build before setting the promises,
+ // so callers do not see the build again.
+ _unregisterIndexBuild(lk, replState);
+
+ // Set the promise in case another thread already joined the index build.
+ replState->sharedPromise.setError(status);
+
+ return;
+ }
+
auto opCtx = Client::getCurrent()->makeOperationContext();
opCtx->setDeadlineByDate(deadline, timeoutError);
@@ -190,19 +205,6 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
_runIndexBuild(opCtx.get(), buildUUID);
});
- // Clean up the index build if we failed to schedule it.
- if (!status.isOK()) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // Unregister the index build before setting the promises, so callers do not see the build
- // again.
- _unregisterIndexBuild(lk, replState);
-
- // Set the promise in case another thread already joined the index build.
- replState->sharedPromise.setError(status);
-
- return status;
- }
return replState->sharedPromise.getFuture();
}
diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp
index 9aa8fe62628..76fd17eaa5c 100644
--- a/src/mongo/db/repl/oplog_test.cpp
+++ b/src/mongo/db/repl/oplog_test.cpp
@@ -156,12 +156,8 @@ void _checkOplogEntry(const OplogEntry& oplogEntry,
* the contents of the oplog collection.
*/
using OpTimeNamespaceStringMap = std::map<OpTime, NamespaceString>;
-using MakeTaskFunction =
- stdx::function<ThreadPoolInterface::Task(const NamespaceString& nss,
- stdx::mutex* mtx,
- OpTimeNamespaceStringMap* opTimeNssMap,
- unittest::Barrier* barrier)>;
-void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction,
+template <typename F>
+void _testConcurrentLogOp(const F& makeTaskFunction,
OpTimeNamespaceStringMap* opTimeNssMap,
std::vector<OplogEntry>* oplogEntries,
std::size_t expectedNumOplogEntries) {
@@ -181,10 +177,14 @@ void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction,
unittest::Barrier barrier(3U);
const NamespaceString nss1("test1.coll");
const NamespaceString nss2("test2.coll");
- ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)))
- << "Failed to schedule logOp() task for namespace " << nss1;
- ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)))
- << "Failed to schedule logOp() task for namespace " << nss2;
+ pool.schedule([&](auto status) mutable {
+ ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss1;
+ makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)();
+ });
+ pool.schedule([&](auto status) mutable {
+ ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss2;
+ makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)();
+ });
barrier.countDownAndWait();
// Shut thread pool down.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 399bede7329..08e942a956e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -418,7 +418,9 @@ void scheduleWritesToOplog(OperationContext* opCtx,
// The returned function will be run in a separate thread after this returns. Therefore all
// captures other than 'ops' must be by value since they will not be available. The caller
// guarantees that 'ops' will stay in scope until the spawned threads complete.
- return [storageInterface, &ops, begin, end] {
+ return [storageInterface, &ops, begin, end](auto status) {
+ invariant(status);
+
auto opCtx = cc().makeOperationContext();
UnreplicatedWritesBlock uwb(opCtx.get());
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
@@ -453,7 +455,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
if (!enoughToMultiThread ||
!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
- invariant(threadPool->schedule(makeOplogWriterForRange(0, ops.size())));
+ threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
return;
}
@@ -463,7 +465,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
for (size_t thread = 0; thread < numOplogThreads; thread++) {
size_t begin = thread * numOpsPerThread;
size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
- invariant(threadPool->schedule(makeOplogWriterForRange(begin, end)));
+ threadPool->schedule(makeOplogWriterForRange(begin, end));
}
}
@@ -1356,16 +1358,18 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
if (writerVectors[i].empty())
continue;
- invariant(_writerPool->schedule([
+ _writerPool->schedule([
this,
&writer = writerVectors.at(i),
&status = statusVector->at(i),
&workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)
- ] {
+ ](auto scheduleStatus) {
+ invariant(scheduleStatus);
+
auto opCtx = cc().makeOperationContext();
status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown(
[&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); });
- }));
+ });
}
}
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 2226ad2bcba..b4036617a57 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -113,7 +113,10 @@ void TaskRunner::schedule(Task task) {
return;
}
- invariant(_threadPool->schedule([this] { _runTasks(); }));
+ _threadPool->schedule([this](auto status) {
+ invariant(status);
+ _runTasks();
+ });
_active = true;
_cancelRequested = false;
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 9b9b52870cc..89ea61c3697 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -99,7 +99,9 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
- uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept {
+ _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept {
+ invariant(status);
+
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
@@ -121,7 +123,7 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
callbackFn(opCtx.get(), std::move(swCollAndChunks));
notify->set();
- }));
+ });
return notify;
}
diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp
index 3495a01ac05..cfe972510a7 100644
--- a/src/mongo/db/s/chunk_splitter.cpp
+++ b/src/mongo/db/s/chunk_splitter.cpp
@@ -272,10 +272,13 @@ void ChunkSplitter::trySplitting(std::shared_ptr<ChunkSplitStateDriver> chunkSpl
if (!_isPrimary) {
return;
}
- uassertStatusOK(_threadPool.schedule(
- [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]() noexcept {
+ _threadPool.schedule(
+ [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ](
+ auto status) noexcept {
+ invariant(status);
+
_runAutosplit(csd, nss, min, max, dataWritten);
- }));
+ });
}
void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSplitStateDriver,
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index c816b1c02e7..0d77fa131a2 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -143,7 +143,13 @@ void scheduleCleanup(executor::TaskExecutor* executor,
Date_t when) {
LOG(1) << "Scheduling cleanup on " << nss.ns() << " at " << when;
auto swCallbackHandle = executor->scheduleWorkAt(
- when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto&) {
+ when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto& args) {
+ auto& status = args.status;
+ if (ErrorCodes::isCancelationError(status.code())) {
+ return;
+ }
+ invariant(status);
+
ThreadClient tc("Collection-Range-Deleter", getGlobalServiceContext());
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 36700332edb..4384aeeed72 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -394,8 +394,10 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
}();
- uassertStatusOK(_threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept {
+ _threadPool.schedule(
+ [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept {
+ invariant(status);
+
auto context = _contexts.makeOperationContext(*Client::getCurrent());
auto const opCtx = context.opCtx();
@@ -420,7 +422,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
callbackFn(opCtx, ex.toStatus());
notify->set();
}
- }));
+ });
return notify;
}
@@ -441,36 +443,38 @@ void ShardServerCatalogCacheLoader::getDatabase(
isPrimary = (_role == ReplicaSetRole::Primary);
}
- uassertStatusOK(_threadPool.schedule(
- [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept {
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ](
+ auto status) noexcept {
+ invariant(status);
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
-
- // We may have missed an OperationContextGroup interrupt since this operation began
- // but before the OperationContext was added to the group. So we'll check that
- // we're still in the same _term.
- if (_term != currentTerm) {
- callbackFn(context.opCtx(),
- Status{ErrorCodes::Interrupted,
- "Unable to refresh routing table because replica set state "
- "changed or node is shutting down."});
- return;
- }
+ auto context = _contexts.makeOperationContext(*Client::getCurrent());
+
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // We may have missed an OperationContextGroup interrupt since this operation began
+ // but before the OperationContext was added to the group. So we'll check that
+ // we're still in the same _term.
+ if (_term != currentTerm) {
+ callbackFn(context.opCtx(),
+ Status{ErrorCodes::Interrupted,
+ "Unable to refresh routing table because replica set state "
+ "changed or node is shutting down."});
+ return;
}
+ }
- try {
- if (isPrimary) {
- _schedulePrimaryGetDatabase(
- context.opCtx(), StringData(name), currentTerm, callbackFn);
- } else {
- _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
- }
- } catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
+ try {
+ if (isPrimary) {
+ _schedulePrimaryGetDatabase(
+ context.opCtx(), StringData(name), currentTerm, callbackFn);
+ } else {
+ _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
}
- }));
+ } catch (const DBException& ex) {
+ callbackFn(context.opCtx(), ex.toStatus());
+ }
+ });
}
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
@@ -680,9 +684,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}();
if (termAfterRefresh != termScheduled) {
- // Raising a ConflictingOperationInProgress error here will cause the
- // CatalogCache to attempt the refresh as secondary instead of failing the
- // operation
+ // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to
+ // attempt the refresh as secondary instead of failing the operation
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Replication stepdown occurred during refresh for '"
<< nss.toString());
@@ -877,7 +880,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun
return;
}
- invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }));
+ _threadPool.schedule([this, nss](auto status) {
+ invariant(status);
+
+ _runCollAndChunksTasks(nss);
+ });
}
void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
@@ -895,8 +902,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper
return;
}
- auto name = dbName.toString();
- invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); }));
+ _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
+ invariant(status);
+
+ _runDbTasks(name);
+ });
}
void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
@@ -924,15 +934,11 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString
// Schedule more work if there is any
if (!_collAndChunkTaskLists[nss].empty()) {
- Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule a persisted metadata update"
- << " task for namespace '" << nss << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling will be attempted by the next"
- << " caller to refresh this namespace.";
-
- _collAndChunkTaskLists.erase(nss);
- }
+ _threadPool.schedule([this, nss](auto status) {
+ invariant(status);
+
+ _runCollAndChunksTasks(nss);
+ });
} else {
_collAndChunkTaskLists.erase(nss);
}
@@ -962,16 +968,11 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
// Schedule more work if there is any
if (!_dbTaskLists[dbName.toString()].empty()) {
- Status status =
- _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule a persisted metadata update"
- << " task for namespace '" << dbName << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling will be attempted by the next"
- << " caller to refresh this namespace.";
-
- _dbTaskLists.erase(dbName.toString());
- }
+ _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
+ invariant(status);
+
+ _runDbTasks(name);
+ });
} else {
_dbTaskLists.erase(dbName.toString());
}
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index c378801ea4c..732395d1ddb 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -79,18 +79,24 @@ public:
auto scheduledWorkHandle = uassertStatusOK(_executor->scheduleWorkAt(
when,
[ this, task = std::forward<Callable>(task), taskCompletionPromise ](
- const executor::TaskExecutor::CallbackArgs&) mutable noexcept {
+ const executor::TaskExecutor::CallbackArgs& args) mutable noexcept {
taskCompletionPromise->setWith([&] {
+ {
+ stdx::lock_guard lk(_mutex);
+ uassertStatusOK(_shutdownStatus);
+ uassertStatusOK(args.status);
+ }
+
ThreadClient tc("TransactionCoordinator", _serviceContext);
- stdx::unique_lock<stdx::mutex> ul(_mutex);
- uassertStatusOK(_shutdownStatus);
- auto uniqueOpCtxIter = _activeOpContexts.emplace(
- _activeOpContexts.begin(), tc->makeOperationContext());
- ul.unlock();
+ auto uniqueOpCtxIter = [&] {
+ stdx::lock_guard lk(_mutex);
+ return _activeOpContexts.emplace(_activeOpContexts.begin(),
+ tc->makeOperationContext());
+ }();
ON_BLOCK_EXIT([&] {
- ul.lock();
+ stdx::lock_guard lk(_mutex);
_activeOpContexts.erase(uniqueOpCtxIter);
// There is no need to call _notifyAllTasksComplete here, because we
// will still have an outstanding _activeHandles entry, so the scheduler
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index f99cce19ef8..d17280e9ca3 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -80,21 +80,22 @@ void killSessionTokensFunction(
if (sessionKillTokens->empty())
return;
- uassertStatusOK(getThreadPool(opCtx)->schedule([
- service = opCtx->getServiceContext(),
- sessionKillTokens = std::move(sessionKillTokens)
- ]() mutable {
- auto uniqueClient = service->makeClient("Kill-Session");
- auto uniqueOpCtx = uniqueClient->makeOperationContext();
- const auto opCtx = uniqueOpCtx.get();
- const auto catalog = SessionCatalog::get(opCtx);
-
- for (auto& sessionKillToken : *sessionKillTokens) {
- auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
-
- TransactionParticipant::get(session).invalidate(opCtx);
- }
- }));
+ getThreadPool(opCtx)->schedule(
+ [ service = opCtx->getServiceContext(),
+ sessionKillTokens = std::move(sessionKillTokens) ](auto status) mutable {
+ invariant(status);
+
+ auto uniqueClient = service->makeClient("Kill-Session");
+ auto uniqueOpCtx = uniqueClient->makeOperationContext();
+ const auto opCtx = uniqueOpCtx.get();
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ for (auto& sessionKillToken : *sessionKillTokens) {
+ auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
+
+ TransactionParticipant::get(session).invalidate(opCtx);
+ }
+ });
}
} // namespace
diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp
index e7afa9d5db4..5cef4ec9af2 100644
--- a/src/mongo/dbtests/threadedtests.cpp
+++ b/src/mongo/dbtests/threadedtests.cpp
@@ -130,7 +130,10 @@ public:
tp.startup();
for (unsigned i = 0; i < iterations; i++) {
- ASSERT_OK(tp.schedule([=] { increment(2); }));
+ tp.schedule([=](auto status) {
+ ASSERT_OK(status);
+ increment(2);
+ });
}
tp.waitForIdle();
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c0c4293e505..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -646,8 +646,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// If the host and port were dropped, let this lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
}
- spawnConnections(lk);
+
return;
}
@@ -671,6 +672,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
+ // TODO This should be scheduled on an executor once we have executor-aware pooling
+ fulfillRequests(lk);
}
updateStateInLock();
@@ -706,8 +709,6 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
returnConnection(connPtr, std::move(lk));
}));
-
- fulfillRequests(lk);
}
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
@@ -860,8 +861,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// If the host and port was dropped, let the connection lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
}
- spawnConnections(lk);
} else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
// If we've exceeded the time limit, restart the connect, rather than
// failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 0fa5ed7632e..787bd0a6dac 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -105,17 +105,18 @@ void NetworkInterfaceThreadPool::join() {
lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
}
-Status NetworkInterfaceThreadPool::schedule(Task task) {
+void NetworkInterfaceThreadPool::schedule(Task task) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ lk.unlock();
+ task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ return;
}
+
_tasks.emplace_back(std::move(task));
if (_started)
_consumeTasks(std::move(lk));
-
- return Status::OK();
}
/**
@@ -162,7 +163,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut
const auto lkGuard = makeGuard([&] { lk.lock(); });
for (auto&& task : tasks) {
- task();
+ task(Status::OK());
}
tasks.clear();
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 712ad2d6df7..51771393032 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -57,7 +57,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- Status schedule(Task task) override;
+ void schedule(Task task) override;
private:
void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eb808735306..a9529d1c825 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- return _onAcquireConn(
- state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
+ auto connHandle = uassertStatusOK(std::move(swConn));
+ return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
- _reactor->schedule([action = std::move(action)]() { action(Status::OK()); });
+ _reactor->schedule([action = std::move(action)](auto status) { action(status); });
return Status::OK();
}
@@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
}
if (when <= now()) {
- _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); });
+ _reactor->schedule([action = std::move(action)](auto status) { action(status); });
return Status::OK();
}
@@ -569,7 +569,13 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
}
// Fulfill the promise on a reactor thread
- _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); });
+ _reactor->schedule([state](auto status) {
+ if (status.isOK()) {
+ state->promise.emplaceValue();
+ } else {
+ state->promise.setError(status);
+ }
+ });
}
bool NetworkInterfaceTL::onNetworkThread() {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index df634ed3fb4..2ab75f49aaa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -102,7 +102,7 @@ private:
transport::ReactorHandle reactor;
void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
+ reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
}
};
using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index a2ffbbac2ba..191537cebff 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
- _net->exitNetwork();
- if (_started) {
- if (_worker.joinable()) {
- lk.unlock();
- _worker.join();
- lk.lock();
- }
- } else {
- consumeTasks(&lk);
- }
- invariant(_tasks.empty());
+ if (_joining)
+ return;
+
+ _shutdown(lk);
+ _join(lk);
}
void ThreadPoolMock::startup() {
@@ -68,74 +60,87 @@ void ThreadPoolMock::startup() {
_worker = stdx::thread([this] {
_options.onCreateThread();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- consumeTasks(&lk);
+
+ LOG(1) << "Starting to consume tasks";
+ while (!_joining) {
+ if (_tasks.empty()) {
+ lk.unlock();
+ _net->waitForWork();
+ lk.lock();
+ continue;
+ }
+
+ _consumeOneTask(lk);
+ }
+ LOG(1) << "Done consuming tasks";
});
}
void ThreadPoolMock::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _shutdown(lk);
}
void ThreadPoolMock::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _joining = true;
- if (_started) {
- stdx::thread toJoin = std::move(_worker);
- _net->signalWorkAvailable();
- _net->exitNetwork();
+ _join(lk);
+}
+
+void ThreadPoolMock::schedule(Task task) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
lk.unlock();
- toJoin.join();
- lk.lock();
- invariant(_tasks.empty());
- } else {
- consumeTasks(&lk);
- invariant(_tasks.empty());
+
+ task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ return;
}
+
+ _tasks.emplace_back(std::move(task));
}
-Status ThreadPoolMock::schedule(Task task) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ std::swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk.unlock();
if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ } else {
+ fn(Status::OK());
}
- _tasks.emplace_back(std::move(task));
- return Status::OK();
+ lk.lock();
}
-void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
- using std::swap;
+void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Shutting down pool";
- LOG(1) << "Starting to consume tasks";
- while (!(_inShutdown && _tasks.empty())) {
- if (_tasks.empty()) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
- continue;
- }
- auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
- if (next + 1 != _tasks.size()) {
- swap(_tasks[next], _tasks.back());
- }
- Task fn = std::move(_tasks.back());
- _tasks.pop_back();
- lk->unlock();
- fn();
- lk->lock();
- }
- LOG(1) << "Done consuming tasks";
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+}
- invariant(_tasks.empty());
+void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Joining pool";
+
+ _joining = true;
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
- while (_started && !_joining) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
+ // Since there is only one worker thread, we need to consume tasks here to potentially
+ // unblock that thread.
+ while (!_tasks.empty()) {
+ _consumeOneTask(lk);
}
- LOG(1) << "Ready to join";
+ if (_started) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+
+ invariant(_tasks.empty());
}
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index 3d510ffa3a9..e3baaa07273 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -70,10 +70,12 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- Status schedule(Task task) override;
+ void schedule(Task task) override;
private:
- void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
+ void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
+ void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
+ void _join(stdx::unique_lock<stdx::mutex>& lk);
// These are the options with which the pool was configured at construction time.
const Options _options;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index fefecf9a04e..9496b88d21f 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -597,7 +597,9 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
- while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
+
+ auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); };
+ while (!ErrorCodes::isCancelationError(checkStatus().code())) {
sleepmillis(100);
}
}
@@ -611,16 +613,24 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
cbState->canceled.store(1);
- const auto status =
- _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
+ _pool->schedule([this, cbState](auto status) {
+ invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
+
+ runCallback(std::move(cbState));
+ });
});
} else {
- const auto status =
- _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ _pool->schedule([this, cbState](auto status) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ cbState->canceled.store(1);
+ } else {
+ fassert(28735, status);
+ }
+
+ runCallback(std::move(cbState));
+ });
}
}
_net->signalWorkAvailable();
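
The failpoint loop in the hunk above probes the pool through OutOfLineExecutor::execute(), which (per the out_of_line_executor.h change at the end of this patch) surfaces a scheduling failure as a failed Future rather than a returned Status. A hedged fragment of that probing pattern; probeStatus is an illustrative name.

    // Push a no-op through the pool; a cancelation-class error means the
    // pool is already draining (mirrors the failpoint loop above).
    Status probeStatus = _pool->execute([] {}).getNoThrow();
    if (ErrorCodes::isCancelationError(probeStatus.code())) {
        // Stop handing this pool new work.
    }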
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 977c709eaa3..0776a4c3626 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -176,7 +176,9 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
- uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept {
+ _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept {
+ invariant(status);
+
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
@@ -189,7 +191,7 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
callbackFn(opCtx.get(), std::move(swCollAndChunks));
notify->set();
- }));
+ });
return notify;
}
@@ -197,7 +199,9 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
void ConfigServerCatalogCacheLoader::getDatabase(
StringData dbName,
stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- uassertStatusOK(_threadPool.schedule([ name = dbName.toString(), callbackFn ]() noexcept {
+ _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept {
+ invariant(status);
+
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swDbt = [&]() -> StatusWith<DatabaseType> {
@@ -214,7 +218,7 @@ void ConfigServerCatalogCacheLoader::getDatabase(
}();
callbackFn(opCtx.get(), std::move(swDbt));
- }));
+ });
}
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index 19c1df24d90..edc0cdb0c75 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -183,7 +183,8 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task,
}
auto wrappedTask =
- [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ] {
+ [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ](
+ auto status) {
pendingCounterPtr->subtractAndFetch(1);
auto start = _tickSource->getTicks();
_totalSpentQueued.addAndFetch(start - scheduleTime);
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 1a987e2242a..2c52deafb69 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -128,11 +128,11 @@ public:
}
void schedule(Task task) final {
- asio::post(_ioContext, std::move(task));
+ asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); });
}
void dispatch(Task task) final {
- asio::dispatch(_ioContext, std::move(task));
+ asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); });
}
bool onReactorThread() const final {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index dc0475b801d..1a4aeafea19 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -183,11 +183,11 @@ public:
}
void schedule(Task task) override {
- asio::post(_ioContext, std::move(task));
+ asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); });
}
void dispatch(Task task) override {
- asio::dispatch(_ioContext, std::move(task));
+ asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); });
}
bool onReactorThread() const override {
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index 81f1e3c2285..5f732c1a1ed 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -185,15 +185,22 @@ void ThreadPool::_drainPendingTasks() {
cleanThread.join();
}
-Status ThreadPool::schedule(Task task) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void ThreadPool::schedule(Task task) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
switch (_state) {
case joinRequired:
case joining:
- case shutdownComplete:
- return Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Shutdown of thread pool " << _options.poolName
- << " in progress");
+ case shutdownComplete: {
+ auto status = Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Shutdown of thread pool " << _options.poolName
+ << " in progress");
+
+ lk.unlock();
+ task(status);
+ return;
+ } break;
+
case preStart:
case running:
break;
@@ -202,7 +209,7 @@ Status ThreadPool::schedule(Task task) {
}
_pendingTasks.emplace_back(std::move(task));
if (_state == preStart) {
- return Status::OK();
+ return;
}
if (_numIdleThreads < _pendingTasks.size()) {
_startWorkerThread_inlock();
@@ -211,7 +218,6 @@ Status ThreadPool::schedule(Task task) {
_lastFullUtilizationDate = Date_t::now();
}
_workAvailable.notify_one();
- return Status::OK();
}
void ThreadPool::waitForIdle() {
@@ -332,7 +338,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) noexcept {
_pendingTasks.pop_front();
--_numIdleThreads;
lk->unlock();
- task();
+ task(Status::OK());
lk->lock();
++_numIdleThreads;
if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) {
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index 370de65056e..c4873f84dff 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -123,7 +123,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- Status schedule(Task task) override;
+ void schedule(Task task) override;
/**
* Blocks the caller until there are no pending tasks on this pool.
diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h
index 847dfe0506b..bb59ac8e9f8 100644
--- a/src/mongo/util/concurrency/thread_pool_interface.h
+++ b/src/mongo/util/concurrency/thread_pool_interface.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/util/functional.h"
+#include "mongo/util/out_of_line_executor.h"
namespace mongo {
@@ -38,13 +39,11 @@ class Status;
/**
* Interface for a thread pool.
*/
-class ThreadPoolInterface {
+class ThreadPoolInterface : public OutOfLineExecutor {
ThreadPoolInterface(const ThreadPoolInterface&) = delete;
ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete;
public:
- using Task = unique_function<void()>;
-
/**
* Destroys a thread pool.
*
@@ -82,7 +81,7 @@ public:
* It is safe to call this before startup(), but the scheduled task will not execute
* until after startup() is called.
*/
- virtual Status schedule(Task task) = 0;
+ virtual void schedule(Task task) = 0;
protected:
ThreadPoolInterface() = default;
diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp
index 012b5ca3ca7..b4a650c54bb 100644
--- a/src/mongo/util/concurrency/thread_pool_test.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test.cpp
@@ -104,14 +104,17 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
pool.startup();
ASSERT_EQ(0U, pool.getStats().numThreads);
stdx::unique_lock<stdx::mutex> lk(mutex);
- ASSERT_OK(pool.schedule([this] { blockingWork(); }));
+ pool.schedule([this](auto status) {
+ ASSERT_OK(status);
+ blockingWork();
+ });
while (count1 != 1U) {
cv1.wait(lk);
}
auto stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numPendingTasks);
- ASSERT_OK(pool.schedule([] {}));
+ pool.schedule([](auto status) { ASSERT_OK(status); });
stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numIdleThreads);
@@ -129,7 +132,10 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
lk.lock();
flag2 = false;
count1 = 0;
- ASSERT_OK(pool.schedule([this] { blockingWork(); }));
+ pool.schedule([this](auto status) {
+ ASSERT_OK(status);
+ blockingWork();
+ });
while (count1 == 0) {
cv1.wait(lk);
}
@@ -151,7 +157,10 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
pool.startup();
stdx::unique_lock<stdx::mutex> lk(mutex);
for (size_t i = 0U; i < 30U; ++i) {
- ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i;
+ pool.schedule([this, i](auto status) {
+ ASSERT_OK(status) << i;
+ blockingWork();
+ });
}
while (count1 < 20U) {
cv1.wait(lk);
@@ -225,7 +234,10 @@ DEATH_TEST(ThreadPoolTest,
sleepmillis(50);
}
stdx::unique_lock<stdx::mutex> lk(mutex);
- ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); }));
+ pool->schedule([&mutex](auto status) {
+ ASSERT_OK(status);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ });
stdx::thread t([&pool] {
pool->shutdown();
pool->join();
@@ -257,7 +269,10 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
ThreadPool pool(options);
pool.startup();
- ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); }));
+ pool.schedule([&barrier](auto status) {
+ ASSERT_OK(status);
+ barrier.countDownAndWait();
+ });
barrier.countDownAndWait();
ASSERT_TRUE(onCreateThreadCalled);
diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp
index a03da59cd49..2c2113bb890 100644
--- a/src/mongo/util/concurrency/thread_pool_test_common.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp
@@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) {
COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) {
auto& pool = getThreadPool();
pool.shutdown();
- ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {}));
+ pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); });
}
COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") {
@@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception";
COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) {
auto& pool = getThreadPool();
pool.startup();
- ASSERT_OK(pool.schedule([] {
+ pool.schedule([](auto status) {
uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage}));
- }));
+ });
pool.shutdown();
pool.join();
}
@@ -177,7 +177,10 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") {
COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- ASSERT_OK(pool.schedule([&executed] { executed = true; }));
+ pool.schedule([&executed](auto status) {
+ ASSERT_OK(status);
+ executed = true;
+ });
deleteThreadPool();
ASSERT_EQ(executed, true);
}
@@ -185,7 +188,10 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- ASSERT_OK(pool.schedule([&executed] { executed = true; }));
+ pool.schedule([&executed](auto status) {
+ ASSERT_OK(status);
+ executed = true;
+ });
pool.shutdown();
pool.join();
ASSERT_EQ(executed, true);
@@ -198,12 +204,15 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) {
std::size_t n = 0;
stdx::mutex mutex;
stdx::condition_variable condvar;
- func = [&pool, &n, &func, &condvar, &mutex, depth] {
+ func = [&pool, &n, &func, &condvar, &mutex, depth]() {
stdx::unique_lock<stdx::mutex> lk(mutex);
if (n < depth) {
n++;
lk.unlock();
- ASSERT_OK(pool.schedule(func));
+ pool.schedule([&](auto status) {
+ ASSERT_OK(status);
+ func();
+ });
} else {
pool.shutdown();
condvar.notify_one();
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp
index bd507eaf416..a3804576eaa 100644
--- a/src/mongo/util/keyed_executor_test.cpp
+++ b/src/mongo/util/keyed_executor_test.cpp
@@ -45,7 +45,7 @@ namespace {
class MockExecutor : public OutOfLineExecutor {
public:
- void schedule(unique_function<void()> func) override {
+ void schedule(Task func) override {
_deque.push_front(std::move(func));
}
@@ -60,7 +60,7 @@ public:
auto x = std::move(_deque.back());
_deque.pop_back();
- x();
+ x(Status::OK());
return true;
}
@@ -71,27 +71,7 @@ public:
}
private:
- std::deque<unique_function<void()>> _deque;
-};
-
-class ThreadPoolExecutor : public OutOfLineExecutor {
-public:
- ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {}
-
- void start() {
- _threadPool.startup();
- }
-
- void shutdown() {
- _threadPool.shutdown();
- }
-
- void schedule(unique_function<void()> func) override {
- ASSERT_OK(_threadPool.schedule(std::move(func)));
- }
-
-private:
- ThreadPool _threadPool;
+ std::deque<Task> _deque;
};
TEST(KeyedExecutor, basicExecute) {
@@ -332,9 +312,9 @@ TEST(KeyedExecutor, gracefulShutdown) {
}
TEST(KeyedExecutor, withThreadsTest) {
- ThreadPoolExecutor tpe;
- KeyedExecutor<int> ke(&tpe);
- tpe.start();
+ auto thread_pool = ThreadPool(ThreadPool::Options{});
+ KeyedExecutor<int> ke(&thread_pool);
+ thread_pool.startup();
constexpr size_t n = (1 << 16);
@@ -372,7 +352,7 @@ TEST(KeyedExecutor, withThreadsTest) {
stdx::unique_lock<stdx::mutex> lk(mutex);
condvar.wait(lk, [&] { return counter == n; });
- tpe.shutdown();
+ thread_pool.shutdown();
ASSERT_EQUALS(counter, n);
}
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index 9dc30653860..367ee700fcc 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/base/status.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/future.h"
@@ -43,13 +44,10 @@ namespace mongo {
* The contract for scheduling work on an executor is that it never blocks the caller. It doesn't
* necessarily need to offer forward progress guarantees, but actual calls to schedule() should not
* deadlock.
- *
- * As an explicit point of implementation: it will never invoke the passed callback from within the
- * scheduling call.
*/
class OutOfLineExecutor {
public:
- using Task = unique_function<void()>;
+ using Task = unique_function<void(Status)>;
public:
/**
@@ -59,17 +57,32 @@ public:
*/
template <typename Callback>
Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
- auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
+ auto[promise, future] = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
+ schedule([ cb = std::forward<Callback>(cb), p = std::move(promise) ](auto status) mutable {
+ if (!status.isOK()) {
+ p.setError(status);
+ return;
+ }
p.setWith(std::move(cb));
});
- return std::move(pf.future);
+ return std::move(future);
}
/**
- * Invokes the callback on the executor. This never happens immediately on the caller's stack.
+ * Delegates invocation of the Task to this executor
+ *
+ * Execution of the Task can happen in one of three contexts:
+ * * By default, on an execution context maintained by the OutOfLineExecutor (i.e. a thread).
+ * * During shutdown, on the execution context of shutdown/join/dtor for the OutOfLineExecutor.
+ * * Post-shutdown, on the execution context of the calling code.
+ *
+ * The Task will be passed a Status schedStatus that is either:
+ * * schedStatus.isOK() if the function is run in an out-of-line context
+ * * isCancelationError(schedStatus.code()) if the function is run in an inline context
+ *
+ * All of this is to say: CHECK YOUR STATUS.
*/
virtual void schedule(Task func) = 0;
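
Taken with the contract spelled out in the comment above, every Task in this patch falls into one of two idioms. A condensed, hedged sketch of both; the _pool variable and the lambdas are placeholders, not code from the tree.

    // Pools that are never shut down while work is outstanding treat any
    // non-OK status as a programming error (cf. task_runner.cpp):
    _pool->schedule([](Status status) {
        invariant(status);
        // ... the real work ...
    });

    // Executors that may legitimately be shutting down swallow cancelation
    // codes and fail on anything else (cf. thread_pool_task_executor.cpp):
    _pool->schedule([](Status status) {
        if (ErrorCodes::isCancelationError(status.code())) {
            return;  // drop the work; the executor is going away
        }
        invariant(status);
        // ... the real work ...
    });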