diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-03-05 13:40:25 -0500 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-04-05 14:28:40 -0400 |
commit | a08bac3e29cd9ec3d030623894928e80c2f572dd (patch) | |
tree | a82304a07e67909a8d8a40ec08959f4f1c8ccace /src | |
parent | b1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff) | |
download | mongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz |
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
Diffstat (limited to 'src')
32 files changed, 366 insertions, 275 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 4fa6ca8929b..fe682f8eb62 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -343,6 +343,10 @@ error_class("WriteConcernError", ["WriteConcernFailed", "UnsatisfiableWriteConcern"]) error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"]) +# isCancelationError() includes all codes that, when passed to a function as its parameter, +# indicates that it cannot be executed as normal and must abort its intended work. +error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled"]) + #TODO SERVER-28679 add checksum failure. error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag", "TooManyDocumentSequences"]) diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp index 3e609009931..2dbda1013c4 100644 --- a/src/mongo/db/concurrency/deferred_writer.cpp +++ b/src/mongo/db/concurrency/deferred_writer.cpp @@ -178,7 +178,11 @@ bool DeferredWriter::insertDocument(BSONObj obj) { // Add the object to the buffer. _numBytes += obj.objsize(); - fassert(40588, _pool->schedule([this, obj] { _worker(InsertStatement(obj.getOwned())); })); + _pool->schedule([this, obj](auto status) { + fassert(40588, status); + + _worker(InsertStatement(obj.getOwned())); + }); return true; } diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp index cf783ab1676..3bd3ac802ec 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.cpp +++ b/src/mongo/db/index_builds_coordinator_mongod.cpp @@ -154,7 +154,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, opDesc = curOp->opDescription().getOwned(); } - Status status = _threadPool.schedule([ + _threadPool.schedule([ this, buildUUID, deadline, @@ -162,8 +162,23 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, writesAreReplicated, shouldNotConflictWithSecondaryBatchApplication, logicalOp, - opDesc - ]() noexcept { + opDesc, + replState + ](auto status) noexcept { + // Clean up the index build if we failed to schedule it. + if (!status.isOK()) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // Unregister the index build before setting the promises, + // so callers do not see the build again. + _unregisterIndexBuild(lk, replState); + + // Set the promise in case another thread already joined the index build. + replState->sharedPromise.setError(status); + + return; + } + auto opCtx = Client::getCurrent()->makeOperationContext(); opCtx->setDeadlineByDate(deadline, timeoutError); @@ -190,19 +205,6 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, _runIndexBuild(opCtx.get(), buildUUID); }); - // Clean up the index build if we failed to schedule it. - if (!status.isOK()) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - // Unregister the index build before setting the promises, so callers do not see the build - // again. - _unregisterIndexBuild(lk, replState); - - // Set the promise in case another thread already joined the index build. - replState->sharedPromise.setError(status); - - return status; - } return replState->sharedPromise.getFuture(); } diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 9aa8fe62628..76fd17eaa5c 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -156,12 +156,8 @@ void _checkOplogEntry(const OplogEntry& oplogEntry, * the contents of the oplog collection. */ using OpTimeNamespaceStringMap = std::map<OpTime, NamespaceString>; -using MakeTaskFunction = - stdx::function<ThreadPoolInterface::Task(const NamespaceString& nss, - stdx::mutex* mtx, - OpTimeNamespaceStringMap* opTimeNssMap, - unittest::Barrier* barrier)>; -void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction, +template <typename F> +void _testConcurrentLogOp(const F& makeTaskFunction, OpTimeNamespaceStringMap* opTimeNssMap, std::vector<OplogEntry>* oplogEntries, std::size_t expectedNumOplogEntries) { @@ -181,10 +177,14 @@ void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction, unittest::Barrier barrier(3U); const NamespaceString nss1("test1.coll"); const NamespaceString nss2("test2.coll"); - ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier))) - << "Failed to schedule logOp() task for namespace " << nss1; - ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier))) - << "Failed to schedule logOp() task for namespace " << nss2; + pool.schedule([&](auto status) mutable { + ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss1; + makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)(); + }); + pool.schedule([&](auto status) mutable { + ASSERT_OK(status) << "Failed to schedule logOp() task for namespace " << nss2; + makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)(); + }); barrier.countDownAndWait(); // Shut thread pool down. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 399bede7329..08e942a956e 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -418,7 +418,9 @@ void scheduleWritesToOplog(OperationContext* opCtx, // The returned function will be run in a separate thread after this returns. Therefore all // captures other than 'ops' must be by value since they will not be available. The caller // guarantees that 'ops' will stay in scope until the spawned threads complete. - return [storageInterface, &ops, begin, end] { + return [storageInterface, &ops, begin, end](auto status) { + invariant(status); + auto opCtx = cc().makeOperationContext(); UnreplicatedWritesBlock uwb(opCtx.get()); ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( @@ -453,7 +455,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, if (!enoughToMultiThread || !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { - invariant(threadPool->schedule(makeOplogWriterForRange(0, ops.size()))); + threadPool->schedule(makeOplogWriterForRange(0, ops.size())); return; } @@ -463,7 +465,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, for (size_t thread = 0; thread < numOplogThreads; thread++) { size_t begin = thread * numOpsPerThread; size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread; - invariant(threadPool->schedule(makeOplogWriterForRange(begin, end))); + threadPool->schedule(makeOplogWriterForRange(begin, end)); } } @@ -1356,16 +1358,18 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors if (writerVectors[i].empty()) continue; - invariant(_writerPool->schedule([ + _writerPool->schedule([ this, &writer = writerVectors.at(i), &status = statusVector->at(i), &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i) - ] { + ](auto scheduleStatus) { + invariant(scheduleStatus); + auto opCtx = cc().makeOperationContext(); status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown( [&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); }); - })); + }); } } diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 2226ad2bcba..b4036617a57 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -113,7 +113,10 @@ void TaskRunner::schedule(Task task) { return; } - invariant(_threadPool->schedule([this] { _runTasks(); })); + _threadPool->schedule([this](auto status) { + invariant(status); + _runTasks(); + }); _active = true; _cancelRequested = false; diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 9b9b52870cc..89ea61c3697 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -99,7 +99,9 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); - uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept { + _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept { + invariant(status); + auto opCtx = Client::getCurrent()->makeOperationContext(); auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { @@ -121,7 +123,7 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( callbackFn(opCtx.get(), std::move(swCollAndChunks)); notify->set(); - })); + }); return notify; } diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index 3495a01ac05..cfe972510a7 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -272,10 +272,13 @@ void ChunkSplitter::trySplitting(std::shared_ptr<ChunkSplitStateDriver> chunkSpl if (!_isPrimary) { return; } - uassertStatusOK(_threadPool.schedule( - [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]() noexcept { + _threadPool.schedule( + [ this, csd = std::move(chunkSplitStateDriver), nss, min, max, dataWritten ]( + auto status) noexcept { + invariant(status); + _runAutosplit(csd, nss, min, max, dataWritten); - })); + }); } void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSplitStateDriver, diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index c816b1c02e7..0d77fa131a2 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -143,7 +143,13 @@ void scheduleCleanup(executor::TaskExecutor* executor, Date_t when) { LOG(1) << "Scheduling cleanup on " << nss.ns() << " at " << when; auto swCallbackHandle = executor->scheduleWorkAt( - when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto&) { + when, [ executor, nss = std::move(nss), epoch = std::move(epoch) ](auto& args) { + auto& status = args.status; + if (ErrorCodes::isCancelationError(status.code())) { + return; + } + invariant(status); + ThreadClient tc("Collection-Range-Deleter", getGlobalServiceContext()); auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); auto opCtx = uniqueOpCtx.get(); diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 36700332edb..4384aeeed72 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -394,8 +394,10 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); - uassertStatusOK(_threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept { + _threadPool.schedule( + [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept { + invariant(status); + auto context = _contexts.makeOperationContext(*Client::getCurrent()); auto const opCtx = context.opCtx(); @@ -420,7 +422,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc callbackFn(opCtx, ex.toStatus()); notify->set(); } - })); + }); return notify; } @@ -441,36 +443,38 @@ void ShardServerCatalogCacheLoader::getDatabase( isPrimary = (_role == ReplicaSetRole::Primary); } - uassertStatusOK(_threadPool.schedule( - [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept { - auto context = _contexts.makeOperationContext(*Client::getCurrent()); + _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]( + auto status) noexcept { + invariant(status); - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - - // We may have missed an OperationContextGroup interrupt since this operation began - // but before the OperationContext was added to the group. So we'll check that - // we're still in the same _term. - if (_term != currentTerm) { - callbackFn(context.opCtx(), - Status{ErrorCodes::Interrupted, - "Unable to refresh routing table because replica set state " - "changed or node is shutting down."}); - return; - } + auto context = _contexts.makeOperationContext(*Client::getCurrent()); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // We may have missed an OperationContextGroup interrupt since this operation began + // but before the OperationContext was added to the group. So we'll check that + // we're still in the same _term. + if (_term != currentTerm) { + callbackFn(context.opCtx(), + Status{ErrorCodes::Interrupted, + "Unable to refresh routing table because replica set state " + "changed or node is shutting down."}); + return; } + } - try { - if (isPrimary) { - _schedulePrimaryGetDatabase( - context.opCtx(), StringData(name), currentTerm, callbackFn); - } else { - _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); - } - } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); + try { + if (isPrimary) { + _schedulePrimaryGetDatabase( + context.opCtx(), StringData(name), currentTerm, callbackFn); + } else { + _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); } - })); + } catch (const DBException& ex) { + callbackFn(context.opCtx(), ex.toStatus()); + } + }); } void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, @@ -680,9 +684,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( }(); if (termAfterRefresh != termScheduled) { - // Raising a ConflictingOperationInProgress error here will cause the - // CatalogCache to attempt the refresh as secondary instead of failing the - // operation + // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to + // attempt the refresh as secondary instead of failing the operation return Status(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Replication stepdown occurred during refresh for '" << nss.toString()); @@ -877,7 +880,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun return; } - invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); })); + _threadPool.schedule([this, nss](auto status) { + invariant(status); + + _runCollAndChunksTasks(nss); + }); } void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx, @@ -895,8 +902,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper return; } - auto name = dbName.toString(); - invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); })); + _threadPool.schedule([ this, name = dbName.toString() ](auto status) { + invariant(status); + + _runDbTasks(name); + }); } void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { @@ -924,15 +934,11 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString // Schedule more work if there is any if (!_collAndChunkTaskLists[nss].empty()) { - Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule a persisted metadata update" - << " task for namespace '" << nss << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling will be attempted by the next" - << " caller to refresh this namespace."; - - _collAndChunkTaskLists.erase(nss); - } + _threadPool.schedule([this, nss](auto status) { + invariant(status); + + _runCollAndChunksTasks(nss); + }); } else { _collAndChunkTaskLists.erase(nss); } @@ -962,16 +968,11 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { // Schedule more work if there is any if (!_dbTaskLists[dbName.toString()].empty()) { - Status status = - _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule a persisted metadata update" - << " task for namespace '" << dbName << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling will be attempted by the next" - << " caller to refresh this namespace."; - - _dbTaskLists.erase(dbName.toString()); - } + _threadPool.schedule([ this, name = dbName.toString() ](auto status) { + invariant(status); + + _runDbTasks(name); + }); } else { _dbTaskLists.erase(dbName.toString()); } diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index c378801ea4c..732395d1ddb 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -79,18 +79,24 @@ public: auto scheduledWorkHandle = uassertStatusOK(_executor->scheduleWorkAt( when, [ this, task = std::forward<Callable>(task), taskCompletionPromise ]( - const executor::TaskExecutor::CallbackArgs&) mutable noexcept { + const executor::TaskExecutor::CallbackArgs& args) mutable noexcept { taskCompletionPromise->setWith([&] { + { + stdx::lock_guard lk(_mutex); + uassertStatusOK(_shutdownStatus); + uassertStatusOK(args.status); + } + ThreadClient tc("TransactionCoordinator", _serviceContext); - stdx::unique_lock<stdx::mutex> ul(_mutex); - uassertStatusOK(_shutdownStatus); - auto uniqueOpCtxIter = _activeOpContexts.emplace( - _activeOpContexts.begin(), tc->makeOperationContext()); - ul.unlock(); + auto uniqueOpCtxIter = [&] { + stdx::lock_guard lk(_mutex); + return _activeOpContexts.emplace(_activeOpContexts.begin(), + tc->makeOperationContext()); + }(); ON_BLOCK_EXIT([&] { - ul.lock(); + stdx::lock_guard lk(_mutex); _activeOpContexts.erase(uniqueOpCtxIter); // There is no need to call _notifyAllTasksComplete here, because we // will still have an outstanding _activeHandles entry, so the scheduler diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index f99cce19ef8..d17280e9ca3 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -80,21 +80,22 @@ void killSessionTokensFunction( if (sessionKillTokens->empty()) return; - uassertStatusOK(getThreadPool(opCtx)->schedule([ - service = opCtx->getServiceContext(), - sessionKillTokens = std::move(sessionKillTokens) - ]() mutable { - auto uniqueClient = service->makeClient("Kill-Session"); - auto uniqueOpCtx = uniqueClient->makeOperationContext(); - const auto opCtx = uniqueOpCtx.get(); - const auto catalog = SessionCatalog::get(opCtx); - - for (auto& sessionKillToken : *sessionKillTokens) { - auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); - - TransactionParticipant::get(session).invalidate(opCtx); - } - })); + getThreadPool(opCtx)->schedule( + [ service = opCtx->getServiceContext(), + sessionKillTokens = std::move(sessionKillTokens) ](auto status) mutable { + invariant(status); + + auto uniqueClient = service->makeClient("Kill-Session"); + auto uniqueOpCtx = uniqueClient->makeOperationContext(); + const auto opCtx = uniqueOpCtx.get(); + const auto catalog = SessionCatalog::get(opCtx); + + for (auto& sessionKillToken : *sessionKillTokens) { + auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); + + TransactionParticipant::get(session).invalidate(opCtx); + } + }); } } // namespace diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index e7afa9d5db4..5cef4ec9af2 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -130,7 +130,10 @@ public: tp.startup(); for (unsigned i = 0; i < iterations; i++) { - ASSERT_OK(tp.schedule([=] { increment(2); })); + tp.schedule([=](auto status) { + ASSERT_OK(status); + increment(2); + }); } tp.waitForIdle(); diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index c0c4293e505..41e4e602c65 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -646,8 +646,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr // If the host and port were dropped, let this lapse if (conn->getGeneration() == _generation) { addToReady(lk, std::move(conn)); + fulfillRequests(lk); } - spawnConnections(lk); + return; } @@ -671,6 +672,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr } else { // If it's fine as it is, just put it in the ready queue addToReady(lk, std::move(conn)); + // TODO This should be scheduled on an executor once we have executor-aware pooling + fulfillRequests(lk); } updateStateInLock(); @@ -706,8 +709,6 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk returnConnection(connPtr, std::move(lk)); })); - - fulfillRequests(lk); } // Sets state to shutdown and kicks off the failure protocol to tank existing connections @@ -860,8 +861,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute // If the host and port was dropped, let the connection lapse if (conn->getGeneration() == _generation) { addToReady(lk, std::move(conn)); + fulfillRequests(lk); } - spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp index 0fa5ed7632e..787bd0a6dac 100644 --- a/src/mongo/executor/network_interface_thread_pool.cpp +++ b/src/mongo/executor/network_interface_thread_pool.cpp @@ -105,17 +105,18 @@ void NetworkInterfaceThreadPool::join() { lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); }); } -Status NetworkInterfaceThreadPool::schedule(Task task) { +void NetworkInterfaceThreadPool::schedule(Task task) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_inShutdown) { - return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + lk.unlock(); + task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); + return; } + _tasks.emplace_back(std::move(task)); if (_started) _consumeTasks(std::move(lk)); - - return Status::OK(); } /** @@ -162,7 +163,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut const auto lkGuard = makeGuard([&] { lk.lock(); }); for (auto&& task : tasks) { - task(); + task(Status::OK()); } tasks.clear(); diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h index 712ad2d6df7..51771393032 100644 --- a/src/mongo/executor/network_interface_thread_pool.h +++ b/src/mongo/executor/network_interface_thread_pool.h @@ -57,7 +57,7 @@ public: void startup() override; void shutdown() override; void join() override; - Status schedule(Task task) override; + void schedule(Task task) override; private: void _consumeTasks(stdx::unique_lock<stdx::mutex> lk); diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index eb808735306..a9529d1c825 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ]( StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { makeReadyFutureWith([&] { - return _onAcquireConn( - state, std::move(future), std::move(*uassertStatusOK(swConn)), baton); + auto connHandle = uassertStatusOK(std::move(swConn)); + return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton); }) .onError([](Status error) -> StatusWith<RemoteCommandResponse> { // The TransportLayer has, for historical reasons returned SocketException for @@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } - _reactor->schedule([action = std::move(action)]() { action(Status::OK()); }); + _reactor->schedule([action = std::move(action)](auto status) { action(status); }); return Status::OK(); } @@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle } if (when <= now()) { - _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); }); + _reactor->schedule([action = std::move(action)](auto status) { action(status); }); return Status::OK(); } @@ -569,7 +569,13 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState> } // Fulfill the promise on a reactor thread - _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); }); + _reactor->schedule([state](auto status) { + if (status.isOK()) { + state->promise.emplaceValue(); + } else { + state->promise.setError(status); + } + }); } bool NetworkInterfaceTL::onNetworkThread() { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index df634ed3fb4..2ab75f49aaa 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -102,7 +102,7 @@ private: transport::ReactorHandle reactor; void operator()(ConnectionPool::ConnectionInterface* ptr) const { - reactor->dispatch([ ret = returner, ptr ] { ret(ptr); }); + reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); }); } }; using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>; diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index a2ffbbac2ba..191537cebff 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; - _net->signalWorkAvailable(); - _net->exitNetwork(); - if (_started) { - if (_worker.joinable()) { - lk.unlock(); - _worker.join(); - lk.lock(); - } - } else { - consumeTasks(&lk); - } - invariant(_tasks.empty()); + if (_joining) + return; + + _shutdown(lk); + _join(lk); } void ThreadPoolMock::startup() { @@ -68,74 +60,87 @@ void ThreadPoolMock::startup() { _worker = stdx::thread([this] { _options.onCreateThread(); stdx::unique_lock<stdx::mutex> lk(_mutex); - consumeTasks(&lk); + + LOG(1) << "Starting to consume tasks"; + while (!_joining) { + if (_tasks.empty()) { + lk.unlock(); + _net->waitForWork(); + lk.lock(); + continue; + } + + _consumeOneTask(lk); + } + LOG(1) << "Done consuming tasks"; }); } void ThreadPoolMock::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inShutdown = true; - _net->signalWorkAvailable(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + _shutdown(lk); } void ThreadPoolMock::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _joining = true; - if (_started) { - stdx::thread toJoin = std::move(_worker); - _net->signalWorkAvailable(); - _net->exitNetwork(); + _join(lk); +} + +void ThreadPoolMock::schedule(Task task) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_inShutdown) { lk.unlock(); - toJoin.join(); - lk.lock(); - invariant(_tasks.empty()); - } else { - consumeTasks(&lk); - invariant(_tasks.empty()); + + task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); + return; } + + _tasks.emplace_back(std::move(task)); } -Status ThreadPoolMock::schedule(Task task) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) { + auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); + if (next + 1 != _tasks.size()) { + std::swap(_tasks[next], _tasks.back()); + } + Task fn = std::move(_tasks.back()); + _tasks.pop_back(); + lk.unlock(); if (_inShutdown) { - return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); + } else { + fn(Status::OK()); } - _tasks.emplace_back(std::move(task)); - return Status::OK(); + lk.lock(); } -void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) { - using std::swap; +void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) { + LOG(1) << "Shutting down pool"; - LOG(1) << "Starting to consume tasks"; - while (!(_inShutdown && _tasks.empty())) { - if (_tasks.empty()) { - lk->unlock(); - _net->waitForWork(); - lk->lock(); - continue; - } - auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); - if (next + 1 != _tasks.size()) { - swap(_tasks[next], _tasks.back()); - } - Task fn = std::move(_tasks.back()); - _tasks.pop_back(); - lk->unlock(); - fn(); - lk->lock(); - } - LOG(1) << "Done consuming tasks"; + _inShutdown = true; + _net->signalWorkAvailable(); +} - invariant(_tasks.empty()); +void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) { + LOG(1) << "Joining pool"; + + _joining = true; + _net->signalWorkAvailable(); + _net->exitNetwork(); - while (_started && !_joining) { - lk->unlock(); - _net->waitForWork(); - lk->lock(); + // Since there is only one worker thread, we need to consume tasks here to potentially + // unblock that thread. + while (!_tasks.empty()) { + _consumeOneTask(lk); } - LOG(1) << "Ready to join"; + if (_started) { + lk.unlock(); + _worker.join(); + lk.lock(); + } + + invariant(_tasks.empty()); } } // namespace executor diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index 3d510ffa3a9..e3baaa07273 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -70,10 +70,12 @@ public: void startup() override; void shutdown() override; void join() override; - Status schedule(Task task) override; + void schedule(Task task) override; private: - void consumeTasks(stdx::unique_lock<stdx::mutex>* lk); + void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk); + void _shutdown(stdx::unique_lock<stdx::mutex>& lk); + void _join(stdx::unique_lock<stdx::mutex>& lk); // These are the options with which the pool was configured at construction time. const Options _options; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index fefecf9a04e..9496b88d21f 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -597,7 +597,9 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) { scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off); - while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) { + + auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); }; + while (!ErrorCodes::isCancelationError(checkStatus().code())) { sleepmillis(100); } } @@ -611,16 +613,24 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } cbState->canceled.store(1); - const auto status = - _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); - invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); + _pool->schedule([this, cbState](auto status) { + invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); + + runCallback(std::move(cbState)); + }); }); } else { - const auto status = - _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); - if (status == ErrorCodes::ShutdownInProgress) - break; - fassert(28735, status); + _pool->schedule([this, cbState](auto status) { + if (ErrorCodes::isCancelationError(status.code())) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + cbState->canceled.store(1); + } else { + fassert(28735, status); + } + + runCallback(std::move(cbState)); + }); } } _net->signalWorkAvailable(); diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index 977c709eaa3..0776a4c3626 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -176,7 +176,9 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); - uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept { + _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept { + invariant(status); + auto opCtx = Client::getCurrent()->makeOperationContext(); auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { @@ -189,7 +191,7 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin callbackFn(opCtx.get(), std::move(swCollAndChunks)); notify->set(); - })); + }); return notify; } @@ -197,7 +199,9 @@ std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSin void ConfigServerCatalogCacheLoader::getDatabase( StringData dbName, stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - uassertStatusOK(_threadPool.schedule([ name = dbName.toString(), callbackFn ]() noexcept { + _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept { + invariant(status); + auto opCtx = Client::getCurrent()->makeOperationContext(); auto swDbt = [&]() -> StatusWith<DatabaseType> { @@ -214,7 +218,7 @@ void ConfigServerCatalogCacheLoader::getDatabase( }(); callbackFn(opCtx.get(), std::move(swDbt)); - })); + }); } } // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp index 19c1df24d90..edc0cdb0c75 100644 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -183,7 +183,8 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, } auto wrappedTask = - [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ] { + [ this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags ]( + auto status) { pendingCounterPtr->subtractAndFetch(1); auto start = _tickSource->getTicks(); _totalSpentQueued.addAndFetch(start - scheduleTime); diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 1a987e2242a..2c52deafb69 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -128,11 +128,11 @@ public: } void schedule(Task task) final { - asio::post(_ioContext, std::move(task)); + asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); }); } void dispatch(Task task) final { - asio::dispatch(_ioContext, std::move(task)); + asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); }); } bool onReactorThread() const final { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index dc0475b801d..1a4aeafea19 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -183,11 +183,11 @@ public: } void schedule(Task task) override { - asio::post(_ioContext, std::move(task)); + asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); }); } void dispatch(Task task) override { - asio::dispatch(_ioContext, std::move(task)); + asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); }); } bool onReactorThread() const override { diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index 81f1e3c2285..5f732c1a1ed 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -185,15 +185,22 @@ void ThreadPool::_drainPendingTasks() { cleanThread.join(); } -Status ThreadPool::schedule(Task task) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void ThreadPool::schedule(Task task) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + switch (_state) { case joinRequired: case joining: - case shutdownComplete: - return Status(ErrorCodes::ShutdownInProgress, - str::stream() << "Shutdown of thread pool " << _options.poolName - << " in progress"); + case shutdownComplete: { + auto status = Status(ErrorCodes::ShutdownInProgress, + str::stream() << "Shutdown of thread pool " << _options.poolName + << " in progress"); + + lk.unlock(); + task(status); + return; + } break; + case preStart: case running: break; @@ -202,7 +209,7 @@ Status ThreadPool::schedule(Task task) { } _pendingTasks.emplace_back(std::move(task)); if (_state == preStart) { - return Status::OK(); + return; } if (_numIdleThreads < _pendingTasks.size()) { _startWorkerThread_inlock(); @@ -211,7 +218,6 @@ Status ThreadPool::schedule(Task task) { _lastFullUtilizationDate = Date_t::now(); } _workAvailable.notify_one(); - return Status::OK(); } void ThreadPool::waitForIdle() { @@ -332,7 +338,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) noexcept { _pendingTasks.pop_front(); --_numIdleThreads; lk->unlock(); - task(); + task(Status::OK()); lk->lock(); ++_numIdleThreads; if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) { diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index 370de65056e..c4873f84dff 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -123,7 +123,7 @@ public: void startup() override; void shutdown() override; void join() override; - Status schedule(Task task) override; + void schedule(Task task) override; /** * Blocks the caller until there are no pending tasks on this pool. diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h index 847dfe0506b..bb59ac8e9f8 100644 --- a/src/mongo/util/concurrency/thread_pool_interface.h +++ b/src/mongo/util/concurrency/thread_pool_interface.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/util/functional.h" +#include "mongo/util/out_of_line_executor.h" namespace mongo { @@ -38,13 +39,11 @@ class Status; /** * Interface for a thread pool. */ -class ThreadPoolInterface { +class ThreadPoolInterface : public OutOfLineExecutor { ThreadPoolInterface(const ThreadPoolInterface&) = delete; ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete; public: - using Task = unique_function<void()>; - /** * Destroys a thread pool. * @@ -82,7 +81,7 @@ public: * It is safe to call this before startup(), but the scheduled task will not execute * until after startup() is called. */ - virtual Status schedule(Task task) = 0; + virtual void schedule(Task task) = 0; protected: ThreadPoolInterface() = default; diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp index 012b5ca3ca7..b4a650c54bb 100644 --- a/src/mongo/util/concurrency/thread_pool_test.cpp +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -104,14 +104,17 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { pool.startup(); ASSERT_EQ(0U, pool.getStats().numThreads); stdx::unique_lock<stdx::mutex> lk(mutex); - ASSERT_OK(pool.schedule([this] { blockingWork(); })); + pool.schedule([this](auto status) { + ASSERT_OK(status); + blockingWork(); + }); while (count1 != 1U) { cv1.wait(lk); } auto stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numPendingTasks); - ASSERT_OK(pool.schedule([] {})); + pool.schedule([](auto status) { ASSERT_OK(status); }); stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numIdleThreads); @@ -129,7 +132,10 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { lk.lock(); flag2 = false; count1 = 0; - ASSERT_OK(pool.schedule([this] { blockingWork(); })); + pool.schedule([this](auto status) { + ASSERT_OK(status); + blockingWork(); + }); while (count1 == 0) { cv1.wait(lk); } @@ -151,7 +157,10 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) { pool.startup(); stdx::unique_lock<stdx::mutex> lk(mutex); for (size_t i = 0U; i < 30U; ++i) { - ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i; + pool.schedule([this, i](auto status) { + ASSERT_OK(status) << i; + blockingWork(); + }); } while (count1 < 20U) { cv1.wait(lk); @@ -225,7 +234,10 @@ DEATH_TEST(ThreadPoolTest, sleepmillis(50); } stdx::unique_lock<stdx::mutex> lk(mutex); - ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); })); + pool->schedule([&mutex](auto status) { + ASSERT_OK(status); + stdx::lock_guard<stdx::mutex> lk(mutex); + }); stdx::thread t([&pool] { pool->shutdown(); pool->join(); @@ -257,7 +269,10 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) ThreadPool pool(options); pool.startup(); - ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); })); + pool.schedule([&barrier](auto status) { + ASSERT_OK(status); + barrier.countDownAndWait(); + }); barrier.countDownAndWait(); ASSERT_TRUE(onCreateThreadCalled); diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp index a03da59cd49..2c2113bb890 100644 --- a/src/mongo/util/concurrency/thread_pool_test_common.cpp +++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp @@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) { COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) { auto& pool = getThreadPool(); pool.shutdown(); - ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {})); + pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); }); } COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") { @@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception"; COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) { auto& pool = getThreadPool(); pool.startup(); - ASSERT_OK(pool.schedule([] { + pool.schedule([](auto status) { uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage})); - })); + }); pool.shutdown(); pool.join(); } @@ -177,7 +177,10 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") { COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - ASSERT_OK(pool.schedule([&executed] { executed = true; })); + pool.schedule([&executed](auto status) { + ASSERT_OK(status); + executed = true; + }); deleteThreadPool(); ASSERT_EQ(executed, true); } @@ -185,7 +188,10 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - ASSERT_OK(pool.schedule([&executed] { executed = true; })); + pool.schedule([&executed](auto status) { + ASSERT_OK(status); + executed = true; + }); pool.shutdown(); pool.join(); ASSERT_EQ(executed, true); @@ -198,12 +204,15 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) { std::size_t n = 0; stdx::mutex mutex; stdx::condition_variable condvar; - func = [&pool, &n, &func, &condvar, &mutex, depth] { + func = [&pool, &n, &func, &condvar, &mutex, depth]() { stdx::unique_lock<stdx::mutex> lk(mutex); if (n < depth) { n++; lk.unlock(); - ASSERT_OK(pool.schedule(func)); + pool.schedule([&](auto status) { + ASSERT_OK(status); + func(); + }); } else { pool.shutdown(); condvar.notify_one(); diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp index bd507eaf416..a3804576eaa 100644 --- a/src/mongo/util/keyed_executor_test.cpp +++ b/src/mongo/util/keyed_executor_test.cpp @@ -45,7 +45,7 @@ namespace { class MockExecutor : public OutOfLineExecutor { public: - void schedule(unique_function<void()> func) override { + void schedule(Task func) override { _deque.push_front(std::move(func)); } @@ -60,7 +60,7 @@ public: auto x = std::move(_deque.back()); _deque.pop_back(); - x(); + x(Status::OK()); return true; } @@ -71,27 +71,7 @@ public: } private: - std::deque<unique_function<void()>> _deque; -}; - -class ThreadPoolExecutor : public OutOfLineExecutor { -public: - ThreadPoolExecutor() : _threadPool(ThreadPool::Options{}) {} - - void start() { - _threadPool.startup(); - } - - void shutdown() { - _threadPool.shutdown(); - } - - void schedule(unique_function<void()> func) override { - ASSERT_OK(_threadPool.schedule(std::move(func))); - } - -private: - ThreadPool _threadPool; + std::deque<Task> _deque; }; TEST(KeyedExecutor, basicExecute) { @@ -332,9 +312,9 @@ TEST(KeyedExecutor, gracefulShutdown) { } TEST(KeyedExecutor, withThreadsTest) { - ThreadPoolExecutor tpe; - KeyedExecutor<int> ke(&tpe); - tpe.start(); + auto thread_pool = ThreadPool(ThreadPool::Options{}); + KeyedExecutor<int> ke(&thread_pool); + thread_pool.startup(); constexpr size_t n = (1 << 16); @@ -372,7 +352,7 @@ TEST(KeyedExecutor, withThreadsTest) { stdx::unique_lock<stdx::mutex> lk(mutex); condvar.wait(lk, [&] { return counter == n; }); - tpe.shutdown(); + thread_pool.shutdown(); ASSERT_EQUALS(counter, n); } diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h index 9dc30653860..367ee700fcc 100644 --- a/src/mongo/util/out_of_line_executor.h +++ b/src/mongo/util/out_of_line_executor.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/base/status.h" #include "mongo/stdx/functional.h" #include "mongo/util/future.h" @@ -43,13 +44,10 @@ namespace mongo { * The contract for scheduling work on an executor is that it never blocks the caller. It doesn't * necessarily need to offer forward progress guarantees, but actual calls to schedule() should not * deadlock. - * - * As an explicit point of implementation: it will never invoke the passed callback from within the - * scheduling call. */ class OutOfLineExecutor { public: - using Task = unique_function<void()>; + using Task = unique_function<void(Status)>; public: /** @@ -59,17 +57,32 @@ public: */ template <typename Callback> Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { - auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); + auto[promise, future] = makePromiseFuture<FutureContinuationResult<Callback>>(); - schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { + schedule([ cb = std::forward<Callback>(cb), p = std::move(promise) ](auto status) mutable { + if (!status.isOK()) { + p.setError(status); + return; + } p.setWith(std::move(cb)); }); - return std::move(pf.future); + return std::move(future); } /** - * Invokes the callback on the executor. This never happens immediately on the caller's stack. + * Delegates invocation of the Task to this executor + * + * Execution of the Task can happen in one of three contexts: + * * By default, on an execution context maintained by the OutOfLineExecutor (i.e. a thread). + * * During shutdown, on the execution context of shutdown/join/dtor for the OutOfLineExecutor. + * * Post-shutdown, on the execution context of the calling code. + * + * The Task will be passed a Status schedStatus that is either: + * * schedStatus.isOK() if the function is run in an out-of-line context + * * isCancelationError(schedStatus.code()) if the function is run in an inline context + * + * All of this is to say: CHECK YOUR STATUS. */ virtual void schedule(Task func) = 0; |