diff options
68 files changed, 328 insertions, 432 deletions
diff --git a/src/mongo/client/authenticate.cpp b/src/mongo/client/authenticate.cpp index 6dc0337d627..8931c5de702 100644 --- a/src/mongo/client/authenticate.cpp +++ b/src/mongo/client/authenticate.cpp @@ -137,7 +137,7 @@ Future<void> authX509(RunCommandHook runCommand, const BSONObj& params, StringDa // The runCommand hook checks whether the command returned { ok: 1.0 }, and we don't need to // extract anything from the command payload, so this is just turning a Future<BSONObj> // into a Future<void> - return runCommand(authRequest.getValue()).then([](BSONObj obj) { return Status::OK(); }); + return runCommand(authRequest.getValue()).ignoreValue(); } } // namespace @@ -149,7 +149,6 @@ Future<void> authenticateClient(const BSONObj& params, const HostAndPort& hostname, const std::string& clientName, RunCommandHook runCommand) { - std::string mechanism; auto errorHandler = [](Status status) { if (serverGlobalParams.transitionToAuth && !status.isA<ErrorCategory::NetworkError>()) { // If auth failed in transitionToAuth, just pretend it succeeded. @@ -161,6 +160,8 @@ Future<void> authenticateClient(const BSONObj& params, return status; }; + + std::string mechanism; auto response = bsonExtractStringField(params, saslCommandMechanismFieldName, &mechanism); if (!response.isOK()) return response; diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index f321b956c20..65540e088c4 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -172,7 +172,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, const HostAndPort& source, const std::string& dbname, const BSONObj& findCmdObj, - const CallbackFn& work, + CallbackFn work, const BSONObj& metadata, Milliseconds findNetworkTimeout, Milliseconds getMoreNetworkTimeout, @@ -182,7 +182,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, _dbname(dbname), _cmdObj(findCmdObj.getOwned()), _metadata(metadata.getOwned()), - _work(work), + _work(std::move(work)), _findNetworkTimeout(findNetworkTimeout), _getMoreNetworkTimeout(getMoreNetworkTimeout), _firstRemoteCommandScheduler( @@ -190,7 +190,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout), [this](const auto& x) { return this->_callback(x, kFirstBatchFieldName); }, std::move(firstCommandRetryPolicy)) { - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _work); } Fetcher::~Fetcher() { diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index e917e9f3f30..e7672b382f0 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -127,7 +127,7 @@ public: const HostAndPort& source, const std::string& dbname, const BSONObj& cmdObj, - const CallbackFn& work, + CallbackFn work, const BSONObj& metadata = ReadPreferenceSetting::secondaryPreferredMetadata(), Milliseconds findNetworkTimeout = RemoteCommandRequest::kNoTimeout, Milliseconds getMoreNetworkTimeout = RemoteCommandRequest::kNoTimeout, diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp index e1d8e3610ac..9d277fe5fb9 100644 --- a/src/mongo/db/free_mon/free_mon_controller_test.cpp +++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp @@ -250,7 +250,7 @@ public: pf.promise.setFrom(doRegister(req)); } else { auto swSchedule = - _threadPool->scheduleWork([ sharedPromise = pf.promise.share(), req, this ]( + _threadPool->scheduleWork([ sharedPromise = std::move(pf.promise), req, this ]( const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { sharedPromise.setWith([&] { return doRegister(req); }); @@ -297,7 +297,7 @@ public: pf.promise.setFrom(doMetrics(req)); } else { auto swSchedule = - _threadPool->scheduleWork([ sharedPromise = pf.promise.share(), req, this ]( + _threadPool->scheduleWork([ sharedPromise = std::move(pf.promise), req, this ]( const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { sharedPromise.setWith([&] { return doMetrics(req); }); diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp index 9708a1071ef..38c71e6f543 100644 --- a/src/mongo/db/free_mon/free_mon_mongod.cpp +++ b/src/mongo/db/free_mon/free_mon_mongod.cpp @@ -180,20 +180,17 @@ private: auto pf = makePromiseFuture<DataBuilder>(); std::string url(exportedExportedFreeMonEndpointURL.getLocked() + path.toString()); - auto status = _executor->scheduleWork([ - shared_promise = pf.promise.share(), - url = std::move(url), - data = std::move(data), - this - ](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { - ConstDataRange cdr(reinterpret_cast<char*>(data->data()), data->size()); - try { - auto result = this->_client->post(url, cdr); - shared_promise.emplaceValue(std::move(result)); - } catch (...) { - shared_promise.setError(exceptionToStatus()); - } - }); + auto status = _executor->scheduleWork( + [ promise = std::move(pf.promise), url = std::move(url), data = std::move(data), this ]( + const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + ConstDataRange cdr(reinterpret_cast<char*>(data->data()), data->size()); + try { + auto result = this->_client->post(url, cdr); + promise.emplaceValue(std::move(result)); + } catch (...) { + promise.setError(exceptionToStatus()); + } + }); uassertStatusOK(status); return std::move(pf.future); diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp index cf8941a05f3..9967dd8012a 100644 --- a/src/mongo/db/repl/abstract_async_component.cpp +++ b/src/mongo/db/repl/abstract_async_component.cpp @@ -166,7 +166,7 @@ Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock( } Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -175,7 +175,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( str::stream() << "failed to schedule work " << name << ": " << _componentName << " is shutting down"); } - auto result = _executor->scheduleWork(work); + auto result = _executor->scheduleWork(std::move(work)); if (!result.isOK()) { return result.getStatus().withContext(str::stream() << "failed to schedule work " << name); } @@ -185,7 +185,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -196,7 +196,7 @@ Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock( << _componentName << " is shutting down"); } - auto result = _executor->scheduleWorkAt(when, work); + auto result = _executor->scheduleWorkAt(when, std::move(work)); if (!result.isOK()) { return result.getStatus().withContext( str::stream() << "failed to schedule work " << name << " at " << when.toString()); diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h index e7f14121c9a..63fa5db7e46 100644 --- a/src/mongo/db/repl/abstract_async_component.h +++ b/src/mongo/db/repl/abstract_async_component.h @@ -148,11 +148,11 @@ protected: * Saves handle if work was successfully scheduled. * Returns scheduleWork status (without the handle). */ - Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp index 691c417ad2b..4dd1183b2b4 100644 --- a/src/mongo/db/repl/abstract_async_component_test.cpp +++ b/src/mongo/db/repl/abstract_async_component_test.cpp @@ -65,7 +65,7 @@ public: * Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and * _scheduleWorkAtAndSaveHandle_inlock() for testing. */ - Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work, + Status scheduleWorkAndSaveHandle_forTest(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -73,7 +73,7 @@ public: * Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing. */ Status scheduleWorkAtAndSaveHandle_forTest(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -122,20 +122,20 @@ Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status } Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _scheduleWorkAndSaveHandle_inlock(work, handle, name); + return _scheduleWorkAndSaveHandle_inlock(std::move(work), handle, name); } Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name); + return _scheduleWorkAtAndSaveHandle_inlock(when, std::move(work), handle, name); } void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 6876e88f5aa..6038ec6f544 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -109,7 +109,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& onCompletion, + CallbackFn onCompletion, StorageInterface* storageInterface, const int batchSize) : _executor(executor), @@ -118,7 +118,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _sourceNss(sourceNss), _destNss(_sourceNss), _options(options), - _onCompletion(onCompletion), + _onCompletion(std::move(onCompletion)), _storageInterface(storageInterface), _countScheduler(_executor, RemoteCommandRequest( @@ -155,9 +155,10 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _indexSpecs(), _documentsToInsert(), _dbWorkTaskRunner(_dbWorkThreadPool), - _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) { - auto task = [ this, work ](OperationContext * opCtx, - const Status& status) noexcept->TaskRunner::NextAction { + _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) { + auto task = [ this, work = std::move(work) ]( + OperationContext * opCtx, + const Status& status) mutable noexcept->TaskRunner::NextAction { try { work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); } catch (...) { @@ -165,7 +166,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, } return TaskRunner::NextAction::kDisposeOperationContext; }; - _dbWorkTaskRunner.schedule(task); + _dbWorkTaskRunner.schedule(std::move(task)); return executor::TaskExecutor::CallbackHandle(); }), _createClientFn([] { return stdx::make_unique<DBClientConnection>(); }), @@ -184,7 +185,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, uassert(50953, "Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(), _options.uuid); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface); uassert( 50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0); @@ -292,9 +293,9 @@ void CollectionCloner::waitForDbWorker() { _dbWorkTaskRunner.join(); } -void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn) { +void CollectionCloner::setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) { LockGuard lk(_mutex); - _scheduleDbWorkFn = scheduleDbWorkFn; + _scheduleDbWorkFn = std::move(scheduleDbWorkFn); } void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index c7149970f5e..4ed39cc5512 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -93,8 +93,8 @@ public: * * Used for testing only. */ - using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( - const executor::TaskExecutor::CallbackFn&)>; + using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; /** * Type of function to create a database client @@ -117,7 +117,7 @@ public: const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& onCompletion, + CallbackFn onCompletion, StorageInterface* storageInterface, const int batchSize); @@ -152,7 +152,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn); /** * Allows a different client class to be injected. diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 05c36c41cd6..c17cff0cc60 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -668,12 +668,13 @@ TEST_F(CollectionClonerTest, // status. auto exec = &getExecutor(); collectionCloner->setScheduleDbWorkFn_forTest([exec]( - const executor::TaskExecutor::CallbackFn& workFn) { - auto wrappedTask = [workFn](const executor::TaskExecutor::CallbackArgs& cbd) { + executor::TaskExecutor::CallbackFn workFn) { + auto wrappedTask = [workFn = std::move(workFn)]( + const executor::TaskExecutor::CallbackArgs& cbd) { workFn(executor::TaskExecutor::CallbackArgs( cbd.executor, cbd.myHandle, Status(ErrorCodes::CallbackCanceled, ""), cbd.opCtx)); }; - return exec->scheduleWork(wrappedTask); + return exec->scheduleWork(std::move(wrappedTask)); }); bool collectionCreated = false; @@ -1177,12 +1178,11 @@ TEST_F(CollectionClonerTest, // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet. executor::TaskExecutor::CallbackFn insertDocumentsFn; - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - insertDocumentsFn = workFn; - executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); - return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); - }); + collectionCloner->setScheduleDbWorkFn_forTest([&](executor::TaskExecutor::CallbackFn workFn) { + insertDocumentsFn = std::move(workFn); + executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); + return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); + }); ASSERT_FALSE(insertDocumentsFn); // Return first batch of collection documents from remote server for the getMore request. diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index a4648bc8e9a..b9c1e23edea 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -116,7 +116,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, const ListCollectionsPredicateFn& listCollectionsPred, StorageInterface* si, const CollectionCallbackFn& collWork, - const CallbackFn& onCompletion) + CallbackFn onCompletion) : _executor(executor), _dbWorkThreadPool(dbWorkThreadPool), _source(source), @@ -128,7 +128,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, _listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred), _storageInterface(si), _collectionWork(collWork), - _onCompletion(onCompletion), + _onCompletion(std::move(onCompletion)), _listCollectionsFetcher(_executor, _source, _dbname, @@ -152,7 +152,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty()); uassert(ErrorCodes::BadValue, "storage interface cannot be null", si); uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); _stats.dbname = _dbname; } @@ -259,7 +259,7 @@ void DatabaseCloner::join() { _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } -void DatabaseCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void DatabaseCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 0a5b893b294..7b2cd152592 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -97,6 +97,9 @@ public: */ using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>; + using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; + /** * Creates DatabaseCloner task in inactive state. Use start() to activate cloner. * @@ -116,7 +119,7 @@ public: const ListCollectionsPredicateFn& listCollectionsPredicate, StorageInterface* storageInterface, const CollectionCallbackFn& collectionWork, - const CallbackFn& onCompletion); + CallbackFn onCompletion); virtual ~DatabaseCloner(); @@ -146,7 +149,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -234,7 +237,7 @@ private: std::vector<NamespaceString> _collectionNamespaces; // (M) std::list<CollectionCloner> _collectionCloners; // (M) std::list<CollectionCloner>::iterator _currentCollectionClonerIter; // (M) - CollectionCloner::ScheduleDbWorkFn + ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. StartCollectionClonerFn _startCollectionCloner; // (RT) Stats _stats; // (M) Stats about what this instance did. diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 65a0e054b9a..83c428340a7 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -104,10 +104,9 @@ void DatabaseClonerTest::setUp() { storageInterface.get(), makeCollectionWorkClosure(), makeSetStatusClosure()); - _databaseCloner->setScheduleDbWorkFn_forTest( - [this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); - }); + _databaseCloner->setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); + }); _mockServer = stdx::make_unique<MockRemoteDBServer>(target.toString()); _mockServer->assignCollectionUuid("db.a", *_options1.uuid); diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index 960d84fe960..ac3bc398059 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -227,7 +227,7 @@ Status DatabasesCloner::startup() noexcept { return Status::OK(); } -void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h index 6af3b927229..970abc5da60 100644 --- a/src/mongo/db/repl/databases_cloner.h +++ b/src/mongo/db/repl/databases_cloner.h @@ -69,6 +69,8 @@ public: using IncludeDbFilterFn = stdx::function<bool(const BSONObj& dbInfo)>; using OnFinishFn = stdx::function<void(const Status&)>; using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn; + using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, @@ -98,7 +100,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -178,10 +180,10 @@ private: mutable stdx::mutex _mutex; // (S) Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything. executor::TaskExecutor* _exec; // (R) executor to schedule things with - ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. - const HostAndPort _source; // (R) The source to use. - CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) - StartCollectionClonerFn _startCollectionClonerFn; // (M) + ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. + const HostAndPort _source; // (R) The source to use. + ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned. OnFinishFn _finishFn; // (M) function called when finished. diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 47bec4f18ce..40a3b39a744 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -134,7 +134,7 @@ public: getNet()->runReadyNetworkOperations(); if (getNet()->hasReadyRequests()) { log() << "The network has unexpected requests to process, next req:"; - NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest(); log() << req.getDiagnosticString(); } ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -301,8 +301,8 @@ protected: result = status; cvDone.notify_all(); }}; - cloner.setScheduleDbWorkFn_forTest([this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); + cloner.setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); }); cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) { diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 6b68c13a7ea..28d3266ca82 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -378,7 +378,7 @@ BSONObj InitialSyncer::_getInitialSyncProgress_inlock() const { return bob.obj(); } -void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void InitialSyncer::setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } @@ -1431,7 +1431,7 @@ Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(const Status& sta } Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -1440,7 +1440,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( str::stream() << "failed to schedule work " << name << ": initial syncer is shutting down"); } - auto result = _exec->scheduleWork(work); + auto result = _exec->scheduleWork(std::move(work)); if (!result.isOK()) { return result.getStatus().withContext(str::stream() << "failed to schedule work " << name); } @@ -1450,7 +1450,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -1460,7 +1460,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock( << when.toString() << ": initial syncer is shutting down"); } - auto result = _exec->scheduleWorkAt(when, work); + auto result = _exec->scheduleWorkAt(when, std::move(work)); if (!result.isOK()) { return result.getStatus().withContext( str::stream() << "failed to schedule work " << name << " at " << when.toString()); diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 0ead52f6f46..929643116c5 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -222,7 +222,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -540,11 +540,11 @@ private: * Saves handle if work was successfully scheduled. * Returns scheduleWork status (without the handle). */ - Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -633,8 +633,8 @@ private: State _state = State::kPreStart; // (M) // Passed to CollectionCloner via DatabasesCloner. - CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) - StartCollectionClonerFn _startCollectionClonerFn; // (M) + DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) // Contains stats on the current initial sync request (includes all attempts). // To access these stats in a user-readable format, use getInitialSyncProgress(). diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 6bc02d4d15a..620d9c5abff 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -215,7 +215,7 @@ public: getNet()->runReadyNetworkOperations(); if (getNet()->hasReadyRequests()) { log() << "The network has unexpected requests to process, next req:"; - NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest(); log() << req.getDiagnosticString(); } ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -404,8 +404,8 @@ protected: _onCompletion(lastApplied); }); _initialSyncer->setScheduleDbWorkFn_forTest( - [this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); + [this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); }); _initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) { cloner.setCreateClientFn_forTest([&cloner, this]() { diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp index 655426a5ce3..e568bb3f8ca 100644 --- a/src/mongo/db/repl/multiapplier.cpp +++ b/src/mongo/db/repl/multiapplier.cpp @@ -45,15 +45,15 @@ namespace repl { MultiApplier::MultiApplier(executor::TaskExecutor* executor, const Operations& operations, const MultiApplyFn& multiApply, - const CallbackFn& onCompletion) + CallbackFn onCompletion) : _executor(executor), _operations(operations), _multiApply(multiApply), - _onCompletion(onCompletion) { + _onCompletion(std::move(onCompletion)) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); } MultiApplier::~MultiApplier() { diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index 3b2a59f7fc8..79fec694b98 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -67,7 +67,7 @@ public: /** * Callback function to report final status of applying operations. */ - using CallbackFn = stdx::function<void(const Status&)>; + using CallbackFn = unique_function<void(const Status&)>; using MultiApplyFn = stdx::function<StatusWith<OpTime>(OperationContext*, MultiApplier::Operations)>; @@ -88,7 +88,7 @@ public: MultiApplier(executor::TaskExecutor* executor, const Operations& operations, const MultiApplyFn& multiApply, - const CallbackFn& onCompletion); + CallbackFn onCompletion); /** * Blocks while applier is active. diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 676d9c1f787..0eb44197c76 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -120,14 +120,14 @@ OplogBuffer* OplogApplier::getBuffer() const { Future<void> OplogApplier::startup() { auto pf = makePromiseFuture<void>(); auto callback = - [ this, promise = pf.promise.share() ](const CallbackArgs& args) mutable noexcept { + [ this, promise = std::move(pf.promise) ](const CallbackArgs& args) mutable noexcept { invariant(args.status); log() << "Starting oplog application"; _run(_oplogBuffer); log() << "Finished oplog application"; promise.setWith([] {}); }; - invariant(_executor->scheduleWork(callback).getStatus()); + invariant(_executor->scheduleWork(std::move(callback)).getStatus()); return std::move(pf.future); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 18c9ccac0a8..f5b1cbb55db 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -191,9 +191,9 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) { * Schedules a task using the executor. This task is always run unless the task executor is shutting * down. */ -void scheduleWork(executor::TaskExecutor* executor, - const executor::TaskExecutor::CallbackFn& work) { - auto cbh = executor->scheduleWork([work](const executor::TaskExecutor::CallbackArgs& args) { +void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) { + auto cbh = executor->scheduleWork([work = std::move(work)]( + const executor::TaskExecutor::CallbackArgs& args) { if (args.status == ErrorCodes::CallbackCanceled) { return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 824db78c888..e151e0b2482 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2726,7 +2726,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() { } // Schedule timeout callback. auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; - auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); + auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, std::move(timeoutCB)); if (!status.isOK()) { log() << "Failed to schedule catchup timeout work."; abort_inlock(); @@ -3512,13 +3512,14 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() { } } -CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) { - auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) { - if (args.status == ErrorCodes::CallbackCanceled) { - return; - } - work(args); - }); +CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, CallbackFn work) { + auto cbh = + _replExecutor->scheduleWorkAt(when, [work = std::move(work)](const CallbackArgs& args) { + if (args.status == ErrorCodes::CallbackCanceled) { + return; + } + work(args); + }); if (cbh == ErrorCodes::ShutdownInProgress) { return {}; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index cff7179d1ce..233c0bf88e0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1034,7 +1034,7 @@ private: * All other non-shutdown scheduling failures will abort the process. * Does not run 'work' if callback is canceled. */ - CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work); + CallbackHandle _scheduleWorkAt(Date_t when, CallbackFn work); /** * Creates an event. diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index f11ef6c3d94..e5b7607390d 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -579,7 +579,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork( - const CallbackFn& work) override { + CallbackFn work) override { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; @@ -631,7 +631,7 @@ TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) { TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt( - Date_t when, const CallbackFn& work) override { + Date_t when, CallbackFn work) override { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index d3e14a7d8f1..727014a5cc2 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -38,25 +38,24 @@ namespace repl { TaskExecutorMock::TaskExecutorMock(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} -StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork( - const CallbackFn& work) { +StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(CallbackFn work) { if (shouldFailScheduleWorkRequest()) { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } if (shouldDeferScheduleWorkRequestByOneSecond()) { auto when = now() + Seconds(1); - return getExecutor()->scheduleWorkAt(when, work); + return getExecutor()->scheduleWorkAt(when, std::move(work)); } - return getExecutor()->scheduleWork(work); + return getExecutor()->scheduleWork(std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWorkAt( - Date_t when, const CallbackFn& work) { + Date_t when, CallbackFn work) { if (shouldFailScheduleWorkAtRequest()) { return Status(ErrorCodes::OperationFailed, str::stream() << "failed to schedule work at " << when.toString()); } - return getExecutor()->scheduleWorkAt(when, work); + return getExecutor()->scheduleWorkAt(when, std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand( diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index 41740b74433..e5e92e0f0e1 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -47,8 +47,8 @@ public: explicit TaskExecutorMock(executor::TaskExecutor* executor); - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; StatusWith<CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 94bd65d1d8d..43f0b735789 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -102,12 +102,12 @@ bool TaskRunner::isActive() const { return _active; } -void TaskRunner::schedule(const Task& task) { +void TaskRunner::schedule(Task task) { invariant(task); stdx::lock_guard<stdx::mutex> lk(_mutex); - _tasks.push_back(task); + _tasks.push_back(std::move(task)); _condition.notify_all(); if (_active) { @@ -174,8 +174,8 @@ void TaskRunner::_runTasks() { tasks.swap(_tasks); lk.unlock(); // Cancel remaining tasks with a CallbackCanceled status. - for (auto task : tasks) { - runSingleTask(task, + for (auto&& task : tasks) { + runSingleTask(std::move(task), nullptr, Status(ErrorCodes::CallbackCanceled, "this task has been canceled by a previously invoked task")); @@ -207,46 +207,10 @@ TaskRunner::Task TaskRunner::_waitForNextTask() { return Task(); } - Task task = _tasks.front(); + Task task = std::move(_tasks.front()); _tasks.pop_front(); return task; } -Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) { - // Setup cond_var for signaling when done. - bool done = false; - stdx::mutex mutex; - stdx::condition_variable waitTillDoneCond; - - Status returnStatus{Status::OK()}; - this->schedule([&](OperationContext* opCtx, const Status taskStatus) { - if (!taskStatus.isOK()) { - returnStatus = taskStatus; - } else { - // Run supplied function. - try { - returnStatus = func(opCtx); - } catch (...) { - returnStatus = exceptionToStatus(); - error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus); - } - } - - // Signal done. - LockGuard lk2{mutex}; - done = true; - waitTillDoneCond.notify_all(); - - // return nextAction based on status from supplied function. - if (returnStatus.isOK()) { - return nextAction; - } - return TaskRunner::NextAction::kCancel; - }); - - UniqueLock lk{mutex}; - waitTillDoneCond.wait(lk, [&done] { return done; }); - return returnStatus; -} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index 6875229bb9e..85313cc4ce9 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -38,6 +38,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" namespace mongo { @@ -60,17 +61,7 @@ public: kCancel = 3, }; - using Task = stdx::function<NextAction(OperationContext*, const Status&)>; - using SynchronousTask = stdx::function<Status(OperationContext* opCtx)>; - - /** - * Returns the Status from the supplied function after running it.. - * - * Note: TaskRunner::NextAction controls when the operation context and thread will be released. - */ - Status runSynchronousTask( - SynchronousTask func, - TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext); + using Task = unique_function<NextAction(OperationContext*, const Status&)>; /** * Creates a Task returning kCancel. This is useful in shutting down the task runner after @@ -126,7 +117,7 @@ public: * immediately. This is usually the case when the task runner is canceled. Accessing the * operation context in the task will result in undefined behavior. */ - void schedule(const Task& task); + void schedule(Task task); /** * If there is a task that is already running, allows the task to run to completion. diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index b01885ba332..17771287f90 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -83,7 +83,7 @@ using OpIdVector = std::vector<unsigned int>; OpIdVector _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction, - stdx::function<void(const Task& task)> schedule) { + unique_function<void(Task task)> schedule) { unittest::Barrier barrier(2U); stdx::mutex mutex; std::vector<OperationContext*> txns; @@ -121,7 +121,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test, std::vector<unsigned int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { - auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; + auto schedule = [&](Task task) { test.getTaskRunner().schedule(std::move(task)); }; return _testRunTaskTwice(test, nextAction, schedule); } @@ -134,9 +134,9 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { // Joining thread pool before scheduling second task ensures that task runner releases // thread back to pool after disposing of operation context. TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { - auto schedule = [this](const Task& task) { + auto schedule = [this](Task task) { getThreadPool().waitForIdle(); - getTaskRunner().schedule(task); + getTaskRunner().schedule(std::move(task)); }; auto txnId = _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 63445075780..a6438e3cf7e 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -179,7 +179,7 @@ private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>; - using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>; + using Request = std::pair<Date_t, Promise<ConnectionHandle>>; struct RequestComparator { bool operator()(const Request& a, const Request& b) { return a.first > b.first; @@ -461,7 +461,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec const auto expiration = _parent->_factory->now() + timeout; auto pf = makePromiseFuture<ConnectionHandle>(); - _requests.push_back(make_pair(expiration, pf.promise.share())); + _requests.push_back(make_pair(expiration, std::move(pf.promise))); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); updateStateInLock(); diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 97320d12bf0..c55a7925bdc 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -38,6 +38,7 @@ #include "mongo/stdx/functional.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/functional.h" #include "mongo/util/future.h" namespace mongo { @@ -57,7 +58,7 @@ class NetworkInterface { public: using Response = RemoteCommandResponse; - using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>; + using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseStatus&)>; virtual ~NetworkInterface(); @@ -145,7 +146,7 @@ public: */ virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton = nullptr) = 0; Future<TaskExecutor::ResponseStatus> startCommand( @@ -154,13 +155,13 @@ public: const transport::BatonHandle& baton = nullptr) { auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>(); - auto status = - startCommand(cbHandle, - request, - [sp = pf.promise.share()](const TaskExecutor::ResponseStatus& rs) mutable { - sp.emplaceValue(rs); - }, - baton); + auto status = startCommand( + cbHandle, + request, + [p = std::move(pf.promise)](const TaskExecutor::ResponseStatus& rs) mutable { + p.emplaceValue(rs); + }, + baton); if (!status.isOK()) { return status; @@ -190,7 +191,7 @@ public: * return true. See that method for why. */ virtual Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton = nullptr) = 0; /** diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index bd6cd9bf41e..f43af8bd17a 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -323,9 +323,10 @@ TEST_F(NetworkInterfaceTest, SetAlarm) { Date_t expiration = net().now() + Milliseconds(100); auto makeTimerFuture = [&] { auto pf = makePromiseFuture<Date_t>(); - return std::make_pair( - [ this, promise = pf.promise.share() ]() mutable { promise.emplaceValue(net().now()); }, - std::move(pf.future)); + return std::make_pair([ this, promise = std::move(pf.promise) ]() mutable { + promise.emplaceValue(net().now()); + }, + std::move(pf.future)); }; auto futurePair = makeTimerFuture(); diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 258c531707e..242e81126f6 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -115,7 +115,7 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; @@ -124,7 +124,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, stdx::lock_guard<stdx::mutex> lk(_mutex); const Date_t now = _now_inlock(); - auto op = NetworkOperation(cbHandle, request, now, onFinish); + auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish)); // If we don't have a hook, or we have already 'connected' to this host, enqueue the op. if (!_hook || _connections.count(request.target)) { @@ -180,7 +180,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } Status NetworkInterfaceMock::setAlarm(const Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; @@ -193,7 +193,7 @@ Status NetworkInterfaceMock::setAlarm(const Date_t when, action(); return Status::OK(); } - _alarms.emplace(when, action); + _alarms.emplace(when, std::move(action)); return Status::OK(); } @@ -460,16 +460,21 @@ void NetworkInterfaceMock::_enqueueOperation_inlock( return a.getNextConsiderationDate() < b.getNextConsiderationDate(); }); + const auto timeout = op.getRequest().timeout; + auto cbh = op.getCallbackHandle(); + _unscheduled.emplace(insertBefore, std::move(op)); - if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) { - invariant(op.getRequest().timeout >= Milliseconds(0)); + if (timeout != RemoteCommandRequest::kNoTimeout) { + invariant(timeout >= Milliseconds(0)); ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0)); std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled}; - auto action = [ =, cbh = op.getCallbackHandle() ] { - _interruptWithResponse_inlock(cbh, queuesToCheck, rs); - }; - _alarms.emplace(_now_inlock() + op.getRequest().timeout, action); + _alarms.emplace(_now_inlock() + timeout, [ + this, + cbh = std::move(cbh), + queuesToCheck = std::move(queuesToCheck), + rs = std::move(rs) + ] { _interruptWithResponse_inlock(cbh, queuesToCheck, rs); }); } } @@ -504,13 +509,14 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort if (!hookPostconnectCommand) { // If we don't have a post connect command, enqueue the actual command. - _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); + _enqueueOperation_inlock(std::move(op)); return; } + auto cbh = op.getCallbackHandle(); // The completion handler for the postconnect command schedules the original command. - auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable { + auto postconnectCompletionHandler = [ this, op = std::move(op) ](ResponseStatus rs) mutable { stdx::lock_guard<stdx::mutex> lk(_mutex); if (!rs.isOK()) { op.setResponse(_now_inlock(), rs); @@ -526,11 +532,11 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort return; } - _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); + _enqueueOperation_inlock(std::move(op)); }; - auto postconnectOp = NetworkOperation(op.getCallbackHandle(), + auto postconnectOp = NetworkOperation(cbh, std::move(*hookPostconnectCommand), _now_inlock(), std::move(postconnectCompletionHandler)); @@ -563,7 +569,7 @@ void NetworkInterfaceMock::signalWorkAvailable() { void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) { while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) { - auto fn = _alarms.top().action; + auto fn = std::move(_alarms.top().action); _alarms.pop(); lk->unlock(); fn(); @@ -571,7 +577,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s } while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { invariant(_currentlyRunning == kNetworkThread); - NetworkOperation op = _scheduled.front(); + NetworkOperation op = std::move(_scheduled.front()); _scheduled.pop_front(); _waitingToRunMask |= kExecutorThread; lk->unlock(); @@ -637,16 +643,14 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation() NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish) + RemoteCommandCompletionFn onFinish) : _requestDate(theRequestDate), _nextConsiderationDate(theRequestDate), _responseDate(), _cbHandle(cbHandle), _request(theRequest), _response(kUnsetResponse), - _onFinish(onFinish) {} - -NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} + _onFinish(std::move(onFinish)) {} std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const { return str::stream() << "NetworkOperation -- request:'" << _request.toString() diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index f87b18c27b3..7023101fd59 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -113,7 +113,7 @@ public: virtual std::string getHostName(); virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton = nullptr); /** @@ -129,7 +129,7 @@ public: * Not implemented. */ virtual Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton = nullptr); virtual bool onNetworkThread(); @@ -284,7 +284,7 @@ private: * Information describing a scheduled alarm. */ struct AlarmInfo { - using AlarmAction = stdx::function<void()>; + using AlarmAction = unique_function<void()>; AlarmInfo(Date_t inWhen, AlarmAction inAction) : when(inWhen), action(std::move(inAction)) {} bool operator>(const AlarmInfo& rhs) const { @@ -292,7 +292,7 @@ private: } Date_t when; - AlarmAction action; + mutable AlarmAction action; }; /** @@ -435,8 +435,7 @@ public: NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish); - ~NetworkOperation(); + RemoteCommandCompletionFn onFinish); /** * Adjusts the stored virtual time at which this entry will be subject to consideration @@ -556,8 +555,8 @@ public: Date_t now() override { return _net->now(); } - Status setAlarm(Date_t when, stdx::function<void()> action) override { - return _net->setAlarm(when, action); + Status setAlarm(Date_t when, unique_function<void()> action) override { + return _net->setAlarm(when, std::move(action)); } private: diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 74681c42308..a395b1f48bb 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -172,7 +172,7 @@ Date_t NetworkInterfaceTL::now() { Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; @@ -205,7 +205,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) { log() << "Discarding command due to failpoint before acquireConn"; - std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) { + std::move(pf.future).getAsync([onFinish = std::move(onFinish)]( + StatusWith<RemoteCommandResponse> response) mutable { onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0})); }); return Status::OK(); @@ -237,8 +238,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa }); }); - auto remainingWork = [ this, state, future = std::move(pf.future), baton, onFinish ]( - StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + auto remainingWork = + [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ]( + StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { makeReadyFutureWith([&] { return _onAcquireConn( state, std::move(future), std::move(*uassertStatusOK(swConn)), baton); @@ -251,7 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa } return error; }) - .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) { + .getAsync([ this, state, onFinish = std::move(onFinish) ]( + StatusWith<RemoteCommandResponse> response) { auto duration = now() - state->start; if (!response.isOK()) { onFinish(RemoteCommandResponse(response.getStatus(), duration)); @@ -430,7 +433,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } Status NetworkInterfaceTL::setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; @@ -454,37 +457,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, } alarmTimer->waitUntil(when, baton) - .getAsync([this, weakTimer, action, when, baton](Status status) { - auto alarmTimer = weakTimer.lock(); - if (!alarmTimer) { - return; - } else { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgressAlarms.erase(alarmTimer); - } - - auto nowVal = now(); - if (nowVal < when) { - warning() << "Alarm returned early. Expected at: " << when - << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action), baton); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); + .getAsync( + [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); } - return; - } + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action), baton); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); + } - if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); + return; } - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + + if (status.isOK()) { + if (baton) { + baton->schedule(std::move(action)); + } else { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 048aaba7f5b..1dbcc7a7678 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -67,13 +67,13 @@ public: Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, const transport::BatonHandle& baton) override; Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 3c845a18824..e8cff8ee0f6 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -94,7 +94,7 @@ public: * the callback was canceled for any reason (including shutdown). Otherwise, it should have * Status::OK(). */ - using CallbackFn = stdx::function<void(const CallbackArgs&)>; + using CallbackFn = unique_function<void(const CallbackArgs&)>; /** * Type of a callback from a request to run a command on a remote MongoDB node. @@ -175,8 +175,7 @@ public: * * May be called by client threads or callbacks running in the executor. */ - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) = 0; /** * Blocks the calling thread until "event" is signaled. Also returns if the event is never @@ -209,7 +208,7 @@ public: * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) = 0; /** * Schedules "work" to be run by the executor no sooner than "when". @@ -224,7 +223,7 @@ public: * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) = 0; /** * Schedules "cb" to be run by the executor with the result of executing the remote command diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index a60b115b923..42373c2b47b 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -307,13 +307,14 @@ void EventChainAndWaitingTest::onGo(const TaskExecutor::CallbackArgs& cbData) { return; } triggerEvent = errorOrTriggerEvent.getValue(); - StatusWith<TaskExecutor::CallbackHandle> cbHandle = executor->onEvent(triggerEvent, triggered2); + StatusWith<TaskExecutor::CallbackHandle> cbHandle = + executor->onEvent(triggerEvent, std::move(triggered2)); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); return; } - cbHandle = executor->onEvent(triggerEvent, triggered3); + cbHandle = executor->onEvent(triggerEvent, std::move(triggered3)); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index 5520de68396..8d596b3d3c5 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -34,6 +34,7 @@ #include <vector> #include "mongo/platform/random.h" +#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/thread_pool_interface.h" diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index d48a0888bc6..94e547b87f3 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -274,11 +274,11 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) { } StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event, - const CallbackFn& work) { + CallbackFn work) { if (!event.isValid()) { return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; } - auto wq = makeSingletonWorkQueue(work, nullptr); + auto wq = makeSingletonWorkQueue(std::move(work), nullptr); stdx::unique_lock<stdx::mutex> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); @@ -323,9 +323,8 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { } } -StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( - const CallbackFn& work) { - auto wq = makeSingletonWorkQueue(work, nullptr); +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn work) { + auto wq = makeSingletonWorkQueue(std::move(work), nullptr); WorkQueue temp; stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); @@ -336,12 +335,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( return cbHandle; } -StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( - Date_t when, const CallbackFn& work) { +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(Date_t when, + CallbackFn work) { if (when <= now()) { - return scheduleWork(work); + return scheduleWork(std::move(work)); } - auto wq = makeSingletonWorkQueue(work, nullptr, when); + auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when); stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq); if (!cbHandle.isOK()) { diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 0fd821baa95..5163ad9d10d 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -75,13 +75,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; void waitForEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index dd9bde0518a..d11de192011 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -89,8 +89,8 @@ void ShardingTaskExecutor::signalEvent(const EventHandle& event) { } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event, - const CallbackFn& work) { - return _executor->onEvent(event, work); + CallbackFn work) { + return _executor->onEvent(event, std::move(work)); } void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { @@ -103,14 +103,13 @@ StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext* return _executor->waitForEvent(opCtx, event, deadline); } -StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork( - const CallbackFn& work) { - return _executor->scheduleWork(work); +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(CallbackFn work) { + return _executor->scheduleWork(std::move(work)); } -StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt( - Date_t when, const CallbackFn& work) { - return _executor->scheduleWorkAt(when, work); +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Date_t when, + CallbackFn work) { + return _executor->scheduleWorkAt(when, std::move(work)); } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index 9e018f497aa..3a0df5fb57e 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -62,13 +62,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; void waitForEvent(const EventHandle& event) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h index a48a1a7043b..2a8a2310a6b 100644 --- a/src/mongo/transport/asio_utils.h +++ b/src/mongo/transport/asio_utils.h @@ -350,7 +350,7 @@ struct AsyncHandlerHelper { template <> struct AsyncHandlerHelper<> { using Result = void; - static void complete(SharedPromise<Result>* promise) { + static void complete(Promise<Result>* promise) { promise->emplaceValue(); } }; @@ -358,7 +358,7 @@ struct AsyncHandlerHelper<> { template <typename Arg> struct AsyncHandlerHelper<Arg> { using Result = Arg; - static void complete(SharedPromise<Result>* promise, Arg arg) { + static void complete(Promise<Result>* promise, Arg arg) { promise->emplaceValue(arg); } }; @@ -369,7 +369,7 @@ struct AsyncHandlerHelper<std::error_code, Args...> { using Result = typename Helper::Result; template <typename... Args2> - static void complete(SharedPromise<Result>* promise, std::error_code ec, Args2&&... args) { + static void complete(Promise<Result>* promise, std::error_code ec, Args2&&... args) { if (ec) { promise->setError(errorCodeToStatus(ec)); } else { @@ -381,7 +381,7 @@ struct AsyncHandlerHelper<std::error_code, Args...> { template <> struct AsyncHandlerHelper<std::error_code> { using Result = void; - static void complete(SharedPromise<Result>* promise, std::error_code ec) { + static void complete(Promise<Result>* promise, std::error_code ec) { if (ec) { promise->setError(errorCodeToStatus(ec)); } else { @@ -402,7 +402,7 @@ struct AsyncHandler { Helper::complete(&promise, std::forward<Args2>(args)...); } - SharedPromise<Result> promise; + Promise<Result> promise; }; template <typename... Args> @@ -414,7 +414,7 @@ struct AsyncResult { explicit AsyncResult(completion_handler_type& handler) { auto pf = makePromiseFuture<RealResult>(); fut = std::move(pf.future); - handler.promise = pf.promise.share(); + handler.promise = std::move(pf.promise); } auto get() { diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index 6440b7d4e39..9dbf570ede2 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -75,8 +75,8 @@ public: Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - schedule([ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable { - sp.setWith(std::move(cb)); + schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { + p.setWith(std::move(cb)); }); return std::move(pf.future); diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index f96e73b67a3..e8dcf79deeb 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -150,8 +150,8 @@ public: auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); auto pf = makePromiseFuture<void>(); - _safeExecute([ fd, type, sp = pf.promise.share(), this ] { - _sessions[fd] = TransportSession{type, sp}; + _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable { + _sessions[fd] = TransportSession{type, std::move(promise)}; }); return std::move(pf.future); @@ -159,13 +159,14 @@ public: Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { auto pf = makePromiseFuture<void>(); - _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] { - auto pair = _timers.insert({ - timerPtr, expiration, sp, + _safeExecute( + [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable { + auto pair = _timers.insert({ + timerPtr, expiration, std::move(promise), + }); + invariant(pair.second); + _timersById[pair.first->id] = pair.first; }); - invariant(pair.second); - _timersById[pair.first->id] = pair.first; - }); return std::move(pf.future); } @@ -244,7 +245,7 @@ public: } void run(ClockSource* clkSource) noexcept override { - std::vector<SharedPromise<void>> toFulfill; + std::vector<Promise<void>> toFulfill; // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks const auto guard = MakeGuard([&] { @@ -368,7 +369,7 @@ private: struct Timer { const ReactorTimer* id; Date_t expiration; - SharedPromise<void> promise; + mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set. struct LessThan { bool operator()(const Timer& lhs, const Timer& rhs) const { @@ -379,7 +380,7 @@ private: struct TransportSession { Type type; - SharedPromise<void> promise; + Promise<void> promise; }; template <typename Callback> @@ -394,7 +395,7 @@ private: template <typename Callback> void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { if (_inPoll) { - _scheduled.push_back([cb, this] { + _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable { stdx::lock_guard<stdx::mutex> lk(_mutex); cb(); }); diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index f13b8ec5c5f..f6c66ec98b7 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -130,9 +130,9 @@ public: void schedule(ScheduleMode mode, Task task) final { if (mode == kDispatch) { - _ioContext.dispatch(std::move(task)); + asio::dispatch(_ioContext, std::move(task)); } else { - _ioContext.post(std::move(task)); + asio::post(_ioContext, std::move(task)); } } diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index d41d48a8658..ca1453cb3a4 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/stdx/functional.h" #include "mongo/transport/session.h" +#include "mongo/util/functional.h" #include "mongo/util/future.h" #include "mongo/util/time_support.h" @@ -158,7 +159,7 @@ public: virtual void stop() = 0; virtual void drain() = 0; - using Task = stdx::function<void()>; + using Task = unique_function<void()>; enum ScheduleMode { kDispatch, kPost }; virtual void schedule(ScheduleMode mode, Task task) = 0; @@ -166,8 +167,8 @@ public: template <typename Callback> Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - schedule(kPost, [ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable { - sp.setWith(cb); + schedule(kPost, [ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { + p.setWith(cb); }); return std::move(pf.future); diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 800263a4485..c41c5d6be68 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -120,11 +120,11 @@ private: cancel(baton); auto pf = makePromiseFuture<void>(); - armTimer().getAsync([sp = pf.promise.share()](Status status) mutable { + armTimer().getAsync([p = std::move(pf.promise)](Status status) mutable { if (status.isOK()) { - sp.emplaceValue(); + p.emplaceValue(); } else { - sp.setError(status); + p.setError(status); } }); @@ -182,9 +182,9 @@ public: void schedule(ScheduleMode mode, Task task) override { if (mode == kDispatch) { - _ioContext.dispatch(std::move(task)); + asio::dispatch(_ioContext, std::move(task)); } else { - _ioContext.post(std::move(task)); + asio::post(_ioContext, std::move(task)); } } diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 9860c968102..2cf2886afa0 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -76,8 +76,8 @@ void TaskExecutorProxy::signalEvent(const EventHandle& event) { } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::onEvent( - const EventHandle& event, const CallbackFn& work) { - return _executor->onEvent(event, work); + const EventHandle& event, CallbackFn work) { + return _executor->onEvent(event, std::move(work)); } void TaskExecutorProxy::waitForEvent(const EventHandle& event) { @@ -91,13 +91,13 @@ StatusWith<stdx::cv_status> TaskExecutorProxy::waitForEvent(OperationContext* op } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork( - const CallbackFn& work) { - return _executor->scheduleWork(work); + CallbackFn work) { + return _executor->scheduleWork(std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWorkAt( - Date_t when, const CallbackFn& work) { - return _executor->scheduleWorkAt(when, work); + Date_t when, CallbackFn work) { + return _executor->scheduleWorkAt(when, std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand( diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index 2087ed1e6ad..e03e2a6955e 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -59,14 +59,13 @@ public: virtual Date_t now() override; virtual StatusWith<EventHandle> makeEvent() override; virtual void signalEvent(const EventHandle& event) override; - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) override; + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; virtual void waitForEvent(const EventHandle& event) override; virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; - virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; virtual StatusWith<CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, diff --git a/src/mongo/util/background_thread_clock_source.cpp b/src/mongo/util/background_thread_clock_source.cpp index 4233b016477..89df9e7e970 100644 --- a/src/mongo/util/background_thread_clock_source.cpp +++ b/src/mongo/util/background_thread_clock_source.cpp @@ -66,8 +66,8 @@ Milliseconds BackgroundThreadClockSource::getPrecision() { return _granularity; } -Status BackgroundThreadClockSource::setAlarm(Date_t when, stdx::function<void()> action) { - return _clockSource->setAlarm(when, action); +Status BackgroundThreadClockSource::setAlarm(Date_t when, unique_function<void()> action) { + return _clockSource->setAlarm(when, std::move(action)); } Date_t BackgroundThreadClockSource::now() { diff --git a/src/mongo/util/background_thread_clock_source.h b/src/mongo/util/background_thread_clock_source.h index a251ba4f2ce..6bfb5fa8f92 100644 --- a/src/mongo/util/background_thread_clock_source.h +++ b/src/mongo/util/background_thread_clock_source.h @@ -58,7 +58,7 @@ public: ~BackgroundThreadClockSource() override; Milliseconds getPrecision() override; Date_t now() override; - Status setAlarm(Date_t when, stdx::function<void()> action) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; /** * Doesn't count as a call to now() for determining whether this ClockSource is idle. diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h index e8fc2d06d4c..4043c166dbf 100644 --- a/src/mongo/util/clock_source.h +++ b/src/mongo/util/clock_source.h @@ -33,8 +33,8 @@ #include <type_traits> #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/functional.h" #include "mongo/util/time_support.h" namespace mongo { @@ -74,7 +74,7 @@ public: * Returns InternalError if this clock source does not implement setAlarm. May also * return ShutdownInProgress during shutdown. Other errors are also allowed. */ - virtual Status setAlarm(Date_t when, stdx::function<void()> action) { + virtual Status setAlarm(Date_t when, unique_function<void()> action) { return {ErrorCodes::InternalError, "This clock source does not implement setAlarm."}; } diff --git a/src/mongo/util/clock_source_mock.cpp b/src/mongo/util/clock_source_mock.cpp index ad4aa90802c..3a84e43932c 100644 --- a/src/mongo/util/clock_source_mock.cpp +++ b/src/mongo/util/clock_source_mock.cpp @@ -57,7 +57,7 @@ void ClockSourceMock::reset(Date_t newNow) { _processAlarms(std::move(lk)); } -Status ClockSourceMock::setAlarm(Date_t when, stdx::function<void()> action) { +Status ClockSourceMock::setAlarm(Date_t when, unique_function<void()> action) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (when <= _now) { lk.unlock(); diff --git a/src/mongo/util/clock_source_mock.h b/src/mongo/util/clock_source_mock.h index a257775bb7c..ac220f912cd 100644 --- a/src/mongo/util/clock_source_mock.h +++ b/src/mongo/util/clock_source_mock.h @@ -54,7 +54,7 @@ public: Milliseconds getPrecision() override; Date_t now() override; - Status setAlarm(Date_t when, stdx::function<void()> action) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; /** * Advances the current time by the given value. @@ -67,7 +67,7 @@ public: void reset(Date_t newNow); private: - using Alarm = std::pair<Date_t, stdx::function<void()>>; + using Alarm = std::pair<Date_t, unique_function<void()>>; void _processAlarms(stdx::unique_lock<stdx::mutex> lk); stdx::mutex _mutex; @@ -106,7 +106,7 @@ public: return _source->now(); } - Status setAlarm(Date_t when, stdx::function<void()> action) override { + Status setAlarm(Date_t when, unique_function<void()> action) override { return _source->setAlarm(when, std::move(action)); } diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h index 6bca39c3aaa..e558a8cf701 100644 --- a/src/mongo/util/concurrency/thread_pool_interface.h +++ b/src/mongo/util/concurrency/thread_pool_interface.h @@ -31,7 +31,7 @@ #pragma once #include "mongo/base/disallow_copying.h" -#include "mongo/stdx/functional.h" +#include "mongo/util/functional.h" namespace mongo { @@ -44,7 +44,7 @@ class ThreadPoolInterface { MONGO_DISALLOW_COPYING(ThreadPoolInterface); public: - using Task = stdx::function<void()>; + using Task = unique_function<void()>; /** * Destroys a thread pool. diff --git a/src/mongo/util/functional.h b/src/mongo/util/functional.h index 31e57d0868a..975e974e762 100644 --- a/src/mongo/util/functional.h +++ b/src/mongo/util/functional.h @@ -153,7 +153,7 @@ private: template <typename Functor> static auto makeImpl(Functor&& functor) { struct SpecificImpl : Impl { - explicit SpecificImpl(Functor&& func) : f(std::move(func)) {} + explicit SpecificImpl(Functor&& func) : f(std::forward<Functor>(func)) {} RetType call(Args&&... args) override { return callRegularVoid(std::is_void<RetType>(), f, std::forward<Args>(args)...); @@ -162,7 +162,7 @@ private: std::decay_t<Functor> f; }; - return std::make_unique<SpecificImpl>(std::move(functor)); + return std::make_unique<SpecificImpl>(std::forward<Functor>(functor)); } std::unique_ptr<Impl> impl; diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index 855ea76416d..04d154502e3 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -52,9 +52,6 @@ namespace mongo { -template <typename T> -class SharedPromise; - namespace future_details { template <typename T> class Promise; @@ -482,10 +479,8 @@ using future_details::Future; * destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered * a programmer error, and should not be relied upon. We may make it debug-fatal in the future. * - * Only one thread can use a given Promise at a time. It is legal to have different threads setting - * the value/error and extracting the Future, but it is the user's responsibility to ensure that - * those calls are strictly synchronized. This is usually easiest to achieve by calling - * makePromiseFuture<T>() then passing a SharedPromise to the completing threads. + * Only one thread can use a given Promise at a time, but another thread may be using the associated + * Future object. * * If the result is ready when producing the Future, it is more efficient to use * makeReadyFutureWith() or Future<T>::makeReady() than to use a Promise<T>. @@ -569,18 +564,6 @@ public: }); } - /** - * Get a copyable SharedPromise that can be used to complete this Promise's Future. - * - * Callers are required to extract the Future before calling share() to prevent race conditions. - * Even with a SharedPromise, callers must ensure it is only completed at most once. Copyability - * is primarily to allow capturing lambdas to be put in std::functions which don't support - * move-only types. - * - * It is safe to destroy the original Promise as soon as this call returns. - */ - SharedPromise<T> share() noexcept; - static auto makePromiseFutureImpl() { struct PromiseAndFuture { Promise<T> promise{make_intrusive<SharedState<T>>()}; @@ -621,54 +604,6 @@ private: }; /** - * A SharedPromise is a copyable object that can be used to complete a Promise. - * - * All copies derived from the same call to Promise::share() will complete the same shared state. - * Callers must ensure that the shared state is only completed at most once. Copyability is - * primarily to allow capturing lambdas to be put in std::functions which don't support move-only - * types. If the final derived SharedPromise is destroyed without completion, the Promise will be - * broken. - * - * All methods behave the same as on the underlying Promise. - */ -template <typename T> -class SharedPromise { -public: - SharedPromise() = default; - - template <typename Func> - void setWith(Func&& func) noexcept { - _promise->setWith(std::forward<Func>(func)); - } - - void setFrom(Future<T>&& future) noexcept { - _promise->setFrom(std::move(future)); - } - - template <typename... Args> - void emplaceValue(Args&&... args) noexcept { - _promise->emplaceValue(std::forward<Args>(args)...); - } - - void setError(Status status) noexcept { - _promise->setError(std::move(status)); - } - -private: - // Only Promise<T> needs to be a friend, but MSVC2015 doesn't respect that friendship. - // TODO see if this is still needed on MSVC2017+ - template <typename T2> - friend class Promise; - - explicit SharedPromise(std::shared_ptr<Promise<T>>&& promise) : _promise(std::move(promise)) {} - - // TODO consider adding a SharedPromise refcount to SharedStateBase to avoid the extra - // allocation. The tricky part will be ensuring that BrokenPromise is set when the last copy is - // destroyed. - std::shared_ptr<Promise<T>> _promise; -}; - -/** * Future<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void). * * As is usual for rvalue-qualified methods, you may call at most one of them on a given Future. @@ -1431,12 +1366,6 @@ inline Future<T> Promise<T>::getFuture() noexcept { } template <typename T> -inline SharedPromise<T> Promise<T>::share() noexcept { - invariant(_sharedState); - return SharedPromise<T>(std::make_shared<Promise<T>>(std::move(*this))); -} - -template <typename T> inline void Promise<T>::setFrom(Future<T>&& future) noexcept { setImpl([&](boost::intrusive_ptr<SharedState<T>>&& sharedState) { future.propagateResultTo(sharedState.get()); diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp index aec0a556d87..92955d367fb 100644 --- a/src/mongo/util/future_test_future_int.cpp +++ b/src/mongo/util/future_test_future_int.cpp @@ -94,7 +94,7 @@ TEST(Future, Success_getAsync) { [] { return 1; }, [](Future<int>&& fut) { auto pf = makePromiseFuture<int>(); - std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable { + std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<int> sw) mutable { ASSERT_OK(sw); outside.emplaceValue(sw.getValue()); }); @@ -132,7 +132,7 @@ TEST(Future, Fail_getNothrowRvalue) { TEST(Future, Fail_getAsync) { FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { auto pf = makePromiseFuture<int>(); - std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable { + std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<int> sw) mutable { ASSERT(!sw.isOK()); outside.setError(sw.getStatus()); }); diff --git a/src/mongo/util/future_test_future_move_only.cpp b/src/mongo/util/future_test_future_move_only.cpp index 75bda9505c8..b3d6dcc9730 100644 --- a/src/mongo/util/future_test_future_move_only.cpp +++ b/src/mongo/util/future_test_future_move_only.cpp @@ -110,16 +110,16 @@ TEST(Future_MoveOnly, Success_getNothrowRvalue) { } TEST(Future_MoveOnly, Success_getAsync) { - FUTURE_SUCCESS_TEST( - [] { return Widget(1); }, - [](Future<Widget>&& fut) { - auto pf = makePromiseFuture<Widget>(); - std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable { - ASSERT_OK(sw); - outside.emplaceValue(std::move(sw.getValue())); - }); - ASSERT_EQ(std::move(pf.future).get(), 1); - }); + FUTURE_SUCCESS_TEST([] { return Widget(1); }, + [](Future<Widget>&& fut) { + auto pf = makePromiseFuture<Widget>(); + std::move(fut).getAsync([outside = std::move(pf.promise)]( + StatusWith<Widget> sw) mutable { + ASSERT_OK(sw); + outside.emplaceValue(std::move(sw.getValue())); + }); + ASSERT_EQ(std::move(pf.future).get(), 1); + }); } TEST(Future_MoveOnly, Fail_getLvalue) { @@ -156,7 +156,7 @@ TEST(Future_MoveOnly, Fail_getNothrowRvalue) { TEST(Future_MoveOnly, Fail_getAsync) { FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) { auto pf = makePromiseFuture<Widget>(); - std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable { + std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<Widget> sw) mutable { ASSERT(!sw.isOK()); outside.setError(sw.getStatus()); }); diff --git a/src/mongo/util/future_test_future_void.cpp b/src/mongo/util/future_test_future_void.cpp index 02ab0e73e76..4095706739f 100644 --- a/src/mongo/util/future_test_future_void.cpp +++ b/src/mongo/util/future_test_future_void.cpp @@ -73,7 +73,7 @@ TEST(Future_Void, Success_getAsync) { [] {}, [](Future<void>&& fut) { auto pf = makePromiseFuture<void>(); - std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable { + std::move(fut).getAsync([outside = std::move(pf.promise)](Status status) mutable { ASSERT_OK(status); outside.emplaceValue(); }); @@ -111,7 +111,7 @@ TEST(Future_Void, Fail_getNothrowRvalue) { TEST(Future_Void, Fail_getAsync) { FUTURE_FAIL_TEST<void>([](Future<void>&& fut) { auto pf = makePromiseFuture<void>(); - std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable { + std::move(fut).getAsync([outside = std::move(pf.promise)](Status status) mutable { ASSERT(!status.isOK()); outside.setError(status); }); diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h index 093d3a30b7c..2f0938c907c 100644 --- a/src/mongo/util/keyed_executor.h +++ b/src/mongo/util/keyed_executor.h @@ -77,7 +77,7 @@ template <typename Key, typename... MapArgs> class KeyedExecutor { // We hold a deque per key. Each entry in the deque represents a task we'll eventually execute // and a list of callers who need to be notified after it completes. - using Deque = std::deque<std::vector<SharedPromise<void>>>; + using Deque = std::deque<std::vector<Promise<void>>>; using Map = stdx::unordered_map<Key, Deque, MapArgs...>; @@ -232,7 +232,7 @@ private: Future<void> _onCleared(WithLock, Deque& deque) { invariant(deque.size()); auto pf = makePromiseFuture<void>(); - deque.back().push_back(pf.promise.share()); + deque.back().push_back(std::move(pf.promise)); return std::move(pf.future); } diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp index e44bf934d7f..9332891b7e7 100644 --- a/src/mongo/util/keyed_executor_test.cpp +++ b/src/mongo/util/keyed_executor_test.cpp @@ -46,7 +46,7 @@ namespace { class MockExecutor : public OutOfLineExecutor { public: - void schedule(stdx::function<void()> func) override { + void schedule(unique_function<void()> func) override { _deque.push_front(std::move(func)); } @@ -72,7 +72,7 @@ public: } private: - std::deque<stdx::function<void()>> _deque; + std::deque<unique_function<void()>> _deque; }; class ThreadPoolExecutor : public OutOfLineExecutor { @@ -87,7 +87,7 @@ public: _threadPool.shutdown(); } - void schedule(stdx::function<void()> func) override { + void schedule(unique_function<void()> func) override { ASSERT_OK(_threadPool.schedule(std::move(func))); } diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h index 1f2b006c1b1..310b26cdd21 100644 --- a/src/mongo/util/out_of_line_executor.h +++ b/src/mongo/util/out_of_line_executor.h @@ -59,8 +59,8 @@ public: Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - schedule([ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable { - sp.setWith(std::move(cb)); + schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { + p.setWith(std::move(cb)); }); return std::move(pf.future); @@ -69,7 +69,7 @@ public: /** * Invokes the callback on the executor. This never happens immediately on the caller's stack. */ - virtual void schedule(stdx::function<void()> func) = 0; + virtual void schedule(unique_function<void()> func) = 0; protected: ~OutOfLineExecutor() noexcept {} |