author    Benety Goh <benety@mongodb.com>  2019-04-03 00:53:07 -0400
committer Benety Goh <benety@mongodb.com>  2019-04-03 00:53:07 -0400
commit    6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd (patch)
tree      e5e7d931989f7bdacf515e9f53f29d6a3837c6ee
parent    ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c (diff)
Revert "SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)"
This reverts commit 04ea1d46eb6c4c78e19409f120ae2e61f2a35204.
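For context, a minimal sketch (not part of the reverted patch) of the caller-facing change this revert undoes. Under SERVER-39965, executor tasks received the scheduling Status as an argument; after this revert, tasks are plain unique_function<void()> again and ThreadPoolInterface::schedule() reports failure through its return value. In the C++ sketch below, 'pool' and 'doWork' are placeholders for any ThreadPoolInterface implementation and any unit of work.

    // Task shape introduced by SERVER-39965 (removed by this revert): the task is
    // handed the scheduling Status and must check it itself before doing work.
    pool.schedule([](Status status) {
        invariant(status);
        doWork();
    });

    // Task shape restored by this revert: a no-argument task; a failure to schedule
    // (e.g. ShutdownInProgress) comes back to the caller as the returned Status.
    invariant(pool.schedule([] { doWork(); }));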
-rw-r--r--  src/mongo/base/error_codes.err                           |   4
-rw-r--r--  src/mongo/db/concurrency/deferred_writer.cpp             |   6
-rw-r--r--  src/mongo/db/index_builds_coordinator_mongod.cpp         |  34
-rw-r--r--  src/mongo/db/repl/oplog_test.cpp                         |  20
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                          |  16
-rw-r--r--  src/mongo/db/repl/task_runner.cpp                        |   5
-rw-r--r--  src/mongo/db/s/catalog_cache_loader_mock.cpp             |   6
-rw-r--r--  src/mongo/db/s/chunk_splitter.cpp                        |   9
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp     | 109
-rw-r--r--  src/mongo/db/session_catalog_mongod.cpp                  |  31
-rw-r--r--  src/mongo/dbtests/threadedtests.cpp                      |   5
-rw-r--r--  src/mongo/executor/connection_pool.cpp                   |   9
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.cpp     |  11
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.h       |   2
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp              |  16
-rw-r--r--  src/mongo/executor/network_interface_tl.h                |   2
-rw-r--r--  src/mongo/executor/thread_pool_mock.cpp                  | 125
-rw-r--r--  src/mongo/executor/thread_pool_mock.h                    |   6
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp         |  28
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.cpp       |  12
-rw-r--r--  src/mongo/transport/service_executor_adaptive.cpp        |   3
-rw-r--r--  src/mongo/transport/service_executor_test.cpp            |   4
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp             |   4
-rw-r--r--  src/mongo/util/concurrency/thread_pool.cpp               |  24
-rw-r--r--  src/mongo/util/concurrency/thread_pool.h                 |   2
-rw-r--r--  src/mongo/util/concurrency/thread_pool_interface.h       |   7
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test.cpp          |  27
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test_common.cpp   |  23
-rw-r--r--  src/mongo/util/keyed_executor_test.cpp                   |  34
-rw-r--r--  src/mongo/util/out_of_line_executor.h                    |  26
30 files changed, 264 insertions(+), 346 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index fe682f8eb62..4fa6ca8929b 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -343,10 +343,6 @@ error_class("WriteConcernError", ["WriteConcernFailed",
"UnsatisfiableWriteConcern"])
error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"])
-# isCancelationError() includes all codes that, when passed to a function as its parameter,
-# indicates that it cannot be executed as normal and must abort its intended work.
-error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled"])
-
#TODO SERVER-28679 add checksum failure.
error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
"TooManyDocumentSequences"])
diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp
index 2dbda1013c4..3e609009931 100644
--- a/src/mongo/db/concurrency/deferred_writer.cpp
+++ b/src/mongo/db/concurrency/deferred_writer.cpp
@@ -178,11 +178,7 @@ bool DeferredWriter::insertDocument(BSONObj obj) {
// Add the object to the buffer.
_numBytes += obj.objsize();
- _pool->schedule([this, obj](auto status) {
- fassert(40588, status);
-
- _worker(InsertStatement(obj.getOwned()));
- });
+ fassert(40588, _pool->schedule([this, obj] { _worker(InsertStatement(obj.getOwned())); }));
return true;
}
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index 3bd3ac802ec..cf783ab1676 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -154,7 +154,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
opDesc = curOp->opDescription().getOwned();
}
- _threadPool.schedule([
+ Status status = _threadPool.schedule([
this,
buildUUID,
deadline,
@@ -162,23 +162,8 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
writesAreReplicated,
shouldNotConflictWithSecondaryBatchApplication,
logicalOp,
- opDesc,
- replState
- ](auto status) noexcept {
- // Clean up the index build if we failed to schedule it.
- if (!status.isOK()) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // Unregister the index build before setting the promises,
- // so callers do not see the build again.
- _unregisterIndexBuild(lk, replState);
-
- // Set the promise in case another thread already joined the index build.
- replState->sharedPromise.setError(status);
-
- return;
- }
-
+ opDesc
+ ]() noexcept {
auto opCtx = Client::getCurrent()->makeOperationContext();
opCtx->setDeadlineByDate(deadline, timeoutError);
@@ -205,6 +190,19 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
_runIndexBuild(opCtx.get(), buildUUID);
});
+ // Clean up the index build if we failed to schedule it.
+ if (!status.isOK()) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // Unregister the index build before setting the promises, so callers do not see the build
+ // again.
+ _unregisterIndexBuild(lk, replState);
+
+ // Set the promise in case another thread already joined the index build.
+ replState->sharedPromise.setError(status);
+
+ return status;
+ }
return replState->sharedPromise.getFuture();
}
diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp
index 76fd17eaa5c..9aa8fe62628 100644
--- a/src/mongo/db/repl/oplog_test.cpp
+++ b/src/mongo/db/repl/oplog_test.cpp
@@ -156,8 +156,12 @@ void _checkOplogEntry(const OplogEntry& oplogEntry,
* the contents of the oplog collection.
*/
using OpTimeNamespaceStringMap = std::map<OpTime, NamespaceString>;
-template <typename F>
-void _testConcurrentLogOp(const F& makeTaskFunction,
+using MakeTaskFunction =
+ stdx::function<ThreadPoolInterface::Task(const NamespaceString& nss,
+ stdx::mutex* mtx,
+ OpTimeNamespaceStringMap* opTimeNssMap,
+ unittest::Barrier* barrier)>;
+void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction,
OpTimeNamespaceStringMap* opTimeNssMap,
std::vector<OplogEntry>* oplogEntries,
std::size_t expectedNumOplogEntries) {
@@ -177,14 +181,10 @@ void _testConcurrentLogOp(const F& makeTaskFunction,
unittest::Barrier barrier(3U);
const NamespaceString nss1("test1.coll");
const NamespaceString nss2("test2.coll");
- pool.schedule([&](auto status) mutable {
- ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss1;
- makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)();
- });
- pool.schedule([&](auto status) mutable {
- ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss2;
- makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)();
- });
+ ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)))
+ << "Failed to schedule logOp() task for namespace " << nss1;
+ ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)))
+ << "Failed to schedule logOp() task for namespace " << nss2;
barrier.countDownAndWait();
// Shut thread pool down.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 08e942a956e..399bede7329 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -418,9 +418,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
// The returned function will be run in a separate thread after this returns. Therefore all
// captures other than 'ops' must be by value since they will not be available. The caller
// guarantees that 'ops' will stay in scope until the spawned threads complete.
- return [storageInterface, &ops, begin, end](auto status) {
- invariant(status);
-
+ return [storageInterface, &ops, begin, end] {
auto opCtx = cc().makeOperationContext();
UnreplicatedWritesBlock uwb(opCtx.get());
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
@@ -455,7 +453,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
if (!enoughToMultiThread ||
!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
- threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
+ invariant(threadPool->schedule(makeOplogWriterForRange(0, ops.size())));
return;
}
@@ -465,7 +463,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
for (size_t thread = 0; thread < numOplogThreads; thread++) {
size_t begin = thread * numOpsPerThread;
size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
- threadPool->schedule(makeOplogWriterForRange(begin, end));
+ invariant(threadPool->schedule(makeOplogWriterForRange(begin, end)));
}
}
@@ -1358,18 +1356,16 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
if (writerVectors[i].empty())
continue;
- _writerPool->schedule([
+ invariant(_writerPool->schedule([
this,
&writer = writerVectors.at(i),
&status = statusVector->at(i),
&workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)
- ](auto scheduleStatus) {
- invariant(scheduleStatus);
-
+ ] {
auto opCtx = cc().makeOperationContext();
status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown(
[&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); });
- });
+ }));
}
}
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index b4036617a57..2226ad2bcba 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -113,10 +113,7 @@ void TaskRunner::schedule(Task task) {
return;
}
- _threadPool->schedule([this](auto status) {
- invariant(status);
- _runTasks();
- });
+ invariant(_threadPool->schedule([this] { _runTasks(); }));
_active = true;
_cancelRequested = false;
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 89ea61c3697..9b9b52870cc 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -99,9 +99,7 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
- _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept {
- invariant(status);
-
+ uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept {
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
@@ -123,7 +121,7 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
callbackFn(opCtx.get(), std::move(swCollAndChunks));
notify->set();
- });
+ }));
return notify;
}
diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp
index cfe972510a7..3495a01ac05 100644
--- a/src/mongo/db/s/chunk_splitter.cpp
+++ b/src/mongo/db/s/chunk_splitter.cpp
@@ -272,13 +272,10 @@ void ChunkSplitter::trySplitting(std::shared_ptr<ChunkSplitStateDriver> chunkSpl
if (!_isPrimary) {
return;
}
- _threadPool.schedule(
- [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ](
- auto status) noexcept {
- invariant(status);
-
+ uassertStatusOK(_threadPool.schedule(
+ [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]() noexcept {
_runAutosplit(csd, nss, min, max, dataWritten);
- });
+ }));
}
void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSplitStateDriver,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 4384aeeed72..36700332edb 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -394,10 +394,8 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
}();
- _threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept {
- invariant(status);
-
+ uassertStatusOK(_threadPool.schedule(
+ [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
auto const opCtx = context.opCtx();
@@ -422,7 +420,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
callbackFn(opCtx, ex.toStatus());
notify->set();
}
- });
+ }));
return notify;
}
@@ -443,38 +441,36 @@ void ShardServerCatalogCacheLoader::getDatabase(
isPrimary = (_role == ReplicaSetRole::Primary);
}
- _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ](
- auto status) noexcept {
- invariant(status);
-
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
-
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ uassertStatusOK(_threadPool.schedule(
+ [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept {
+ auto context = _contexts.makeOperationContext(*Client::getCurrent());
- // We may have missed an OperationContextGroup interrupt since this operation began
- // but before the OperationContext was added to the group. So we'll check that
- // we're still in the same _term.
- if (_term != currentTerm) {
- callbackFn(context.opCtx(),
- Status{ErrorCodes::Interrupted,
- "Unable to refresh routing table because replica set state "
- "changed or node is shutting down."});
- return;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // We may have missed an OperationContextGroup interrupt since this operation began
+ // but before the OperationContext was added to the group. So we'll check that
+ // we're still in the same _term.
+ if (_term != currentTerm) {
+ callbackFn(context.opCtx(),
+ Status{ErrorCodes::Interrupted,
+ "Unable to refresh routing table because replica set state "
+ "changed or node is shutting down."});
+ return;
+ }
}
- }
- try {
- if (isPrimary) {
- _schedulePrimaryGetDatabase(
- context.opCtx(), StringData(name), currentTerm, callbackFn);
- } else {
- _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
+ try {
+ if (isPrimary) {
+ _schedulePrimaryGetDatabase(
+ context.opCtx(), StringData(name), currentTerm, callbackFn);
+ } else {
+ _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
+ }
+ } catch (const DBException& ex) {
+ callbackFn(context.opCtx(), ex.toStatus());
}
- } catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
- }
- });
+ }));
}
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
@@ -684,8 +680,9 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}();
if (termAfterRefresh != termScheduled) {
- // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to
- // attempt the refresh as secondary instead of failing the operation
+ // Raising a ConflictingOperationInProgress error here will cause the
+ // CatalogCache to attempt the refresh as secondary instead of failing the
+ // operation
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Replication stepdown occurred during refresh for '"
<< nss.toString());
@@ -880,11 +877,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun
return;
}
- _threadPool.schedule([this, nss](auto status) {
- invariant(status);
-
- _runCollAndChunksTasks(nss);
- });
+ invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }));
}
void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
@@ -902,11 +895,8 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper
return;
}
- _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
- invariant(status);
-
- _runDbTasks(name);
- });
+ auto name = dbName.toString();
+ invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); }));
}
void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
@@ -934,11 +924,15 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString
// Schedule more work if there is any
if (!_collAndChunkTaskLists[nss].empty()) {
- _threadPool.schedule([this, nss](auto status) {
- invariant(status);
-
- _runCollAndChunksTasks(nss);
- });
+ Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
+ if (!status.isOK()) {
+ LOG(0) << "Cache loader failed to schedule a persisted metadata update"
+ << " task for namespace '" << nss << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling will be attempted by the next"
+ << " caller to refresh this namespace.";
+
+ _collAndChunkTaskLists.erase(nss);
+ }
} else {
_collAndChunkTaskLists.erase(nss);
}
@@ -968,11 +962,16 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
// Schedule more work if there is any
if (!_dbTaskLists[dbName.toString()].empty()) {
- _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
- invariant(status);
-
- _runDbTasks(name);
- });
+ Status status =
+ _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); });
+ if (!status.isOK()) {
+ LOG(0) << "Cache loader failed to schedule a persisted metadata update"
+ << " task for namespace '" << dbName << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling will be attempted by the next"
+ << " caller to refresh this namespace.";
+
+ _dbTaskLists.erase(dbName.toString());
+ }
} else {
_dbTaskLists.erase(dbName.toString());
}
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index d17280e9ca3..f99cce19ef8 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -80,22 +80,21 @@ void killSessionTokensFunction(
if (sessionKillTokens->empty())
return;
- getThreadPool(opCtx)->schedule(
- [ service = opCtx->getServiceContext(),
- sessionKillTokens = std::move(sessionKillTokens) ](auto status) mutable {
- invariant(status);
-
- auto uniqueClient = service->makeClient("Kill-Session");
- auto uniqueOpCtx = uniqueClient->makeOperationContext();
- const auto opCtx = uniqueOpCtx.get();
- const auto catalog = SessionCatalog::get(opCtx);
-
- for (auto& sessionKillToken : *sessionKillTokens) {
- auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
-
- TransactionParticipant::get(session).invalidate(opCtx);
- }
- });
+ uassertStatusOK(getThreadPool(opCtx)->schedule([
+ service = opCtx->getServiceContext(),
+ sessionKillTokens = std::move(sessionKillTokens)
+ ]() mutable {
+ auto uniqueClient = service->makeClient("Kill-Session");
+ auto uniqueOpCtx = uniqueClient->makeOperationContext();
+ const auto opCtx = uniqueOpCtx.get();
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ for (auto& sessionKillToken : *sessionKillTokens) {
+ auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
+
+ TransactionParticipant::get(session).invalidate(opCtx);
+ }
+ }));
}
} // namespace
diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp
index 5cef4ec9af2..e7afa9d5db4 100644
--- a/src/mongo/dbtests/threadedtests.cpp
+++ b/src/mongo/dbtests/threadedtests.cpp
@@ -130,10 +130,7 @@ public:
tp.startup();
for (unsigned i = 0; i < iterations; i++) {
- tp.schedule([=](auto status) {
- ASSERT_OK(status);
- increment(2);
- });
+ ASSERT_OK(tp.schedule([=] { increment(2); }));
}
tp.waitForIdle();
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 41e4e602c65..c0c4293e505 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -646,9 +646,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// If the host and port were dropped, let this lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
- fulfillRequests(lk);
}
-
+ spawnConnections(lk);
return;
}
@@ -672,8 +671,6 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
- // TODO This should be scheduled on an executor once we have executor-aware pooling
- fulfillRequests(lk);
}
updateStateInLock();
@@ -709,6 +706,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
returnConnection(connPtr, std::move(lk));
}));
+
+ fulfillRequests(lk);
}
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
@@ -861,8 +860,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// If the host and port was dropped, let the connection lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
- fulfillRequests(lk);
}
+ spawnConnections(lk);
} else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
// If we've exceeded the time limit, restart the connect, rather than
// failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 787bd0a6dac..0fa5ed7632e 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -105,18 +105,17 @@ void NetworkInterfaceThreadPool::join() {
lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
}
-void NetworkInterfaceThreadPool::schedule(Task task) {
+Status NetworkInterfaceThreadPool::schedule(Task task) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- lk.unlock();
- task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- return;
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
-
_tasks.emplace_back(std::move(task));
if (_started)
_consumeTasks(std::move(lk));
+
+ return Status::OK();
}
/**
@@ -163,7 +162,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut
const auto lkGuard = makeGuard([&] { lk.lock(); });
for (auto&& task : tasks) {
- task(Status::OK());
+ task();
}
tasks.clear();
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 51771393032..712ad2d6df7 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -57,7 +57,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
private:
void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a9529d1c825..eb808735306 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- auto connHandle = uassertStatusOK(std::move(swConn));
- return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
+ return _onAcquireConn(
+ state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
- _reactor->schedule([action = std::move(action)](auto status) { action(status); });
+ _reactor->schedule([action = std::move(action)]() { action(Status::OK()); });
return Status::OK();
}
@@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
}
if (when <= now()) {
- _reactor->schedule([action = std::move(action)](auto status) { action(status); });
+ _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); });
return Status::OK();
}
@@ -569,13 +569,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
}
// Fulfill the promise on a reactor thread
- _reactor->schedule([state](auto status) {
- if (status.isOK()) {
- state->promise.emplaceValue();
- } else {
- state->promise.setError(status);
- }
- });
+ _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); });
}
bool NetworkInterfaceTL::onNetworkThread() {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 2ab75f49aaa..df634ed3fb4 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -102,7 +102,7 @@ private:
transport::ReactorHandle reactor;
void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
+ reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
}
};
using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index 191537cebff..a2ffbbac2ba 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,11 +44,19 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_joining)
- return;
-
- _shutdown(lk);
- _join(lk);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
+ if (_started) {
+ if (_worker.joinable()) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+ } else {
+ consumeTasks(&lk);
+ }
+ invariant(_tasks.empty());
}
void ThreadPoolMock::startup() {
@@ -60,87 +68,74 @@ void ThreadPoolMock::startup() {
_worker = stdx::thread([this] {
_options.onCreateThread();
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- LOG(1) << "Starting to consume tasks";
- while (!_joining) {
- if (_tasks.empty()) {
- lk.unlock();
- _net->waitForWork();
- lk.lock();
- continue;
- }
-
- _consumeOneTask(lk);
- }
- LOG(1) << "Done consuming tasks";
+ consumeTasks(&lk);
});
}
void ThreadPoolMock::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _shutdown(lk);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
}
void ThreadPoolMock::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _join(lk);
-}
-
-void ThreadPoolMock::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ _joining = true;
+ if (_started) {
+ stdx::thread toJoin = std::move(_worker);
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
lk.unlock();
-
- task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- return;
+ toJoin.join();
+ lk.lock();
+ invariant(_tasks.empty());
+ } else {
+ consumeTasks(&lk);
+ invariant(_tasks.empty());
}
-
- _tasks.emplace_back(std::move(task));
}
-void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
- auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
- if (next + 1 != _tasks.size()) {
- std::swap(_tasks[next], _tasks.back());
- }
- Task fn = std::move(_tasks.back());
- _tasks.pop_back();
- lk.unlock();
+Status ThreadPoolMock::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- } else {
- fn(Status::OK());
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- lk.lock();
-}
-
-void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
- LOG(1) << "Shutting down pool";
-
- _inShutdown = true;
- _net->signalWorkAvailable();
+ _tasks.emplace_back(std::move(task));
+ return Status::OK();
}
-void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
- LOG(1) << "Joining pool";
+void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
+ using std::swap;
- _joining = true;
- _net->signalWorkAvailable();
- _net->exitNetwork();
-
- // Since there is only one worker thread, we need to consume tasks here to potentially
- // unblock that thread.
- while (!_tasks.empty()) {
- _consumeOneTask(lk);
+ LOG(1) << "Starting to consume tasks";
+ while (!(_inShutdown && _tasks.empty())) {
+ if (_tasks.empty()) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
+ continue;
+ }
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk->unlock();
+ fn();
+ lk->lock();
}
+ LOG(1) << "Done consuming tasks";
- if (_started) {
- lk.unlock();
- _worker.join();
- lk.lock();
+ invariant(_tasks.empty());
+
+ while (_started && !_joining) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
}
- invariant(_tasks.empty());
+ LOG(1) << "Ready to join";
}
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index e3baaa07273..3d510ffa3a9 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -70,12 +70,10 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
private:
- void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
- void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
- void _join(stdx::unique_lock<stdx::mutex>& lk);
+ void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
// These are the options with which the pool was configured at construction time.
const Options _options;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 9496b88d21f..fefecf9a04e 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -597,9 +597,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
-
- auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); };
- while (!ErrorCodes::isCancelationError(checkStatus().code())) {
+ while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
sleepmillis(100);
}
}
@@ -613,24 +611,16 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
cbState->canceled.store(1);
- _pool->schedule([this, cbState](auto status) {
- invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
-
- runCallback(std::move(cbState));
- });
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
});
} else {
- _pool->schedule([this, cbState](auto status) {
- if (ErrorCodes::isCancelationError(status.code())) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- cbState->canceled.store(1);
- } else {
- fassert(28735, status);
- }
-
- runCallback(std::move(cbState));
- });
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
}
}
_net->signalWorkAvailable();
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 0776a4c3626..977c709eaa3 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -176,9 +176,7 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
- _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept {
- invariant(status);
-
+ uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept {
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
@@ -191,7 +189,7 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
callbackFn(opCtx.get(), std::move(swCollAndChunks));
notify->set();
- });
+ }));
return notify;
}
@@ -199,9 +197,7 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin
void ConfigServerCatalogCacheLoader::getDatabase(
StringData dbName,
stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept {
- invariant(status);
-
+ uassertStatusOK(_threadPool.schedule([ name = dbName.toString(), callbackFn ]() noexcept {
auto opCtx = Client::getCurrent()->makeOperationContext();
auto swDbt = [&]() -> StatusWith<DatabaseType> {
@@ -218,7 +214,7 @@ void ConfigServerCatalogCacheLoader::getDatabase(
}();
callbackFn(opCtx.get(), std::move(swDbt));
- });
+ }));
}
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index edc0cdb0c75..19c1df24d90 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -183,8 +183,7 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task,
}
auto wrappedTask =
- [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ](
- auto status) {
+ [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ] {
pendingCounterPtr->subtractAndFetch(1);
auto start = _tickSource->getTicks();
_totalSpentQueued.addAndFetch(start - scheduleTime);
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 2c52deafb69..1a987e2242a 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -128,11 +128,11 @@ public:
}
void schedule(Task task) final {
- asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::post(_ioContext, std::move(task));
}
void dispatch(Task task) final {
- asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::dispatch(_ioContext, std::move(task));
}
bool onReactorThread() const final {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 1a4aeafea19..dc0475b801d 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -183,11 +183,11 @@ public:
}
void schedule(Task task) override {
- asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::post(_ioContext, std::move(task));
}
void dispatch(Task task) override {
- asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::dispatch(_ioContext, std::move(task));
}
bool onReactorThread() const override {
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index 5f732c1a1ed..81f1e3c2285 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -185,22 +185,15 @@ void ThreadPool::_drainPendingTasks() {
cleanThread.join();
}
-void ThreadPool::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
+Status ThreadPool::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
switch (_state) {
case joinRequired:
case joining:
- case shutdownComplete: {
- auto status = Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Shutdown of thread pool " << _options.poolName
- << " in progress");
-
- lk.unlock();
- task(status);
- return;
- } break;
-
+ case shutdownComplete:
+ return Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Shutdown of thread pool " << _options.poolName
+ << " in progress");
case preStart:
case running:
break;
@@ -209,7 +202,7 @@ void ThreadPool::schedule(Task task) {
}
_pendingTasks.emplace_back(std::move(task));
if (_state == preStart) {
- return;
+ return Status::OK();
}
if (_numIdleThreads < _pendingTasks.size()) {
_startWorkerThread_inlock();
@@ -218,6 +211,7 @@ void ThreadPool::schedule(Task task) {
_lastFullUtilizationDate = Date_t::now();
}
_workAvailable.notify_one();
+ return Status::OK();
}
void ThreadPool::waitForIdle() {
@@ -338,7 +332,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) noexcept {
_pendingTasks.pop_front();
--_numIdleThreads;
lk->unlock();
- task(Status::OK());
+ task();
lk->lock();
++_numIdleThreads;
if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) {
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index c4873f84dff..370de65056e 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -123,7 +123,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
/**
* Blocks the caller until there are no pending tasks on this pool.
diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h
index bb59ac8e9f8..847dfe0506b 100644
--- a/src/mongo/util/concurrency/thread_pool_interface.h
+++ b/src/mongo/util/concurrency/thread_pool_interface.h
@@ -30,7 +30,6 @@
#pragma once
#include "mongo/util/functional.h"
-#include "mongo/util/out_of_line_executor.h"
namespace mongo {
@@ -39,11 +38,13 @@ class Status;
/**
* Interface for a thread pool.
*/
-class ThreadPoolInterface : public OutOfLineExecutor {
+class ThreadPoolInterface {
ThreadPoolInterface(const ThreadPoolInterface&) = delete;
ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete;
public:
+ using Task = unique_function<void()>;
+
/**
* Destroys a thread pool.
*
@@ -81,7 +82,7 @@ public:
* It is safe to call this before startup(), but the scheduled task will not execute
* until after startup() is called.
*/
- virtual void schedule(Task task) = 0;
+ virtual Status schedule(Task task) = 0;
protected:
ThreadPoolInterface() = default;
diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp
index b4a650c54bb..012b5ca3ca7 100644
--- a/src/mongo/util/concurrency/thread_pool_test.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test.cpp
@@ -104,17 +104,14 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
pool.startup();
ASSERT_EQ(0U, pool.getStats().numThreads);
stdx::unique_lock<stdx::mutex> lk(mutex);
- pool.schedule([this](auto status) {
- ASSERT_OK(status);
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
while (count1 != 1U) {
cv1.wait(lk);
}
auto stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numPendingTasks);
- pool.schedule([](auto status) { ASSERT_OK(status); });
+ ASSERT_OK(pool.schedule([] {}));
stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numIdleThreads);
@@ -132,10 +129,7 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
lk.lock();
flag2 = false;
count1 = 0;
- pool.schedule([this](auto status) {
- ASSERT_OK(status);
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
while (count1 == 0) {
cv1.wait(lk);
}
@@ -157,10 +151,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
pool.startup();
stdx::unique_lock<stdx::mutex> lk(mutex);
for (size_t i = 0U; i < 30U; ++i) {
- pool.schedule([this, i](auto status) {
- ASSERT_OK(status) << i;
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i;
}
while (count1 < 20U) {
cv1.wait(lk);
@@ -234,10 +225,7 @@ DEATH_TEST(ThreadPoolTest,
sleepmillis(50);
}
stdx::unique_lock<stdx::mutex> lk(mutex);
- pool->schedule([&mutex](auto status) {
- ASSERT_OK(status);
- stdx::lock_guard<stdx::mutex> lk(mutex);
- });
+ ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); }));
stdx::thread t([&pool] {
pool->shutdown();
pool->join();
@@ -269,10 +257,7 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
ThreadPool pool(options);
pool.startup();
- pool.schedule([&barrier](auto status) {
- ASSERT_OK(status);
- barrier.countDownAndWait();
- });
+ ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); }));
barrier.countDownAndWait();
ASSERT_TRUE(onCreateThreadCalled);
diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp
index 2c2113bb890..a03da59cd49 100644
--- a/src/mongo/util/concurrency/thread_pool_test_common.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp
@@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) {
COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) {
auto& pool = getThreadPool();
pool.shutdown();
- pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); });
+ ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {}));
}
COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") {
@@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception";
COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) {
auto& pool = getThreadPool();
pool.startup();
- pool.schedule([](auto status) {
+ ASSERT_OK(pool.schedule([] {
uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage}));
- });
+ }));
pool.shutdown();
pool.join();
}
@@ -177,10 +177,7 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") {
COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- pool.schedule([&executed](auto status) {
- ASSERT_OK(status);
- executed = true;
- });
+ ASSERT_OK(pool.schedule([&executed] { executed = true; }));
deleteThreadPool();
ASSERT_EQ(executed, true);
}
@@ -188,10 +185,7 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- pool.schedule([&executed](auto status) {
- ASSERT_OK(status);
- executed = true;
- });
+ ASSERT_OK(pool.schedule([&executed] { executed = true; }));
pool.shutdown();
pool.join();
ASSERT_EQ(executed, true);
@@ -204,15 +198,12 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) {
std::size_t n = 0;
stdx::mutex mutex;
stdx::condition_variable condvar;
- func = [&pool, &n, &func, &condvar, &mutex, depth]() {
+ func = [&pool, &n, &func, &condvar, &mutex, depth] {
stdx::unique_lock<stdx::mutex> lk(mutex);
if (n < depth) {
n++;
lk.unlock();
- pool.schedule([&](auto status) {
- ASSERT_OK(status);
- func();
- });
+ ASSERT_OK(pool.schedule(func));
} else {
pool.shutdown();
condvar.notify_one();
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp
index a3804576eaa..bd507eaf416 100644
--- a/src/mongo/util/keyed_executor_test.cpp
+++ b/src/mongo/util/keyed_executor_test.cpp
@@ -45,7 +45,7 @@ namespace {
class MockExecutor : public OutOfLineExecutor {
public:
- void schedule(Task func) override {
+ void schedule(unique_function<void()> func) override {
_deque.push_front(std::move(func));
}
@@ -60,7 +60,7 @@ public:
auto x = std::move(_deque.back());
_deque.pop_back();
- x(Status::OK());
+ x();
return true;
}
@@ -71,7 +71,27 @@ public:
}
private:
- std::deque<Task> _deque;
+ std::deque<unique_function<void()>> _deque;
+};
+
+class ThreadPoolExecutor : public OutOfLineExecutor {
+public:
+ ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {}
+
+ void start() {
+ _threadPool.startup();
+ }
+
+ void shutdown() {
+ _threadPool.shutdown();
+ }
+
+ void schedule(unique_function<void()> func) override {
+ ASSERT_OK(_threadPool.schedule(std::move(func)));
+ }
+
+private:
+ ThreadPool _threadPool;
};
TEST(KeyedExecutor, basicExecute) {
@@ -312,9 +332,9 @@ TEST(KeyedExecutor, gracefulShutdown) {
}
TEST(KeyedExecutor, withThreadsTest) {
- auto thread_pool = ThreadPool(ThreadPool::Options{});
- KeyedExecutor<int> ke(&thread_pool);
- thread_pool.startup();
+ ThreadPoolExecutor tpe;
+ KeyedExecutor<int> ke(&tpe);
+ tpe.start();
constexpr size_t n = (1 << 16);
@@ -352,7 +372,7 @@ TEST(KeyedExecutor, withThreadsTest) {
stdx::unique_lock<stdx::mutex> lk(mutex);
condvar.wait(lk, [&] { return counter == n; });
- thread_pool.shutdown();
+ tpe.shutdown();
ASSERT_EQUALS(counter, n);
}
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index 0aa5599c2aa..9dc30653860 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -29,7 +29,6 @@
#pragma once
-#include "mongo/base/status.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/future.h"
@@ -50,7 +49,7 @@ namespace mongo {
*/
class OutOfLineExecutor {
public:
- using Task = unique_function<void(Status)>;
+ using Task = unique_function<void()>;
public:
/**
@@ -60,32 +59,17 @@ public:
*/
template <typename Callback>
Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
- auto[promise, future] = makePromiseFuture<FutureContinuationResult<Callback>>();
+ auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule([ cb = std::forward<Callback>(cb), p = std::move(promise) ](auto status) mutable {
- if (!status.isOK()) {
- p.setError(status);
- return;
- }
+ schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
p.setWith(std::move(cb));
});
- return std::move(future);
+ return std::move(pf.future);
}
/**
- * Delegates invocation of the Task to this executor
- *
- * Execution of the Task can happen in one of three contexts:
- * * By default, on an execution context maintained by the OutOfLineExecutor (i.e. a thread).
- * * During shutdown, on the execution context of shutdown/join/dtor for the OutOfLineExecutor.
- * * Post-shutdown, on the execution context of the calling code.
- *
- * The Task will be passed a Status schedStatus that is either:
- * * schedStatus.isOK() if the function is run in an out-of-line context
- * * isCancelationError(schedStatus.code()) if the function is run in an inline context
- *
- * All of this is to say: CHECK YOUR STATUS.
+ * Invokes the callback on the executor. This never happens immediately on the caller's stack.
*/
virtual void schedule(Task func) = 0;
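
For reference, a hedged usage sketch (not part of the patch) of the reverted interfaces as they appear throughout the diff above; 'executor', 'pool', 'doWork', and 'computeValue' are placeholders for an OutOfLineExecutor, a ThreadPoolInterface implementation, and arbitrary callbacks.

    // With Task back to unique_function<void()>, a scheduled callback takes no Status.
    executor->schedule([] { doWork(); });

    // execute() wraps the callback's result in a Future that is fulfilled when the
    // task runs out of line.
    Future<int> f = executor->execute([] { return computeValue(); });

    // ThreadPoolInterface::schedule(), by contrast, reports scheduling failure through
    // its return value; call sites in this patch handle it with invariant(), fassert(),
    // uassertStatusOK(), ASSERT_OK(), or explicit cleanup.
    Status status = pool.schedule([] { doWork(); });
    if (!status.isOK()) {
        // e.g. ErrorCodes::ShutdownInProgress: propagate, assert, or drop the work.
    }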