From 6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Wed, 3 Apr 2019 00:53:07 -0400 Subject: Revert "SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)" This reverts commit 04ea1d46eb6c4c78e19409f120ae2e61f2a35204. --- src/mongo/base/error_codes.err | 4 - src/mongo/db/concurrency/deferred_writer.cpp | 6 +- src/mongo/db/index_builds_coordinator_mongod.cpp | 34 +++--- src/mongo/db/repl/oplog_test.cpp | 20 ++-- src/mongo/db/repl/sync_tail.cpp | 16 +-- src/mongo/db/repl/task_runner.cpp | 5 +- src/mongo/db/s/catalog_cache_loader_mock.cpp | 6 +- src/mongo/db/s/chunk_splitter.cpp | 9 +- .../db/s/shard_server_catalog_cache_loader.cpp | 109 +++++++++--------- src/mongo/db/session_catalog_mongod.cpp | 31 +++-- src/mongo/dbtests/threadedtests.cpp | 5 +- src/mongo/executor/connection_pool.cpp | 9 +- .../executor/network_interface_thread_pool.cpp | 11 +- src/mongo/executor/network_interface_thread_pool.h | 2 +- src/mongo/executor/network_interface_tl.cpp | 16 +-- src/mongo/executor/network_interface_tl.h | 2 +- src/mongo/executor/thread_pool_mock.cpp | 125 ++++++++++----------- src/mongo/executor/thread_pool_mock.h | 6 +- src/mongo/executor/thread_pool_task_executor.cpp | 28 ++--- src/mongo/s/config_server_catalog_cache_loader.cpp | 12 +- src/mongo/transport/service_executor_adaptive.cpp | 3 +- src/mongo/transport/service_executor_test.cpp | 4 +- src/mongo/transport/transport_layer_asio.cpp | 4 +- src/mongo/util/concurrency/thread_pool.cpp | 24 ++-- src/mongo/util/concurrency/thread_pool.h | 2 +- src/mongo/util/concurrency/thread_pool_interface.h | 7 +- src/mongo/util/concurrency/thread_pool_test.cpp | 27 +---- .../util/concurrency/thread_pool_test_common.cpp | 23 ++-- src/mongo/util/keyed_executor_test.cpp | 34 ++++-- src/mongo/util/out_of_line_executor.h | 26 +---- 30 files changed, 264 insertions(+), 346 deletions(-) diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index fe682f8eb62..4fa6ca8929b 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -343,10 +343,6 @@ error_class("WriteConcernError", ["WriteConcernFailed", "UnsatisfiableWriteConcern"]) error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"]) -# isCancelationError() includes all codes that, when passed to a function as its parameter, -# indicates that it cannot be executed as normal and must abort its intended work. -error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled"]) - #TODO SERVER-28679 add checksum failure. error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag", "TooManyDocumentSequences"]) diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp index 2dbda1013c4..3e609009931 100644 --- a/src/mongo/db/concurrency/deferred_writer.cpp +++ b/src/mongo/db/concurrency/deferred_writer.cpp @@ -178,11 +178,7 @@ bool DeferredWriter::insertDocument(BSONObj obj) { // Add the object to the buffer. _numBytes += obj.objsize(); - _pool->schedule([this, obj](auto status) { - fassert(40588, status); - - _worker(InsertStatement(obj.getOwned())); - }); + fassert(40588, _pool->schedule([this, obj] { _worker(InsertStatement(obj.getOwned())); })); return true; } diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp index 3bd3ac802ec..cf783ab1676 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.cpp +++ b/src/mongo/db/index_builds_coordinator_mongod.cpp @@ -154,7 +154,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, opDesc = curOp->opDescription().getOwned(); } - _threadPool.schedule([ + Status status = _threadPool.schedule([ this, buildUUID, deadline, @@ -162,23 +162,8 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, writesAreReplicated, shouldNotConflictWithSecondaryBatchApplication, logicalOp, - opDesc, - replState - ](auto status) noexcept { - // Clean up the index build if we failed to schedule it. - if (!status.isOK()) { - stdx::unique_lock lk(_mutex); - - // Unregister the index build before setting the promises, - // so callers do not see the build again. - _unregisterIndexBuild(lk, replState); - - // Set the promise in case another thread already joined the index build. - replState->sharedPromise.setError(status); - - return; - } - + opDesc + ]() noexcept { auto opCtx = Client::getCurrent()->makeOperationContext(); opCtx->setDeadlineByDate(deadline, timeoutError); @@ -205,6 +190,19 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, _runIndexBuild(opCtx.get(), buildUUID); }); + // Clean up the index build if we failed to schedule it. + if (!status.isOK()) { + stdx::unique_lock lk(_mutex); + + // Unregister the index build before setting the promises, so callers do not see the build + // again. + _unregisterIndexBuild(lk, replState); + + // Set the promise in case another thread already joined the index build. + replState->sharedPromise.setError(status); + + return status; + } return replState->sharedPromise.getFuture(); } diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 76fd17eaa5c..9aa8fe62628 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -156,8 +156,12 @@ void _checkOplogEntry(const OplogEntry& oplogEntry, * the contents of the oplog collection. */ using OpTimeNamespaceStringMap = std::map; -template -void _testConcurrentLogOp(const F& makeTaskFunction, +using MakeTaskFunction = + stdx::function; +void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction, OpTimeNamespaceStringMap* opTimeNssMap, std::vector* oplogEntries, std::size_t expectedNumOplogEntries) { @@ -177,14 +181,10 @@ void _testConcurrentLogOp(const F& makeTaskFunction, unittest::Barrier barrier(3U); const NamespaceString nss1("test1.coll"); const NamespaceString nss2("test2.coll"); - pool.schedule([&](auto status) mutable { - ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss1; - makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)(); - }); - pool.schedule([&](auto status) mutable { - ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss2; - makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)(); - }); + ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier))) + << "Failed to schedule logOp() task for namespace " << nss1; + ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier))) + << "Failed to schedule logOp() task for namespace " << nss2; barrier.countDownAndWait(); // Shut thread pool down. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 08e942a956e..399bede7329 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -418,9 +418,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, // The returned function will be run in a separate thread after this returns. Therefore all // captures other than 'ops' must be by value since they will not be available. The caller // guarantees that 'ops' will stay in scope until the spawned threads complete. - return [storageInterface, &ops, begin, end](auto status) { - invariant(status); - + return [storageInterface, &ops, begin, end] { auto opCtx = cc().makeOperationContext(); UnreplicatedWritesBlock uwb(opCtx.get()); ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( @@ -455,7 +453,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, if (!enoughToMultiThread || !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { - threadPool->schedule(makeOplogWriterForRange(0, ops.size())); + invariant(threadPool->schedule(makeOplogWriterForRange(0, ops.size()))); return; } @@ -465,7 +463,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, for (size_t thread = 0; thread < numOplogThreads; thread++) { size_t begin = thread * numOpsPerThread; size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread; - threadPool->schedule(makeOplogWriterForRange(begin, end)); + invariant(threadPool->schedule(makeOplogWriterForRange(begin, end))); } } @@ -1358,18 +1356,16 @@ void SyncTail::_applyOps(std::vector& writerVectors if (writerVectors[i].empty()) continue; - _writerPool->schedule([ + invariant(_writerPool->schedule([ this, &writer = writerVectors.at(i), &status = statusVector->at(i), &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i) - ](auto scheduleStatus) { - invariant(scheduleStatus); - + ] { auto opCtx = cc().makeOperationContext(); status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown( [&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); }); - }); + })); } } diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index b4036617a57..2226ad2bcba 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -113,10 +113,7 @@ void TaskRunner::schedule(Task task) { return; } - _threadPool->schedule([this](auto status) { - invariant(status); - _runTasks(); - }); + invariant(_threadPool->schedule([this] { _runTasks(); })); _active = true; _cancelRequested = false; diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 89ea61c3697..9b9b52870cc 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -99,9 +99,7 @@ std::shared_ptr> CatalogCacheLoaderMock::getChunksSince( const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared>(); - _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept { - invariant(status); - + uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept { auto opCtx = Client::getCurrent()->makeOperationContext(); auto swCollAndChunks = [&]() -> StatusWith { @@ -123,7 +121,7 @@ std::shared_ptr> CatalogCacheLoaderMock::getChunksSince( callbackFn(opCtx.get(), std::move(swCollAndChunks)); notify->set(); - }); + })); return notify; } diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index cfe972510a7..3495a01ac05 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -272,13 +272,10 @@ void ChunkSplitter::trySplitting(std::shared_ptr chunkSpl if (!_isPrimary) { return; } - _threadPool.schedule( - [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]( - auto status) noexcept { - invariant(status); - + uassertStatusOK(_threadPool.schedule( + [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]() noexcept { _runAutosplit(csd, nss, min, max, dataWritten); - }); + })); } void ChunkSplitter::_runAutosplit(std::shared_ptr chunkSplitStateDriver, diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 4384aeeed72..36700332edb 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -394,10 +394,8 @@ std::shared_ptr> ShardServerCatalogCacheLoader::getChunksSinc return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); - _threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept { - invariant(status); - + uassertStatusOK(_threadPool.schedule( + [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept { auto context = _contexts.makeOperationContext(*Client::getCurrent()); auto const opCtx = context.opCtx(); @@ -422,7 +420,7 @@ std::shared_ptr> ShardServerCatalogCacheLoader::getChunksSinc callbackFn(opCtx, ex.toStatus()); notify->set(); } - }); + })); return notify; } @@ -443,38 +441,36 @@ void ShardServerCatalogCacheLoader::getDatabase( isPrimary = (_role == ReplicaSetRole::Primary); } - _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]( - auto status) noexcept { - invariant(status); - - auto context = _contexts.makeOperationContext(*Client::getCurrent()); - - { - stdx::lock_guard lock(_mutex); + uassertStatusOK(_threadPool.schedule( + [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept { + auto context = _contexts.makeOperationContext(*Client::getCurrent()); - // We may have missed an OperationContextGroup interrupt since this operation began - // but before the OperationContext was added to the group. So we'll check that - // we're still in the same _term. - if (_term != currentTerm) { - callbackFn(context.opCtx(), - Status{ErrorCodes::Interrupted, - "Unable to refresh routing table because replica set state " - "changed or node is shutting down."}); - return; + { + stdx::lock_guard lock(_mutex); + + // We may have missed an OperationContextGroup interrupt since this operation began + // but before the OperationContext was added to the group. So we'll check that + // we're still in the same _term. + if (_term != currentTerm) { + callbackFn(context.opCtx(), + Status{ErrorCodes::Interrupted, + "Unable to refresh routing table because replica set state " + "changed or node is shutting down."}); + return; + } } - } - try { - if (isPrimary) { - _schedulePrimaryGetDatabase( - context.opCtx(), StringData(name), currentTerm, callbackFn); - } else { - _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); + try { + if (isPrimary) { + _schedulePrimaryGetDatabase( + context.opCtx(), StringData(name), currentTerm, callbackFn); + } else { + _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); + } + } catch (const DBException& ex) { + callbackFn(context.opCtx(), ex.toStatus()); } - } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); - } - }); + })); } void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, @@ -684,8 +680,9 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( }(); if (termAfterRefresh != termScheduled) { - // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to - // attempt the refresh as secondary instead of failing the operation + // Raising a ConflictingOperationInProgress error here will cause the + // CatalogCache to attempt the refresh as secondary instead of failing the + // operation return Status(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Replication stepdown occurred during refresh for '" << nss.toString()); @@ -880,11 +877,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun return; } - _threadPool.schedule([this, nss](auto status) { - invariant(status); - - _runCollAndChunksTasks(nss); - }); + invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); })); } void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx, @@ -902,11 +895,8 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper return; } - _threadPool.schedule([ this, name = dbName.toString() ](auto status) { - invariant(status); - - _runDbTasks(name); - }); + auto name = dbName.toString(); + invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); })); } void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { @@ -934,11 +924,15 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString // Schedule more work if there is any if (!_collAndChunkTaskLists[nss].empty()) { - _threadPool.schedule([this, nss](auto status) { - invariant(status); - - _runCollAndChunksTasks(nss); - }); + Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); + if (!status.isOK()) { + LOG(0) << "Cache loader failed to schedule a persisted metadata update" + << " task for namespace '" << nss << "' due to '" << redact(status) + << "'. Clearing task list so that scheduling will be attempted by the next" + << " caller to refresh this namespace."; + + _collAndChunkTaskLists.erase(nss); + } } else { _collAndChunkTaskLists.erase(nss); } @@ -968,11 +962,16 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { // Schedule more work if there is any if (!_dbTaskLists[dbName.toString()].empty()) { - _threadPool.schedule([ this, name = dbName.toString() ](auto status) { - invariant(status); - - _runDbTasks(name); - }); + Status status = + _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); }); + if (!status.isOK()) { + LOG(0) << "Cache loader failed to schedule a persisted metadata update" + << " task for namespace '" << dbName << "' due to '" << redact(status) + << "'. Clearing task list so that scheduling will be attempted by the next" + << " caller to refresh this namespace."; + + _dbTaskLists.erase(dbName.toString()); + } } else { _dbTaskLists.erase(dbName.toString()); } diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index d17280e9ca3..f99cce19ef8 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -80,22 +80,21 @@ void killSessionTokensFunction( if (sessionKillTokens->empty()) return; - getThreadPool(opCtx)->schedule( - [ service = opCtx->getServiceContext(), - sessionKillTokens = std::move(sessionKillTokens) ](auto status) mutable { - invariant(status); - - auto uniqueClient = service->makeClient("Kill-Session"); - auto uniqueOpCtx = uniqueClient->makeOperationContext(); - const auto opCtx = uniqueOpCtx.get(); - const auto catalog = SessionCatalog::get(opCtx); - - for (auto& sessionKillToken : *sessionKillTokens) { - auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); - - TransactionParticipant::get(session).invalidate(opCtx); - } - }); + uassertStatusOK(getThreadPool(opCtx)->schedule([ + service = opCtx->getServiceContext(), + sessionKillTokens = std::move(sessionKillTokens) + ]() mutable { + auto uniqueClient = service->makeClient("Kill-Session"); + auto uniqueOpCtx = uniqueClient->makeOperationContext(); + const auto opCtx = uniqueOpCtx.get(); + const auto catalog = SessionCatalog::get(opCtx); + + for (auto& sessionKillToken : *sessionKillTokens) { + auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); + + TransactionParticipant::get(session).invalidate(opCtx); + } + })); } } // namespace diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index 5cef4ec9af2..e7afa9d5db4 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -130,10 +130,7 @@ public: tp.startup(); for (unsigned i = 0; i < iterations; i++) { - tp.schedule([=](auto status) { - ASSERT_OK(status); - increment(2); - }); + ASSERT_OK(tp.schedule([=] { increment(2); })); } tp.waitForIdle(); diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 41e4e602c65..c0c4293e505 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -646,9 +646,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr // If the host and port were dropped, let this lapse if (conn->getGeneration() == _generation) { addToReady(lk, std::move(conn)); - fulfillRequests(lk); } - + spawnConnections(lk); return; } @@ -672,8 +671,6 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr } else { // If it's fine as it is, just put it in the ready queue addToReady(lk, std::move(conn)); - // TODO This should be scheduled on an executor once we have executor-aware pooling - fulfillRequests(lk); } updateStateInLock(); @@ -709,6 +706,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock& lk returnConnection(connPtr, std::move(lk)); })); + + fulfillRequests(lk); } // Sets state to shutdown and kicks off the failure protocol to tank existing connections @@ -861,8 +860,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lockgetGeneration() == _generation) { addToReady(lk, std::move(conn)); - fulfillRequests(lk); } + spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp index 787bd0a6dac..0fa5ed7632e 100644 --- a/src/mongo/executor/network_interface_thread_pool.cpp +++ b/src/mongo/executor/network_interface_thread_pool.cpp @@ -105,18 +105,17 @@ void NetworkInterfaceThreadPool::join() { lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); }); } -void NetworkInterfaceThreadPool::schedule(Task task) { +Status NetworkInterfaceThreadPool::schedule(Task task) { stdx::unique_lock lk(_mutex); if (_inShutdown) { - lk.unlock(); - task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - return; + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - _tasks.emplace_back(std::move(task)); if (_started) _consumeTasks(std::move(lk)); + + return Status::OK(); } /** @@ -163,7 +162,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock lk); diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index a9529d1c825..eb808735306 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ]( StatusWith> swConn) mutable { makeReadyFutureWith([&] { - auto connHandle = uassertStatusOK(std::move(swConn)); - return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton); + return _onAcquireConn( + state, std::move(future), std::move(*uassertStatusOK(swConn)), baton); }) .onError([](Status error) -> StatusWith { // The TransportLayer has, for historical reasons returned SocketException for @@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function action) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } - _reactor->schedule([action = std::move(action)](auto status) { action(status); }); + _reactor->schedule([action = std::move(action)]() { action(Status::OK()); }); return Status::OK(); } @@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle } if (when <= now()) { - _reactor->schedule([action = std::move(action)](auto status) { action(status); }); + _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); }); return Status::OK(); } @@ -569,13 +569,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr } // Fulfill the promise on a reactor thread - _reactor->schedule([state](auto status) { - if (status.isOK()) { - state->promise.emplaceValue(); - } else { - state->promise.setError(status); - } - }); + _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); }); } bool NetworkInterfaceTL::onNetworkThread() { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 2ab75f49aaa..df634ed3fb4 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -102,7 +102,7 @@ private: transport::ReactorHandle reactor; void operator()(ConnectionPool::ConnectionInterface* ptr) const { - reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); }); + reactor->dispatch([ ret = returner, ptr ] { ret(ptr); }); } }; using ConnHandle = std::unique_ptr; diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index 191537cebff..a2ffbbac2ba 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -44,11 +44,19 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock lk(_mutex); - if (_joining) - return; - - _shutdown(lk); - _join(lk); + _inShutdown = true; + _net->signalWorkAvailable(); + _net->exitNetwork(); + if (_started) { + if (_worker.joinable()) { + lk.unlock(); + _worker.join(); + lk.lock(); + } + } else { + consumeTasks(&lk); + } + invariant(_tasks.empty()); } void ThreadPoolMock::startup() { @@ -60,87 +68,74 @@ void ThreadPoolMock::startup() { _worker = stdx::thread([this] { _options.onCreateThread(); stdx::unique_lock lk(_mutex); - - LOG(1) << "Starting to consume tasks"; - while (!_joining) { - if (_tasks.empty()) { - lk.unlock(); - _net->waitForWork(); - lk.lock(); - continue; - } - - _consumeOneTask(lk); - } - LOG(1) << "Done consuming tasks"; + consumeTasks(&lk); }); } void ThreadPoolMock::shutdown() { - stdx::unique_lock lk(_mutex); - _shutdown(lk); + stdx::lock_guard lk(_mutex); + _inShutdown = true; + _net->signalWorkAvailable(); } void ThreadPoolMock::join() { stdx::unique_lock lk(_mutex); - _join(lk); -} - -void ThreadPoolMock::schedule(Task task) { - stdx::unique_lock lk(_mutex); - if (_inShutdown) { + _joining = true; + if (_started) { + stdx::thread toJoin = std::move(_worker); + _net->signalWorkAvailable(); + _net->exitNetwork(); lk.unlock(); - - task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - return; + toJoin.join(); + lk.lock(); + invariant(_tasks.empty()); + } else { + consumeTasks(&lk); + invariant(_tasks.empty()); } - - _tasks.emplace_back(std::move(task)); } -void ThreadPoolMock::_consumeOneTask(stdx::unique_lock& lk) { - auto next = static_cast(_prng.nextInt64(static_cast(_tasks.size()))); - if (next + 1 != _tasks.size()) { - std::swap(_tasks[next], _tasks.back()); - } - Task fn = std::move(_tasks.back()); - _tasks.pop_back(); - lk.unlock(); +Status ThreadPoolMock::schedule(Task task) { + stdx::lock_guard lk(_mutex); if (_inShutdown) { - fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - } else { - fn(Status::OK()); + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - lk.lock(); -} - -void ThreadPoolMock::_shutdown(stdx::unique_lock& lk) { - LOG(1) << "Shutting down pool"; - - _inShutdown = true; - _net->signalWorkAvailable(); + _tasks.emplace_back(std::move(task)); + return Status::OK(); } -void ThreadPoolMock::_join(stdx::unique_lock& lk) { - LOG(1) << "Joining pool"; +void ThreadPoolMock::consumeTasks(stdx::unique_lock* lk) { + using std::swap; - _joining = true; - _net->signalWorkAvailable(); - _net->exitNetwork(); - - // Since there is only one worker thread, we need to consume tasks here to potentially - // unblock that thread. - while (!_tasks.empty()) { - _consumeOneTask(lk); + LOG(1) << "Starting to consume tasks"; + while (!(_inShutdown && _tasks.empty())) { + if (_tasks.empty()) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); + continue; + } + auto next = static_cast(_prng.nextInt64(static_cast(_tasks.size()))); + if (next + 1 != _tasks.size()) { + swap(_tasks[next], _tasks.back()); + } + Task fn = std::move(_tasks.back()); + _tasks.pop_back(); + lk->unlock(); + fn(); + lk->lock(); } + LOG(1) << "Done consuming tasks"; - if (_started) { - lk.unlock(); - _worker.join(); - lk.lock(); + invariant(_tasks.empty()); + + while (_started && !_joining) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); } - invariant(_tasks.empty()); + LOG(1) << "Ready to join"; } } // namespace executor diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index e3baaa07273..3d510ffa3a9 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -70,12 +70,10 @@ public: void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; + Status schedule(Task task) override; private: - void _consumeOneTask(stdx::unique_lock& lk); - void _shutdown(stdx::unique_lock& lk); - void _join(stdx::unique_lock& lk); + void consumeTasks(stdx::unique_lock* lk); // These are the options with which the pool was configured at construction time. const Options _options; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 9496b88d21f..fefecf9a04e 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -597,9 +597,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) { scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off); - - auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); }; - while (!ErrorCodes::isCancelationError(checkStatus().code())) { + while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) { sleepmillis(100); } } @@ -613,24 +611,16 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } cbState->canceled.store(1); - _pool->schedule([this, cbState](auto status) { - invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); - - runCallback(std::move(cbState)); - }); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); }); } else { - _pool->schedule([this, cbState](auto status) { - if (ErrorCodes::isCancelationError(status.code())) { - stdx::lock_guard lk(_mutex); - - cbState->canceled.store(1); - } else { - fassert(28735, status); - } - - runCallback(std::move(cbState)); - }); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + if (status == ErrorCodes::ShutdownInProgress) + break; + fassert(28735, status); } } _net->signalWorkAvailable(); diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index 0776a4c3626..977c709eaa3 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -176,9 +176,7 @@ std::shared_ptr> ConfigServerCatalogCacheLoader::getChunksSin const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared>(); - _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept { - invariant(status); - + uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept { auto opCtx = Client::getCurrent()->makeOperationContext(); auto swCollAndChunks = [&]() -> StatusWith { @@ -191,7 +189,7 @@ std::shared_ptr> ConfigServerCatalogCacheLoader::getChunksSin callbackFn(opCtx.get(), std::move(swCollAndChunks)); notify->set(); - }); + })); return notify; } @@ -199,9 +197,7 @@ std::shared_ptr> ConfigServerCatalogCacheLoader::getChunksSin void ConfigServerCatalogCacheLoader::getDatabase( StringData dbName, stdx::function)> callbackFn) { - _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept { - invariant(status); - + uassertStatusOK(_threadPool.schedule([ name = dbName.toString(), callbackFn ]() noexcept { auto opCtx = Client::getCurrent()->makeOperationContext(); auto swDbt = [&]() -> StatusWith { @@ -218,7 +214,7 @@ void ConfigServerCatalogCacheLoader::getDatabase( }(); callbackFn(opCtx.get(), std::move(swDbt)); - }); + })); } } // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp index edc0cdb0c75..19c1df24d90 100644 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -183,8 +183,7 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, } auto wrappedTask = - [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ]( - auto status) { + [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ] { pendingCounterPtr->subtractAndFetch(1); auto start = _tickSource->getTicks(); _totalSpentQueued.addAndFetch(start - scheduleTime); diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 2c52deafb69..1a987e2242a 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -128,11 +128,11 @@ public: } void schedule(Task task) final { - asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::post(_ioContext, std::move(task)); } void dispatch(Task task) final { - asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::dispatch(_ioContext, std::move(task)); } bool onReactorThread() const final { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 1a4aeafea19..dc0475b801d 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -183,11 +183,11 @@ public: } void schedule(Task task) override { - asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::post(_ioContext, std::move(task)); } void dispatch(Task task) override { - asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::dispatch(_ioContext, std::move(task)); } bool onReactorThread() const override { diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index 5f732c1a1ed..81f1e3c2285 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -185,22 +185,15 @@ void ThreadPool::_drainPendingTasks() { cleanThread.join(); } -void ThreadPool::schedule(Task task) { - stdx::unique_lock lk(_mutex); - +Status ThreadPool::schedule(Task task) { + stdx::lock_guard lk(_mutex); switch (_state) { case joinRequired: case joining: - case shutdownComplete: { - auto status = Status(ErrorCodes::ShutdownInProgress, - str::stream() << "Shutdown of thread pool " << _options.poolName - << " in progress"); - - lk.unlock(); - task(status); - return; - } break; - + case shutdownComplete: + return Status(ErrorCodes::ShutdownInProgress, + str::stream() << "Shutdown of thread pool " << _options.poolName + << " in progress"); case preStart: case running: break; @@ -209,7 +202,7 @@ void ThreadPool::schedule(Task task) { } _pendingTasks.emplace_back(std::move(task)); if (_state == preStart) { - return; + return Status::OK(); } if (_numIdleThreads < _pendingTasks.size()) { _startWorkerThread_inlock(); @@ -218,6 +211,7 @@ void ThreadPool::schedule(Task task) { _lastFullUtilizationDate = Date_t::now(); } _workAvailable.notify_one(); + return Status::OK(); } void ThreadPool::waitForIdle() { @@ -338,7 +332,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock* lk) noexcept { _pendingTasks.pop_front(); --_numIdleThreads; lk->unlock(); - task(Status::OK()); + task(); lk->lock(); ++_numIdleThreads; if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) { diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index c4873f84dff..370de65056e 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -123,7 +123,7 @@ public: void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; + Status schedule(Task task) override; /** * Blocks the caller until there are no pending tasks on this pool. diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h index bb59ac8e9f8..847dfe0506b 100644 --- a/src/mongo/util/concurrency/thread_pool_interface.h +++ b/src/mongo/util/concurrency/thread_pool_interface.h @@ -30,7 +30,6 @@ #pragma once #include "mongo/util/functional.h" -#include "mongo/util/out_of_line_executor.h" namespace mongo { @@ -39,11 +38,13 @@ class Status; /** * Interface for a thread pool. */ -class ThreadPoolInterface : public OutOfLineExecutor { +class ThreadPoolInterface { ThreadPoolInterface(const ThreadPoolInterface&) = delete; ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete; public: + using Task = unique_function; + /** * Destroys a thread pool. * @@ -81,7 +82,7 @@ public: * It is safe to call this before startup(), but the scheduled task will not execute * until after startup() is called. */ - virtual void schedule(Task task) = 0; + virtual Status schedule(Task task) = 0; protected: ThreadPoolInterface() = default; diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp index b4a650c54bb..012b5ca3ca7 100644 --- a/src/mongo/util/concurrency/thread_pool_test.cpp +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -104,17 +104,14 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { pool.startup(); ASSERT_EQ(0U, pool.getStats().numThreads); stdx::unique_lock lk(mutex); - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })); while (count1 != 1U) { cv1.wait(lk); } auto stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numPendingTasks); - pool.schedule([](auto status) { ASSERT_OK(status); }); + ASSERT_OK(pool.schedule([] {})); stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numIdleThreads); @@ -132,10 +129,7 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { lk.lock(); flag2 = false; count1 = 0; - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })); while (count1 == 0) { cv1.wait(lk); } @@ -157,10 +151,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) { pool.startup(); stdx::unique_lock lk(mutex); for (size_t i = 0U; i < 30U; ++i) { - pool.schedule([this, i](auto status) { - ASSERT_OK(status) << i; - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i; } while (count1 < 20U) { cv1.wait(lk); @@ -234,10 +225,7 @@ DEATH_TEST(ThreadPoolTest, sleepmillis(50); } stdx::unique_lock lk(mutex); - pool->schedule([&mutex](auto status) { - ASSERT_OK(status); - stdx::lock_guard lk(mutex); - }); + ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard lk(mutex); })); stdx::thread t([&pool] { pool->shutdown(); pool->join(); @@ -269,10 +257,7 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) ThreadPool pool(options); pool.startup(); - pool.schedule([&barrier](auto status) { - ASSERT_OK(status); - barrier.countDownAndWait(); - }); + ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); })); barrier.countDownAndWait(); ASSERT_TRUE(onCreateThreadCalled); diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp index 2c2113bb890..a03da59cd49 100644 --- a/src/mongo/util/concurrency/thread_pool_test_common.cpp +++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp @@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) { COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) { auto& pool = getThreadPool(); pool.shutdown(); - pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); }); + ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {})); } COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") { @@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception"; COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) { auto& pool = getThreadPool(); pool.startup(); - pool.schedule([](auto status) { + ASSERT_OK(pool.schedule([] { uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage})); - }); + })); pool.shutdown(); pool.join(); } @@ -177,10 +177,7 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") { COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); + ASSERT_OK(pool.schedule([&executed] { executed = true; })); deleteThreadPool(); ASSERT_EQ(executed, true); } @@ -188,10 +185,7 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); + ASSERT_OK(pool.schedule([&executed] { executed = true; })); pool.shutdown(); pool.join(); ASSERT_EQ(executed, true); @@ -204,15 +198,12 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) { std::size_t n = 0; stdx::mutex mutex; stdx::condition_variable condvar; - func = [&pool, &n, &func, &condvar, &mutex, depth]() { + func = [&pool, &n, &func, &condvar, &mutex, depth] { stdx::unique_lock lk(mutex); if (n < depth) { n++; lk.unlock(); - pool.schedule([&](auto status) { - ASSERT_OK(status); - func(); - }); + ASSERT_OK(pool.schedule(func)); } else { pool.shutdown(); condvar.notify_one(); diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp index a3804576eaa..bd507eaf416 100644 --- a/src/mongo/util/keyed_executor_test.cpp +++ b/src/mongo/util/keyed_executor_test.cpp @@ -45,7 +45,7 @@ namespace { class MockExecutor : public OutOfLineExecutor { public: - void schedule(Task func) override { + void schedule(unique_function func) override { _deque.push_front(std::move(func)); } @@ -60,7 +60,7 @@ public: auto x = std::move(_deque.back()); _deque.pop_back(); - x(Status::OK()); + x(); return true; } @@ -71,7 +71,27 @@ public: } private: - std::deque _deque; + std::deque> _deque; +}; + +class ThreadPoolExecutor : public OutOfLineExecutor { +public: + ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {} + + void start() { + _threadPool.startup(); + } + + void shutdown() { + _threadPool.shutdown(); + } + + void schedule(unique_function func) override { + ASSERT_OK(_threadPool.schedule(std::move(func))); + } + +private: + ThreadPool _threadPool; }; TEST(KeyedExecutor, basicExecute) { @@ -312,9 +332,9 @@ TEST(KeyedExecutor, gracefulShutdown) { } TEST(KeyedExecutor, withThreadsTest) { - auto thread_pool = ThreadPool(ThreadPool::Options{}); - KeyedExecutor ke(&thread_pool); - thread_pool.startup(); + ThreadPoolExecutor tpe; + KeyedExecutor ke(&tpe); + tpe.start(); constexpr size_t n = (1 << 16); @@ -352,7 +372,7 @@ TEST(KeyedExecutor, withThreadsTest) { stdx::unique_lock lk(mutex); condvar.wait(lk, [&] { return counter == n; }); - thread_pool.shutdown(); + tpe.shutdown(); ASSERT_EQUALS(counter, n); } diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h index 0aa5599c2aa..9dc30653860 100644 --- a/src/mongo/util/out_of_line_executor.h +++ b/src/mongo/util/out_of_line_executor.h @@ -29,7 +29,6 @@ #pragma once -#include "mongo/base/status.h" #include "mongo/stdx/functional.h" #include "mongo/util/future.h" @@ -50,7 +49,7 @@ namespace mongo { */ class OutOfLineExecutor { public: - using Task = unique_function; + using Task = unique_function; public: /** @@ -60,32 +59,17 @@ public: */ template Future> execute(Callback&& cb) { - auto[promise, future] = makePromiseFuture>(); + auto pf = makePromiseFuture>(); - schedule([ cb = std::forward(cb), p = std::move(promise) ](auto status) mutable { - if (!status.isOK()) { - p.setError(status); - return; - } + schedule([ cb = std::forward(cb), p = std::move(pf.promise) ]() mutable { p.setWith(std::move(cb)); }); - return std::move(future); + return std::move(pf.future); } /** - * Delegates invocation of the Task to this executor - * - * Execution of the Task can happen in one of three contexts: - * * By default, on an execution context maintained by the OutOfLineExecutor (i.e. a thread). - * * During shutdown, on the execution context of shutdown/join/dtor for the OutOfLineExecutor. - * * Post-shutdown, on the execution context of the calling code. - * - * The Task will be passed a Status schedStatus that is either: - * * schedStatus.isOK() if the function is run in an out-of-line context - * * isCancelationError(schedStatus.code()) if the function is run in an inline context - * - * All of this is to say: CHECK YOUR STATUS. + * Invokes the callback on the executor. This never happens immediately on the caller's stack. */ virtual void schedule(Task func) = 0; -- cgit v1.2.1