diff options
Diffstat (limited to 'src/mongo/db/repl')
27 files changed, 113 insertions, 153 deletions
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp index cf8941a05f3..9967dd8012a 100644 --- a/src/mongo/db/repl/abstract_async_component.cpp +++ b/src/mongo/db/repl/abstract_async_component.cpp @@ -166,7 +166,7 @@ Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock( } Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -175,7 +175,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( str::stream() << "failed to schedule work " << name << ": " << _componentName << " is shutting down"); } - auto result = _executor->scheduleWork(work); + auto result = _executor->scheduleWork(std::move(work)); if (!result.isOK()) { return result.getStatus().withContext(str::stream() << "failed to schedule work " << name); } @@ -185,7 +185,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -196,7 +196,7 @@ Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock( << _componentName << " is shutting down"); } - auto result = _executor->scheduleWorkAt(when, work); + auto result = _executor->scheduleWorkAt(when, std::move(work)); if (!result.isOK()) { return result.getStatus().withContext( str::stream() << "failed to schedule work " << name << " at " << when.toString()); diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h index e7f14121c9a..63fa5db7e46 100644 --- a/src/mongo/db/repl/abstract_async_component.h +++ b/src/mongo/db/repl/abstract_async_component.h @@ -148,11 +148,11 @@ protected: * Saves handle if work was successfully scheduled. * Returns scheduleWork status (without the handle). */ - Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp index 691c417ad2b..4dd1183b2b4 100644 --- a/src/mongo/db/repl/abstract_async_component_test.cpp +++ b/src/mongo/db/repl/abstract_async_component_test.cpp @@ -65,7 +65,7 @@ public: * Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and * _scheduleWorkAtAndSaveHandle_inlock() for testing. */ - Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work, + Status scheduleWorkAndSaveHandle_forTest(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -73,7 +73,7 @@ public: * Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing. */ Status scheduleWorkAtAndSaveHandle_forTest(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -122,20 +122,20 @@ Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status } Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _scheduleWorkAndSaveHandle_inlock(work, handle, name); + return _scheduleWorkAndSaveHandle_inlock(std::move(work), handle, name); } Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name); + return _scheduleWorkAtAndSaveHandle_inlock(when, std::move(work), handle, name); } void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 6876e88f5aa..6038ec6f544 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -109,7 +109,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& onCompletion, + CallbackFn onCompletion, StorageInterface* storageInterface, const int batchSize) : _executor(executor), @@ -118,7 +118,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _sourceNss(sourceNss), _destNss(_sourceNss), _options(options), - _onCompletion(onCompletion), + _onCompletion(std::move(onCompletion)), _storageInterface(storageInterface), _countScheduler(_executor, RemoteCommandRequest( @@ -155,9 +155,10 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _indexSpecs(), _documentsToInsert(), _dbWorkTaskRunner(_dbWorkThreadPool), - _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) { - auto task = [ this, work ](OperationContext * opCtx, - const Status& status) noexcept->TaskRunner::NextAction { + _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) { + auto task = [ this, work = std::move(work) ]( + OperationContext * opCtx, + const Status& status) mutable noexcept->TaskRunner::NextAction { try { work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); } catch (...) { @@ -165,7 +166,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, } return TaskRunner::NextAction::kDisposeOperationContext; }; - _dbWorkTaskRunner.schedule(task); + _dbWorkTaskRunner.schedule(std::move(task)); return executor::TaskExecutor::CallbackHandle(); }), _createClientFn([] { return stdx::make_unique<DBClientConnection>(); }), @@ -184,7 +185,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, uassert(50953, "Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(), _options.uuid); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface); uassert( 50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0); @@ -292,9 +293,9 @@ void CollectionCloner::waitForDbWorker() { _dbWorkTaskRunner.join(); } -void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn) { +void CollectionCloner::setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) { LockGuard lk(_mutex); - _scheduleDbWorkFn = scheduleDbWorkFn; + _scheduleDbWorkFn = std::move(scheduleDbWorkFn); } void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index c7149970f5e..4ed39cc5512 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -93,8 +93,8 @@ public: * * Used for testing only. */ - using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( - const executor::TaskExecutor::CallbackFn&)>; + using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; /** * Type of function to create a database client @@ -117,7 +117,7 @@ public: const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& onCompletion, + CallbackFn onCompletion, StorageInterface* storageInterface, const int batchSize); @@ -152,7 +152,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn); /** * Allows a different client class to be injected. diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 05c36c41cd6..c17cff0cc60 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -668,12 +668,13 @@ TEST_F(CollectionClonerTest, // status. auto exec = &getExecutor(); collectionCloner->setScheduleDbWorkFn_forTest([exec]( - const executor::TaskExecutor::CallbackFn& workFn) { - auto wrappedTask = [workFn](const executor::TaskExecutor::CallbackArgs& cbd) { + executor::TaskExecutor::CallbackFn workFn) { + auto wrappedTask = [workFn = std::move(workFn)]( + const executor::TaskExecutor::CallbackArgs& cbd) { workFn(executor::TaskExecutor::CallbackArgs( cbd.executor, cbd.myHandle, Status(ErrorCodes::CallbackCanceled, ""), cbd.opCtx)); }; - return exec->scheduleWork(wrappedTask); + return exec->scheduleWork(std::move(wrappedTask)); }); bool collectionCreated = false; @@ -1177,12 +1178,11 @@ TEST_F(CollectionClonerTest, // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet. executor::TaskExecutor::CallbackFn insertDocumentsFn; - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - insertDocumentsFn = workFn; - executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); - return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); - }); + collectionCloner->setScheduleDbWorkFn_forTest([&](executor::TaskExecutor::CallbackFn workFn) { + insertDocumentsFn = std::move(workFn); + executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); + return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); + }); ASSERT_FALSE(insertDocumentsFn); // Return first batch of collection documents from remote server for the getMore request. diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index a4648bc8e9a..b9c1e23edea 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -116,7 +116,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, const ListCollectionsPredicateFn& listCollectionsPred, StorageInterface* si, const CollectionCallbackFn& collWork, - const CallbackFn& onCompletion) + CallbackFn onCompletion) : _executor(executor), _dbWorkThreadPool(dbWorkThreadPool), _source(source), @@ -128,7 +128,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, _listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred), _storageInterface(si), _collectionWork(collWork), - _onCompletion(onCompletion), + _onCompletion(std::move(onCompletion)), _listCollectionsFetcher(_executor, _source, _dbname, @@ -152,7 +152,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty()); uassert(ErrorCodes::BadValue, "storage interface cannot be null", si); uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); _stats.dbname = _dbname; } @@ -259,7 +259,7 @@ void DatabaseCloner::join() { _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } -void DatabaseCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void DatabaseCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 0a5b893b294..7b2cd152592 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -97,6 +97,9 @@ public: */ using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>; + using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; + /** * Creates DatabaseCloner task in inactive state. Use start() to activate cloner. * @@ -116,7 +119,7 @@ public: const ListCollectionsPredicateFn& listCollectionsPredicate, StorageInterface* storageInterface, const CollectionCallbackFn& collectionWork, - const CallbackFn& onCompletion); + CallbackFn onCompletion); virtual ~DatabaseCloner(); @@ -146,7 +149,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -234,7 +237,7 @@ private: std::vector<NamespaceString> _collectionNamespaces; // (M) std::list<CollectionCloner> _collectionCloners; // (M) std::list<CollectionCloner>::iterator _currentCollectionClonerIter; // (M) - CollectionCloner::ScheduleDbWorkFn + ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. StartCollectionClonerFn _startCollectionCloner; // (RT) Stats _stats; // (M) Stats about what this instance did. diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 65a0e054b9a..83c428340a7 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -104,10 +104,9 @@ void DatabaseClonerTest::setUp() { storageInterface.get(), makeCollectionWorkClosure(), makeSetStatusClosure()); - _databaseCloner->setScheduleDbWorkFn_forTest( - [this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); - }); + _databaseCloner->setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); + }); _mockServer = stdx::make_unique<MockRemoteDBServer>(target.toString()); _mockServer->assignCollectionUuid("db.a", *_options1.uuid); diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index 960d84fe960..ac3bc398059 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -227,7 +227,7 @@ Status DatabasesCloner::startup() noexcept { return Status::OK(); } -void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h index 6af3b927229..970abc5da60 100644 --- a/src/mongo/db/repl/databases_cloner.h +++ b/src/mongo/db/repl/databases_cloner.h @@ -69,6 +69,8 @@ public: using IncludeDbFilterFn = stdx::function<bool(const BSONObj& dbInfo)>; using OnFinishFn = stdx::function<void(const Status&)>; using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn; + using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, @@ -98,7 +100,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -178,10 +180,10 @@ private: mutable stdx::mutex _mutex; // (S) Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything. executor::TaskExecutor* _exec; // (R) executor to schedule things with - ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. - const HostAndPort _source; // (R) The source to use. - CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) - StartCollectionClonerFn _startCollectionClonerFn; // (M) + ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. + const HostAndPort _source; // (R) The source to use. + ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned. OnFinishFn _finishFn; // (M) function called when finished. diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 47bec4f18ce..40a3b39a744 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -134,7 +134,7 @@ public: getNet()->runReadyNetworkOperations(); if (getNet()->hasReadyRequests()) { log() << "The network has unexpected requests to process, next req:"; - NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest(); log() << req.getDiagnosticString(); } ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -301,8 +301,8 @@ protected: result = status; cvDone.notify_all(); }}; - cloner.setScheduleDbWorkFn_forTest([this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); + cloner.setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); }); cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) { diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 6b68c13a7ea..28d3266ca82 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -378,7 +378,7 @@ BSONObj InitialSyncer::_getInitialSyncProgress_inlock() const { return bob.obj(); } -void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) { +void InitialSyncer::setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } @@ -1431,7 +1431,7 @@ Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(const Status& sta } Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -1440,7 +1440,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( str::stream() << "failed to schedule work " << name << ": initial syncer is shutting down"); } - auto result = _exec->scheduleWork(work); + auto result = _exec->scheduleWork(std::move(work)); if (!result.isOK()) { return result.getStatus().withContext(str::stream() << "failed to schedule work " << name); } @@ -1450,7 +1450,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock( Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock( Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name) { invariant(handle); @@ -1460,7 +1460,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock( << when.toString() << ": initial syncer is shutting down"); } - auto result = _exec->scheduleWorkAt(when, work); + auto result = _exec->scheduleWorkAt(when, std::move(work)); if (!result.isOK()) { return result.getStatus().withContext( str::stream() << "failed to schedule work " << name << " at " << when.toString()); diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 0ead52f6f46..929643116c5 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -222,7 +222,7 @@ public: * * For testing only. */ - void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + void setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. @@ -540,11 +540,11 @@ private: * Saves handle if work was successfully scheduled. * Returns scheduleWork status (without the handle). */ - Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, - const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackFn work, executor::TaskExecutor::CallbackHandle* handle, const std::string& name); @@ -633,8 +633,8 @@ private: State _state = State::kPreStart; // (M) // Passed to CollectionCloner via DatabasesCloner. - CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) - StartCollectionClonerFn _startCollectionClonerFn; // (M) + DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) // Contains stats on the current initial sync request (includes all attempts). // To access these stats in a user-readable format, use getInitialSyncProgress(). diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 6bc02d4d15a..620d9c5abff 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -215,7 +215,7 @@ public: getNet()->runReadyNetworkOperations(); if (getNet()->hasReadyRequests()) { log() << "The network has unexpected requests to process, next req:"; - NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest(); log() << req.getDiagnosticString(); } ASSERT_FALSE(getNet()->hasReadyRequests()); @@ -404,8 +404,8 @@ protected: _onCompletion(lastApplied); }); _initialSyncer->setScheduleDbWorkFn_forTest( - [this](const executor::TaskExecutor::CallbackFn& work) { - return getExecutor().scheduleWork(work); + [this](executor::TaskExecutor::CallbackFn work) { + return getExecutor().scheduleWork(std::move(work)); }); _initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) { cloner.setCreateClientFn_forTest([&cloner, this]() { diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp index 655426a5ce3..e568bb3f8ca 100644 --- a/src/mongo/db/repl/multiapplier.cpp +++ b/src/mongo/db/repl/multiapplier.cpp @@ -45,15 +45,15 @@ namespace repl { MultiApplier::MultiApplier(executor::TaskExecutor* executor, const Operations& operations, const MultiApplyFn& multiApply, - const CallbackFn& onCompletion) + CallbackFn onCompletion) : _executor(executor), _operations(operations), _multiApply(multiApply), - _onCompletion(onCompletion) { + _onCompletion(std::move(onCompletion)) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); } MultiApplier::~MultiApplier() { diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index 3b2a59f7fc8..79fec694b98 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -67,7 +67,7 @@ public: /** * Callback function to report final status of applying operations. */ - using CallbackFn = stdx::function<void(const Status&)>; + using CallbackFn = unique_function<void(const Status&)>; using MultiApplyFn = stdx::function<StatusWith<OpTime>(OperationContext*, MultiApplier::Operations)>; @@ -88,7 +88,7 @@ public: MultiApplier(executor::TaskExecutor* executor, const Operations& operations, const MultiApplyFn& multiApply, - const CallbackFn& onCompletion); + CallbackFn onCompletion); /** * Blocks while applier is active. diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 676d9c1f787..0eb44197c76 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -120,14 +120,14 @@ OplogBuffer* OplogApplier::getBuffer() const { Future<void> OplogApplier::startup() { auto pf = makePromiseFuture<void>(); auto callback = - [ this, promise = pf.promise.share() ](const CallbackArgs& args) mutable noexcept { + [ this, promise = std::move(pf.promise) ](const CallbackArgs& args) mutable noexcept { invariant(args.status); log() << "Starting oplog application"; _run(_oplogBuffer); log() << "Finished oplog application"; promise.setWith([] {}); }; - invariant(_executor->scheduleWork(callback).getStatus()); + invariant(_executor->scheduleWork(std::move(callback)).getStatus()); return std::move(pf.future); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 18c9ccac0a8..f5b1cbb55db 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -191,9 +191,9 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) { * Schedules a task using the executor. This task is always run unless the task executor is shutting * down. */ -void scheduleWork(executor::TaskExecutor* executor, - const executor::TaskExecutor::CallbackFn& work) { - auto cbh = executor->scheduleWork([work](const executor::TaskExecutor::CallbackArgs& args) { +void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) { + auto cbh = executor->scheduleWork([work = std::move(work)]( + const executor::TaskExecutor::CallbackArgs& args) { if (args.status == ErrorCodes::CallbackCanceled) { return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 824db78c888..e151e0b2482 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2726,7 +2726,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() { } // Schedule timeout callback. auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; - auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); + auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, std::move(timeoutCB)); if (!status.isOK()) { log() << "Failed to schedule catchup timeout work."; abort_inlock(); @@ -3512,13 +3512,14 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() { } } -CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) { - auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) { - if (args.status == ErrorCodes::CallbackCanceled) { - return; - } - work(args); - }); +CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, CallbackFn work) { + auto cbh = + _replExecutor->scheduleWorkAt(when, [work = std::move(work)](const CallbackArgs& args) { + if (args.status == ErrorCodes::CallbackCanceled) { + return; + } + work(args); + }); if (cbh == ErrorCodes::ShutdownInProgress) { return {}; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index cff7179d1ce..233c0bf88e0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1034,7 +1034,7 @@ private: * All other non-shutdown scheduling failures will abort the process. * Does not run 'work' if callback is canceled. */ - CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work); + CallbackHandle _scheduleWorkAt(Date_t when, CallbackFn work); /** * Creates an event. diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index f11ef6c3d94..e5b7607390d 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -579,7 +579,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork( - const CallbackFn& work) override { + CallbackFn work) override { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; @@ -631,7 +631,7 @@ TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) { TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt( - Date_t when, const CallbackFn& work) override { + Date_t when, CallbackFn work) override { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index d3e14a7d8f1..727014a5cc2 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -38,25 +38,24 @@ namespace repl { TaskExecutorMock::TaskExecutorMock(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} -StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork( - const CallbackFn& work) { +StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(CallbackFn work) { if (shouldFailScheduleWorkRequest()) { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } if (shouldDeferScheduleWorkRequestByOneSecond()) { auto when = now() + Seconds(1); - return getExecutor()->scheduleWorkAt(when, work); + return getExecutor()->scheduleWorkAt(when, std::move(work)); } - return getExecutor()->scheduleWork(work); + return getExecutor()->scheduleWork(std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWorkAt( - Date_t when, const CallbackFn& work) { + Date_t when, CallbackFn work) { if (shouldFailScheduleWorkAtRequest()) { return Status(ErrorCodes::OperationFailed, str::stream() << "failed to schedule work at " << when.toString()); } - return getExecutor()->scheduleWorkAt(when, work); + return getExecutor()->scheduleWorkAt(when, std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand( diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index 41740b74433..e5e92e0f0e1 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -47,8 +47,8 @@ public: explicit TaskExecutorMock(executor::TaskExecutor* executor); - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; StatusWith<CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 94bd65d1d8d..43f0b735789 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -102,12 +102,12 @@ bool TaskRunner::isActive() const { return _active; } -void TaskRunner::schedule(const Task& task) { +void TaskRunner::schedule(Task task) { invariant(task); stdx::lock_guard<stdx::mutex> lk(_mutex); - _tasks.push_back(task); + _tasks.push_back(std::move(task)); _condition.notify_all(); if (_active) { @@ -174,8 +174,8 @@ void TaskRunner::_runTasks() { tasks.swap(_tasks); lk.unlock(); // Cancel remaining tasks with a CallbackCanceled status. - for (auto task : tasks) { - runSingleTask(task, + for (auto&& task : tasks) { + runSingleTask(std::move(task), nullptr, Status(ErrorCodes::CallbackCanceled, "this task has been canceled by a previously invoked task")); @@ -207,46 +207,10 @@ TaskRunner::Task TaskRunner::_waitForNextTask() { return Task(); } - Task task = _tasks.front(); + Task task = std::move(_tasks.front()); _tasks.pop_front(); return task; } -Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) { - // Setup cond_var for signaling when done. - bool done = false; - stdx::mutex mutex; - stdx::condition_variable waitTillDoneCond; - - Status returnStatus{Status::OK()}; - this->schedule([&](OperationContext* opCtx, const Status taskStatus) { - if (!taskStatus.isOK()) { - returnStatus = taskStatus; - } else { - // Run supplied function. - try { - returnStatus = func(opCtx); - } catch (...) { - returnStatus = exceptionToStatus(); - error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus); - } - } - - // Signal done. - LockGuard lk2{mutex}; - done = true; - waitTillDoneCond.notify_all(); - - // return nextAction based on status from supplied function. - if (returnStatus.isOK()) { - return nextAction; - } - return TaskRunner::NextAction::kCancel; - }); - - UniqueLock lk{mutex}; - waitTillDoneCond.wait(lk, [&done] { return done; }); - return returnStatus; -} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index 6875229bb9e..85313cc4ce9 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -38,6 +38,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" namespace mongo { @@ -60,17 +61,7 @@ public: kCancel = 3, }; - using Task = stdx::function<NextAction(OperationContext*, const Status&)>; - using SynchronousTask = stdx::function<Status(OperationContext* opCtx)>; - - /** - * Returns the Status from the supplied function after running it.. - * - * Note: TaskRunner::NextAction controls when the operation context and thread will be released. - */ - Status runSynchronousTask( - SynchronousTask func, - TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext); + using Task = unique_function<NextAction(OperationContext*, const Status&)>; /** * Creates a Task returning kCancel. This is useful in shutting down the task runner after @@ -126,7 +117,7 @@ public: * immediately. This is usually the case when the task runner is canceled. Accessing the * operation context in the task will result in undefined behavior. */ - void schedule(const Task& task); + void schedule(Task task); /** * If there is a task that is already running, allows the task to run to completion. diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index b01885ba332..17771287f90 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -83,7 +83,7 @@ using OpIdVector = std::vector<unsigned int>; OpIdVector _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction, - stdx::function<void(const Task& task)> schedule) { + unique_function<void(Task task)> schedule) { unittest::Barrier barrier(2U); stdx::mutex mutex; std::vector<OperationContext*> txns; @@ -121,7 +121,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test, std::vector<unsigned int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { - auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; + auto schedule = [&](Task task) { test.getTaskRunner().schedule(std::move(task)); }; return _testRunTaskTwice(test, nextAction, schedule); } @@ -134,9 +134,9 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { // Joining thread pool before scheduling second task ensures that task runner releases // thread back to pool after disposing of operation context. TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { - auto schedule = [this](const Task& task) { + auto schedule = [this](Task task) { getThreadPool().waitForIdle(); - getTaskRunner().schedule(task); + getTaskRunner().schedule(std::move(task)); }; auto txnId = _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); |