diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2017-12-06 14:40:59 -0500 |
---|---|---|
committer | Billy Donahue <billy.donahue@mongodb.com> | 2017-12-14 17:50:41 -0500 |
commit | 950fa6e6fd8f46248796dea3bc6c2392757b163d (patch) | |
tree | 2556f9322ee634477fc54b6876653b8ea702b8fa /src/mongo | |
parent | d2eedbeeedb61753c17b6a87912e4b14e7611b95 (diff) | |
download | mongo-950fa6e6fd8f46248796dea3bc6c2392757b163d.tar.gz |
SERVER-32070 migrate some easy stdx::bind to lambdas (pt3)
Diffstat (limited to 'src/mongo')
33 files changed, 433 insertions, 553 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index d8f70eeab87..97e3ccf4f21 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -123,7 +123,9 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, ReadPreferenceSetting::secondaryPreferredMetadata(), nullptr, RemoteCommandRequest::kNoTimeout), - stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1), + [this](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + return _countCallback(args); + }, RemoteCommandRetryScheduler::makeRetryPolicy( numInitialSyncCollectionCountAttempts.load(), executor::RemoteCommandRequest::kNoTimeout, @@ -133,11 +135,11 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _source, _sourceNss.db().toString(), makeCommandWithUUIDorCollectionName("listIndexes", _options.uuid, sourceNss), - stdx::bind(&CollectionCloner::_listIndexesCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), + [this](const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction * nextAction, + BSONObjBuilder * getMoreBob) { + _listIndexesCallback(fetchResult, nextAction, getMoreBob); + }, ReadPreferenceSetting::secondaryPreferredMetadata(), RemoteCommandRequest::kNoTimeout /* find network timeout */, RemoteCommandRequest::kNoTimeout /* getMore network timeout */, @@ -438,7 +440,7 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& // We have all of the indexes now, so we can start cloning the collection data. auto&& scheduleResult = _scheduleDbWorkFn( - stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1)); + [=](const executor::TaskExecutor::CallbackArgs& cbd) { _beginCollectionCallback(cbd); }); if (!scheduleResult.isOK()) { _finishCallback(scheduleResult.getStatus()); return; @@ -527,10 +529,9 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca ReadPreferenceSetting::secondaryPreferredMetadata(), opCtx, RemoteCommandRequest::kNoTimeout), - stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback, - this, - stdx::placeholders::_1, - cursorCommand), + [=](const RemoteCommandCallbackArgs& rcbd) { + _establishCollectionCursorsCallback(rcbd, cursorCommand); + }, RemoteCommandRetryScheduler::makeRetryPolicy( numInitialSyncCollectionFindAttempts.load(), executor::RemoteCommandRequest::kNoTimeout, @@ -723,11 +724,9 @@ Status CollectionCloner::_scheduleNextARMResultsCallback( } auto event = nextEvent.getValue(); auto handleARMResultsOnNextEvent = - _executor->onEvent(event, - stdx::bind(&CollectionCloner::_handleARMResultsCallback, - this, - stdx::placeholders::_1, - onCompletionGuard)); + _executor->onEvent(event, [=](const executor::TaskExecutor::CallbackArgs& cbd) { + _handleARMResultsCallback(cbd, onCompletionGuard); + }); return handleARMResultsOnNextEvent.getStatus(); } @@ -778,12 +777,9 @@ void CollectionCloner::_handleARMResultsCallback( } // Schedule the next document batch insertion. - auto&& scheduleResult = - _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, - this, - stdx::placeholders::_1, - lastBatch, - onCompletionGuard)); + auto&& scheduleResult = _scheduleDbWorkFn([=](const executor::TaskExecutor::CallbackArgs& cbd) { + _insertDocumentsCallback(cbd, lastBatch, onCompletionGuard); + }); if (!scheduleResult.isOK()) { Status newStatus{scheduleResult.getStatus().code(), str::stream() << "While cloning collection '" << _sourceNss.ns() diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 385e14dd15f..7cdb4550002 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -125,11 +125,11 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, _source, _dbname, createListCollectionsCommandObject(_listCollectionsFilter), - stdx::bind(&DatabaseCloner::_listCollectionsCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), + [=](const StatusWith<Fetcher::QueryResponse>& result, + Fetcher::NextAction * nextAction, + BSONObjBuilder * getMoreBob) { + _listCollectionsCallback(result, nextAction, getMoreBob); + }, ReadPreferenceSetting::secondaryPreferredMetadata(), RemoteCommandRequest::kNoTimeout /* find network timeout */, RemoteCommandRequest::kNoTimeout /* getMore network timeout */, @@ -403,8 +403,7 @@ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryRes _source, nss, options, - stdx::bind( - &DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss), + [=](const Status& status) { return _collectionClonerCallback(status, nss); }, _storageInterface, collectionClonerBatchSize, maxNumInitialSyncCollectionClonerCursors.load()); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index d586221398c..f0330add560 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -59,36 +59,38 @@ struct CollectionCloneInfo { class DatabaseClonerTest : public BaseClonerTest { public: - void collectionWork(const Status& status, const NamespaceString& sourceNss); void clear() override; BaseCloner* getCloner() const override; protected: + auto makeCollectionWorkClosure() { + return [this](const Status& status, const NamespaceString& srcNss) { + _collections[srcNss].status = status; + }; + } + auto makeSetStatusClosure() { + return [this](const Status& status) { setStatus(status); }; + } + void setUp() override; void tearDown() override; std::map<NamespaceString, CollectionCloneInfo> _collections; std::unique_ptr<DatabaseCloner> _databaseCloner; }; -void DatabaseClonerTest::collectionWork(const Status& status, const NamespaceString& srcNss) { - _collections[srcNss].status = status; -} void DatabaseClonerTest::setUp() { BaseClonerTest::setUp(); - _databaseCloner.reset(new DatabaseCloner( - &getExecutor(), - dbWorkThreadPool.get(), - target, - dbname, - BSONObj(), - DatabaseCloner::ListCollectionsPredicateFn(), - storageInterface.get(), - stdx::bind(&DatabaseClonerTest::collectionWork, - this, - stdx::placeholders::_1, - stdx::placeholders::_2), - stdx::bind(&DatabaseClonerTest::setStatus, this, stdx::placeholders::_1))); + _databaseCloner = + stdx::make_unique<DatabaseCloner>(&getExecutor(), + dbWorkThreadPool.get(), + target, + dbname, + BSONObj(), + DatabaseCloner::ListCollectionsPredicateFn(), + storageInterface.get(), + makeCollectionWorkClosure(), + makeSetStatusClosure()); _databaseCloner->setScheduleDbWorkFn_forTest( [this](const executor::TaskExecutor::CallbackFn& work) { return getExecutor().scheduleWork(work); @@ -127,11 +129,8 @@ TEST_F(DatabaseClonerTest, InvalidConstruction) { const BSONObj filter; DatabaseCloner::ListCollectionsPredicateFn pred; StorageInterface* si = storageInterface.get(); - namespace stdxph = stdx::placeholders; - const DatabaseCloner::CollectionCallbackFn ccb = - stdx::bind(&DatabaseClonerTest::collectionWork, this, stdxph::_1, stdxph::_2); - - const auto& cb = [](const Status&) { FAIL("should not reach here"); }; + auto ccb = makeCollectionWorkClosure(); + auto cb = [](const Status&) { FAIL("should not reach here"); }; // Null executor -- error from Fetcher, not _databaseCloner. ASSERT_THROWS_CODE_AND_WHAT( @@ -266,19 +265,16 @@ TEST_F(DatabaseClonerTest, FirstRemoteCommandWithoutFilter) { TEST_F(DatabaseClonerTest, FirstRemoteCommandWithFilter) { const BSONObj listCollectionsFilter = BSON("name" << "coll"); - _databaseCloner.reset(new DatabaseCloner( - &getExecutor(), - dbWorkThreadPool.get(), - target, - dbname, - listCollectionsFilter, - DatabaseCloner::ListCollectionsPredicateFn(), - storageInterface.get(), - stdx::bind(&DatabaseClonerTest::collectionWork, - this, - stdx::placeholders::_1, - stdx::placeholders::_2), - stdx::bind(&DatabaseClonerTest::setStatus, this, stdx::placeholders::_1))); + _databaseCloner = + stdx::make_unique<DatabaseCloner>(&getExecutor(), + dbWorkThreadPool.get(), + target, + dbname, + listCollectionsFilter, + DatabaseCloner::ListCollectionsPredicateFn(), + storageInterface.get(), + makeCollectionWorkClosure(), + makeSetStatusClosure()); ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); ASSERT_OK(_databaseCloner->startup()); @@ -351,19 +347,15 @@ TEST_F(DatabaseClonerTest, ListCollectionsPredicate) { DatabaseCloner::ListCollectionsPredicateFn pred = [](const BSONObj& info) { return info["name"].String() != "b"; }; - _databaseCloner.reset(new DatabaseCloner( - &getExecutor(), - dbWorkThreadPool.get(), - target, - dbname, - BSONObj(), - pred, - storageInterface.get(), - stdx::bind(&DatabaseClonerTest::collectionWork, - this, - stdx::placeholders::_1, - stdx::placeholders::_2), - stdx::bind(&DatabaseClonerTest::setStatus, this, stdx::placeholders::_1))); + _databaseCloner = stdx::make_unique<DatabaseCloner>(&getExecutor(), + dbWorkThreadPool.get(), + target, + dbname, + BSONObj(), + pred, + storageInterface.get(), + makeCollectionWorkClosure(), + makeSetStatusClosure()); ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); ASSERT_OK(_databaseCloner->startup()); @@ -612,19 +604,16 @@ TEST_F(DatabaseClonerTest, DatabaseClonerResendsListCollectionsRequestOnRetriabl } TEST_F(DatabaseClonerTest, ListCollectionsReturnsEmptyCollectionName) { - _databaseCloner.reset(new DatabaseCloner( - &getExecutor(), - dbWorkThreadPool.get(), - target, - dbname, - BSONObj(), - DatabaseCloner::ListCollectionsPredicateFn(), - storageInterface.get(), - stdx::bind(&DatabaseClonerTest::collectionWork, - this, - stdx::placeholders::_1, - stdx::placeholders::_2), - stdx::bind(&DatabaseClonerTest::setStatus, this, stdx::placeholders::_1))); + _databaseCloner = + stdx::make_unique<DatabaseCloner>(&getExecutor(), + dbWorkThreadPool.get(), + target, + dbname, + BSONObj(), + DatabaseCloner::ListCollectionsPredicateFn(), + storageInterface.get(), + makeCollectionWorkClosure(), + makeSetStatusClosure()); ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); ASSERT_OK(_databaseCloner->startup()); diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index a327208c172..ae405125fbb 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -121,15 +121,10 @@ void ElectCmdRunnerTest::startTest(ElectCmdRunner* electCmdRunner, int selfIndex, const std::vector<HostAndPort>& hosts) { StatusWith<executor::TaskExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set"); - StatusWith<executor::TaskExecutor::CallbackHandle> cbh = - getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, - this, - stdx::placeholders::_1, - electCmdRunner, - &evh, - currentConfig, - selfIndex, - hosts)); + StatusWith<executor::TaskExecutor::CallbackHandle> cbh = getExecutor().scheduleWork([&]( + const executor::TaskExecutor::CallbackArgs& data) { + return electCmdRunnerRunner(data, electCmdRunner, &evh, currentConfig, selfIndex, hosts); + }); ASSERT_OK(cbh.getStatus()); getExecutor().wait(cbh.getValue()); ASSERT_OK(evh.getStatus()); @@ -213,14 +208,9 @@ TEST_F(ElectCmdRunnerTest, ShuttingDown) { ElectCmdRunner electCmdRunner; StatusWith<executor::TaskExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set"); StatusWith<executor::TaskExecutor::CallbackHandle> cbh = - getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, - this, - stdx::placeholders::_1, - &electCmdRunner, - &evh, - config, - 0, - hosts)); + getExecutor().scheduleWork([&](const executor::TaskExecutor::CallbackArgs& data) { + return electCmdRunnerRunner(data, &electCmdRunner, &evh, config, 0, hosts); + }); ASSERT_OK(cbh.getStatus()); getExecutor().wait(cbh.getValue()); ASSERT_OK(evh.getStatus()); diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index f0b86dbd0af..09dc5c8767c 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -66,17 +66,13 @@ protected: FreshnessChecker::ElectionAbortReason shouldAbortElection() const; int64_t countLogLinesContaining(const std::string& needle) { - return std::count_if(getCapturedLogMessages().begin(), - getCapturedLogMessages().end(), - stdx::bind(stringContains, stdx::placeholders::_1, needle)); + const auto& messages = getCapturedLogMessages(); + return std::count_if(messages.begin(), messages.end(), [&](const std::string& x) { + return stringContains(x, needle); + }); } private: - void freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data, - const Timestamp& lastOpTimeApplied, - const ReplSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts); void setUp(); FreshnessChecker _checker; @@ -119,29 +115,18 @@ const BSONObj makeFreshRequest(const ReplSetConfig& rsConfig, // This is necessary because the run method must be scheduled in the executor // for correct concurrency operation. -void FreshnessCheckerTest::freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data, - const Timestamp& lastOpTimeApplied, - const ReplSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { - invariant(data.status.isOK()); - StatusWith<executor::TaskExecutor::EventHandle> evh = - _checker.start(data.executor, lastOpTimeApplied, currentConfig, selfIndex, hosts); - _checkerDoneEvent = assertGet(evh); -} void FreshnessCheckerTest::startTest(const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, int selfIndex, const std::vector<HostAndPort>& hosts) { - getExecutor().wait(assertGet( - getExecutor().scheduleWork(stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - lastOpTimeApplied, - currentConfig, - selfIndex, - hosts)))); + auto runner = [&](const executor::TaskExecutor::CallbackArgs& data) { + invariant(data.status.isOK()); + StatusWith<executor::TaskExecutor::EventHandle> evh = + _checker.start(data.executor, lastOpTimeApplied, currentConfig, selfIndex, hosts); + _checkerDoneEvent = assertGet(evh); + }; + getExecutor().wait(assertGet(getExecutor().scheduleWork(std::move(runner)))); } TEST_F(FreshnessCheckerTest, TwoNodes) { diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 4b1c882b1eb..ccd39ca0bf3 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -227,11 +227,9 @@ Status InitialSyncer::startup(OperationContext* opCtx, // Start first initial sync attempt. std::uint32_t initialSyncAttempt = 0; auto status = _scheduleWorkAndSaveHandle_inlock( - stdx::bind(&InitialSyncer::_startInitialSyncAttemptCallback, - this, - stdx::placeholders::_1, - initialSyncAttempt, - initialSyncMaxAttempts), + [=](const executor::TaskExecutor::CallbackArgs& args) { + _startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts); + }, &_startInitialSyncAttemptHandle, str::stream() << "_startInitialSyncAttemptCallback-" << initialSyncAttempt); @@ -443,15 +441,13 @@ void InitialSyncer::_startInitialSyncAttemptCallback( static_cast<std::uint32_t>(numInitialSyncConnectAttempts.load()); // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. - status = _scheduleWorkAndSaveHandle_inlock(stdx::bind(&InitialSyncer::_chooseSyncSourceCallback, - this, - stdx::placeholders::_1, - chooseSyncSourceAttempt, - chooseSyncSourceMaxAttempts, - onCompletionGuard), - &_chooseSyncSourceHandle, - str::stream() << "_chooseSyncSourceCallback-" - << chooseSyncSourceAttempt); + status = _scheduleWorkAndSaveHandle_inlock( + [=](const executor::TaskExecutor::CallbackArgs& args) { + _chooseSyncSourceCallback( + args, chooseSyncSourceAttempt, chooseSyncSourceMaxAttempts, onCompletionGuard); + }, + &_chooseSyncSourceHandle, + str::stream() << "_chooseSyncSourceCallback-" << chooseSyncSourceAttempt); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -497,12 +493,12 @@ void InitialSyncer::_chooseSyncSourceCallback( << (chooseSyncSourceAttempt + 1) << " of " << numInitialSyncConnectAttempts.load(); auto status = _scheduleWorkAtAndSaveHandle_inlock( when, - stdx::bind(&InitialSyncer::_chooseSyncSourceCallback, - this, - stdx::placeholders::_1, - chooseSyncSourceAttempt + 1, - chooseSyncSourceMaxAttempts, - onCompletionGuard), + [=](const executor::TaskExecutor::CallbackArgs& args) { + _chooseSyncSourceCallback(args, + chooseSyncSourceAttempt + 1, + chooseSyncSourceMaxAttempts, + onCompletionGuard); + }, &_chooseSyncSourceHandle, str::stream() << "_chooseSyncSourceCallback-" << (chooseSyncSourceAttempt + 1)); if (!status.isOK()) { @@ -524,11 +520,9 @@ void InitialSyncer::_chooseSyncSourceCallback( // Schedule rollback ID checker. _syncSource = syncSource.getValue(); _rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource); - auto scheduleResult = - _rollbackChecker->reset(stdx::bind(&InitialSyncer::_rollbackCheckerResetCallback, - this, - stdx::placeholders::_1, - onCompletionGuard)); + auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) { + return _rollbackCheckerResetCallback(result, onCompletionGuard); + }); status = scheduleResult.getStatus(); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); @@ -575,10 +569,11 @@ void InitialSyncer::_rollbackCheckerResetCallback( } status = _scheduleLastOplogEntryFetcher_inlock( - stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp, - this, - stdx::placeholders::_1, - onCompletionGuard)); + [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + _lastOplogEntryFetcherCallbackForBeginTimestamp(response, onCompletionGuard); + }); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -616,11 +611,11 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp( _syncSource, nsToDatabaseSubstring(FeatureCompatibilityVersion::kCollection).toString(), queryBob.obj(), - stdx::bind(&InitialSyncer::_fcvFetcherCallback, - this, - stdx::placeholders::_1, - onCompletionGuard, - lastOpTimeWithHash), + [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + _fcvFetcherCallback(response, onCompletionGuard, lastOpTimeWithHash); + }, ReadPreferenceSetting::secondaryPreferredMetadata(), RemoteCommandRequest::kNoTimeout /* find network timeout */, RemoteCommandRequest::kNoTimeout /* getMore network timeout */, @@ -699,16 +694,13 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> } return (name != "local"); }; - _initialSyncState = stdx::make_unique<InitialSyncState>( - stdx::make_unique<DatabasesCloner>(_storage, - _exec, - _dataReplicatorExternalState->getDbWorkThreadPool(), - _syncSource, - listDatabasesFilter, - stdx::bind(&InitialSyncer::_databasesClonerCallback, - this, - stdx::placeholders::_1, - onCompletionGuard))); + _initialSyncState = stdx::make_unique<InitialSyncState>(stdx::make_unique<DatabasesCloner>( + _storage, + _exec, + _dataReplicatorExternalState->getDbWorkThreadPool(), + _syncSource, + listDatabasesFilter, + [=](const Status& status) { _databasesClonerCallback(status, onCompletionGuard); })); _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp(); @@ -737,13 +729,12 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> _rollbackChecker->getBaseRBID(), false /* requireFresherSyncSource */, _dataReplicatorExternalState.get(), - stdx::bind(&InitialSyncer::_enqueueDocuments, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), - stdx::bind( - &InitialSyncer::_oplogFetcherCallback, this, stdx::placeholders::_1, onCompletionGuard), + [=](Fetcher::Documents::const_iterator first, + Fetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { + return _enqueueDocuments(first, last, info); + }, + [=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); }, initialSyncOplogFetcherBatchSize); LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString(); @@ -835,10 +826,11 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS } status = _scheduleLastOplogEntryFetcher_inlock( - stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp, - this, - stdx::placeholders::_1, - onCompletionGuard)); + [=](const StatusWith<mongo::Fetcher::QueryResponse>& status, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + _lastOplogEntryFetcherCallbackForStopTimestamp(status, onCompletionGuard); + }); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -930,20 +922,16 @@ void InitialSyncer::_getNextApplierBatchCallback( const auto& ops = batchResult.getValue(); if (!ops.empty()) { _fetchCount.store(0); - // "_syncSource" has to be copied to stdx::bind result. - HostAndPort source = _syncSource; - auto applyOperationsForEachReplicationWorkerThreadFn = - stdx::bind(&DataReplicatorExternalState::_multiInitialSyncApply, - _dataReplicatorExternalState.get(), - stdx::placeholders::_1, - source, - &_fetchCount); - auto applyBatchOfOperationsFn = stdx::bind(&DataReplicatorExternalState::_multiApply, - _dataReplicatorExternalState.get(), - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3); - + MultiApplier::ApplyOperationFn applyOperationsForEachReplicationWorkerThreadFn = + [ =, source = _syncSource ](MultiApplier::OperationPtrs * x) { + return _dataReplicatorExternalState->_multiInitialSyncApply(x, source, &_fetchCount); + }; + MultiApplier::MultiApplyFn applyBatchOfOperationsFn = + [=](OperationContext* opCtx, + MultiApplier::Operations ops, + MultiApplier::ApplyOperationFn apply) { + return _dataReplicatorExternalState->_multiApply(opCtx, ops, apply); + }; const auto lastEntry = ops.back().raw; const auto opTimeWithHashStatus = AbstractOplogFetcher::parseOpTimeWithHash(lastEntry); status = opTimeWithHashStatus.getStatus(); @@ -954,16 +942,16 @@ void InitialSyncer::_getNextApplierBatchCallback( auto lastApplied = opTimeWithHashStatus.getValue(); auto numApplied = ops.size(); - _applier = stdx::make_unique<MultiApplier>(_exec, - ops, - applyOperationsForEachReplicationWorkerThreadFn, - applyBatchOfOperationsFn, - stdx::bind(&InitialSyncer::_multiApplierCallback, - this, - stdx::placeholders::_1, - lastApplied, - numApplied, - onCompletionGuard)); + MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) { + return _multiApplierCallback(s, lastApplied, numApplied, onCompletionGuard); + }; + + _applier = stdx::make_unique<MultiApplier>( + _exec, + ops, + std::move(applyOperationsForEachReplicationWorkerThreadFn), + std::move(applyBatchOfOperationsFn), + std::move(onCompletionFn)); status = _startupComponent_inlock(_applier); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); @@ -991,14 +979,11 @@ void InitialSyncer::_getNextApplierBatchCallback( // the sync source, we'll check the oplog buffer again in // '_opts.getApplierBatchCallbackRetryWait' ms. auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait; - status = - _scheduleWorkAtAndSaveHandle_inlock(when, - stdx::bind(&InitialSyncer::_getNextApplierBatchCallback, - this, - stdx::placeholders::_1, - onCompletionGuard), - &_getNextApplierBatchHandle, - "_getNextApplierBatchCallback"); + status = _scheduleWorkAtAndSaveHandle_inlock( + when, + [=](const CallbackArgs& args) { _getNextApplierBatchCallback(args, onCompletionGuard); }, + &_getNextApplierBatchHandle, + "_getNextApplierBatchCallback"); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -1028,10 +1013,12 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, _initialSyncState->fetchedMissingDocs += fetchCount; _fetchCount.store(0); status = _scheduleLastOplogEntryFetcher_inlock( - stdx::bind(&InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments, - this, - stdx::placeholders::_1, - onCompletionGuard)); + [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + return _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( + response, onCompletionGuard); + }); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -1138,8 +1125,8 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& // declare the scope guard before the lock guard. auto result = lastApplied; auto finishCallbackGuard = MakeGuard([this, &result] { - auto scheduleResult = - _exec->scheduleWork(stdx::bind(&InitialSyncer::_finishCallback, this, result)); + auto scheduleResult = _exec->scheduleWork( + [=](const mongo::executor::TaskExecutor::CallbackArgs&) { _finishCallback(result); }); if (!scheduleResult.isOK()) { warning() << "Unable to schedule initial syncer completion task due to " << redact(scheduleResult.getStatus()) @@ -1189,11 +1176,10 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& auto when = _exec->now() + _opts.initialSyncRetryWait; auto status = _scheduleWorkAtAndSaveHandle_inlock( when, - stdx::bind(&InitialSyncer::_startInitialSyncAttemptCallback, - this, - stdx::placeholders::_1, - _stats.failedInitialSyncAttempts, - _stats.maxFailedInitialSyncAttempts), + [=](const executor::TaskExecutor::CallbackArgs& args) { + _startInitialSyncAttemptCallback( + args, _stats.failedInitialSyncAttempts, _stats.maxFailedInitialSyncAttempts); + }, &_startInitialSyncAttemptHandle, str::stream() << "_startInitialSyncAttemptCallback-" << _stats.failedInitialSyncAttempts); @@ -1321,13 +1307,12 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( // Get another batch to apply. // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. - auto status = - _scheduleWorkAndSaveHandle_inlock(stdx::bind(&InitialSyncer::_getNextApplierBatchCallback, - this, - stdx::placeholders::_1, - onCompletionGuard), - &_getNextApplierBatchHandle, - "_getNextApplierBatchCallback"); + auto status = _scheduleWorkAndSaveHandle_inlock( + [=](const executor::TaskExecutor::CallbackArgs& args) { + return _getNextApplierBatchCallback(args, onCompletionGuard); + }, + &_getNextApplierBatchHandle, + "_getNextApplierBatchCallback"); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -1349,11 +1334,10 @@ void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock( return; } - auto scheduleResult = _rollbackChecker->checkForRollback( - stdx::bind(&InitialSyncer::_rollbackCheckerCheckForRollbackCallback, - this, - stdx::placeholders::_1, - onCompletionGuard)); + auto scheduleResult = + _rollbackChecker->checkForRollback([=](const RollbackChecker::Result& result) { + _rollbackCheckerCheckForRollbackCallback(result, onCompletionGuard); + }); auto status = scheduleResult.getStatus(); if (!status.isOK()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 3dd9826e018..0f01a1aad4d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -503,12 +503,9 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) // that the server's networking layer be up and running and accepting connections, which // doesn't happen until startReplication finishes. auto handle = - _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, - this, - stdx::placeholders::_1, - localConfig, - lastOpTimeStatus, - lastVote)); + _replExecutor->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& args) { + _finishLoadLocalConfig(args, localConfig, lastOpTimeStatus, lastVote); + }); if (handle == ErrorCodes::ShutdownInProgress) { handle = CallbackHandle{}; } @@ -1782,9 +1779,9 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, onExitGuard.Dismiss(); updateMemberState(); // Schedule work to (potentially) step back up once the stepdown period has ended. - _scheduleWorkAt( - stepDownUntil, - stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1)); + _scheduleWorkAt(stepDownUntil, [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _handleTimePassing(cbData); + }); return Status::OK(); } @@ -2246,10 +2243,8 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt } _setConfigState_inlock(kConfigReconfiguring); - ScopeGuard configStateGuard = MakeGuard( - lockAndCall, - &lk, - stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigSteady)); + ScopeGuard configStateGuard = + MakeGuard(lockAndCall, &lk, [=] { _setConfigState_inlock(kConfigSteady); }); ReplSetConfig oldConfig = _rsConfig; lk.unlock(); @@ -2303,14 +2298,10 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt } auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent()); - uassertStatusOK( - _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, - this, - stdx::placeholders::_1, - newConfig, - args.force, - myIndex.getValue(), - reconfigFinished))); + uassertStatusOK(_replExecutor->scheduleWork([ =, f = args.force, v = myIndex.getValue() ]( + const executor::TaskExecutor::CallbackArgs& cbData) { + _finishReplSetReconfig(cbData, newConfig, f, v, reconfigFinished); + })); configStateGuard.Dismiss(); _replExecutor->waitForEvent(reconfigFinished); return Status::OK(); @@ -2344,13 +2335,10 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( // Wait for the election to complete and the node's Role to be set to follower. _replExecutor ->onEvent(electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, - this, - stdx::placeholders::_1, - newConfig, - isForceReconfig, - myIndex, - finishedEvent)) + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _finishReplSetReconfig( + cbData, newConfig, isForceReconfig, myIndex, finishedEvent); + }) .status_with_transitional_ignore(); return; } @@ -2390,11 +2378,8 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt invariant(!_rsConfig.isInitialized()); _setConfigState_inlock(kConfigInitiating); - ScopeGuard configStateGuard = MakeGuard( - lockAndCall, - &lk, - stdx::bind( - &ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigUninitialized)); + ScopeGuard configStateGuard = + MakeGuard(lockAndCall, &lk, [=] { _setConfigState_inlock(kConfigUninitialized); }); lk.unlock(); ReplSetConfig newConfig; @@ -3079,11 +3064,9 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource( void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { stdx::lock_guard<stdx::mutex> lock(_mutex); _topCoord->blacklistSyncSource(host, until); - _scheduleWorkAt(until, - stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource, - this, - stdx::placeholders::_1, - host)); + _scheduleWorkAt(until, [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _unblacklistSyncSource(cbData, host); + }); } void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index b542f445f90..8ddb88cb61d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -112,10 +112,9 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::Call const RemoteCommandRequest request( target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); const executor::TaskExecutor::RemoteCommandCallbackFn callback = - stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, - this, - stdx::placeholders::_1, - targetIndex); + [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { + return _handleHeartbeatResponse(cbData, targetIndex); + }; LOG_FOR_HEARTBEATS(2) << "Sending heartbeat (requestId: " << request.id << ") to " << target << ", " << heartbeatObj; @@ -127,13 +126,10 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd Date_t when) { LOG_FOR_HEARTBEATS(2) << "Scheduling heartbeat to " << target << " at " << dateToISOStringUTC(when); - _trackHeartbeatHandle_inlock( - _replExecutor->scheduleWorkAt(when, - stdx::bind(&ReplicationCoordinatorImpl::_doMemberHeartbeat, - this, - stdx::placeholders::_1, - target, - targetIndex))); + _trackHeartbeatHandle_inlock(_replExecutor->scheduleWorkAt( + when, [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _doMemberHeartbeat(cbData, target, targetIndex); + })); } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( @@ -297,10 +293,10 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo _priorityTakeoverWhen = _replExecutor->now() + priorityTakeoverDelay + randomOffset; log() << "Scheduling priority takeover at " << _priorityTakeoverWhen; _priorityTakeoverCbh = _scheduleWorkAt( - _priorityTakeoverWhen, - stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, - this, - TopologyCoordinator::StartElectionReason::kPriorityTakeover)); + _priorityTakeoverWhen, [=](const mongo::executor::TaskExecutor::CallbackArgs&) { + _startElectSelfIfEligibleV1( + TopologyCoordinator::StartElectionReason::kPriorityTakeover); + }); } break; } @@ -311,10 +307,10 @@ stdx::unique_lock<stdx::mutex> ReplicationCoordinatorImpl::_handleHeartbeatRespo _catchupTakeoverWhen = _replExecutor->now() + catchupTakeoverDelay; log() << "Scheduling catchup takeover at " << _catchupTakeoverWhen; _catchupTakeoverCbh = _scheduleWorkAt( - _catchupTakeoverWhen, - stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, - this, - TopologyCoordinator::StartElectionReason::kCatchupTakeover)); + _catchupTakeoverWhen, [=](const mongo::executor::TaskExecutor::CallbackArgs&) { + _startElectSelfIfEligibleV1( + TopologyCoordinator::StartElectionReason::kCatchupTakeover); + }); } break; } @@ -366,10 +362,9 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() } _replExecutor - ->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, - this, - stdx::placeholders::_1, - finishEvent)) + ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { + _stepDownFinish(cbData, finishEvent); + }) .status_with_transitional_ignore(); return finishEvent; } @@ -448,18 +443,16 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet _replExecutor ->onEvent(electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, - this, - stdx::placeholders::_1, - newConfig)) + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigStore(cbData, newConfig); + }) .status_with_transitional_ignore(); return; } _replExecutor - ->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, - this, - stdx::placeholders::_1, - newConfig)) + ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigStore(cbData, newConfig); + }) .status_with_transitional_ignore(); } @@ -553,11 +546,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( "_heartbeatReconfigFinish until fail point is disabled."; _replExecutor ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, - this, - stdx::placeholders::_1, - newConfig, - myIndex)) + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigFinish(cbData, newConfig, myIndex); + }) .status_with_transitional_ignore(); return; } @@ -586,11 +577,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // Wait for the election to complete and the node's Role to be set to follower. _replExecutor ->onEvent(electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, - this, - stdx::placeholders::_1, - newConfig, - myIndex)) + [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigFinish(cbData, newConfig, myIndex); + }) .status_with_transitional_ignore(); return; } @@ -645,10 +634,9 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { LOG_FOR_HEARTBEATS(2) << "Cancelling all heartbeats."; - std::for_each( - _heartbeatHandles.begin(), - _heartbeatHandles.end(), - stdx::bind(&executor::TaskExecutor::cancel, _replExecutor.get(), stdx::placeholders::_1)); + for (const auto& handle : _heartbeatHandles) { + _replExecutor->cancel(handle); + } // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with // CallbackCanceled status, so it's better to leave the handles in the list, for now. @@ -735,10 +723,10 @@ void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() { // just barely fresh and it has become stale since then. We must schedule another liveness // check to continue conducting liveness checks and be able to step down from primary if we // lose contact with a majority of nodes. - auto cbh = _scheduleWorkAt(nextTimeout, - stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout, - this, - stdx::placeholders::_1)); + auto cbh = + _scheduleWorkAt(nextTimeout, [=](const executor::TaskExecutor::CallbackArgs& cbData) { + _handleLivenessTimeout(cbData); + }); if (!cbh) { return; } @@ -809,10 +797,9 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { LOG(4) << "Scheduling election timeout callback at " << when; _handleElectionTimeoutWhen = when; _handleElectionTimeoutCbh = - _scheduleWorkAt(when, - stdx::bind(&ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, - this, - TopologyCoordinator::StartElectionReason::kElectionTimeout)); + _scheduleWorkAt(when, [=](const mongo::executor::TaskExecutor::CallbackArgs&) { + _startElectSelfIfEligibleV1(TopologyCoordinator::StartElectionReason::kElectionTimeout); + }); } void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1( diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index 417381a4c18..0894c8e7488 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -309,8 +309,7 @@ TEST_F(ReplCoordTest, Status status(ErrorCodes::InternalError, "Not Set"); const auto opCtx = makeOperationContext(); - stdx::thread reconfigThread( - stdx::bind(doReplSetReconfig, getReplCoord(), &status, opCtx.get())); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status, opCtx.get()); }); NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); @@ -353,8 +352,7 @@ TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenSavingANewConfigFailsDuringRe getExternalState()->setStoreLocalConfigDocumentStatus( Status(ErrorCodes::OutOfDiskSpace, "The test set this")); const auto opCtx = makeOperationContext(); - stdx::thread reconfigThread( - stdx::bind(doReplSetReconfig, getReplCoord(), &status, opCtx.get())); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status, opCtx.get()); }); replyToReceivedHeartbeatV1(); reconfigThread.join(); @@ -382,8 +380,7 @@ TEST_F(ReplCoordTest, Status status(ErrorCodes::InternalError, "Not Set"); const auto opCtx = makeOperationContext(); // first reconfig - stdx::thread reconfigThread( - stdx::bind(doReplSetReconfig, getReplCoord(), &status, opCtx.get())); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status, opCtx.get()); }); getNet()->enterNetwork(); getNet()->blackHole(getNet()->getNextReadyRequest()); getNet()->exitNetwork(); @@ -420,7 +417,7 @@ TEST_F(ReplCoordTest, NodeReturnsConfigurationInProgressWhenReceivingAReconfigWh // initiate Status status(ErrorCodes::InternalError, "Not Set"); const auto opCtx = makeOperationContext(); - stdx::thread initateThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status, opCtx.get())); + stdx::thread initateThread([&] { doReplSetInitiate(getReplCoord(), &status, opCtx.get()); }); getNet()->enterNetwork(); getNet()->blackHole(getNet()->getNextReadyRequest()); getNet()->exitNetwork(); @@ -467,8 +464,7 @@ TEST_F(ReplCoordTest, PrimaryNodeAcceptsNewConfigWhenReceivingAReconfigWithAComp Status status(ErrorCodes::InternalError, "Not Set"); const auto opCtx = makeOperationContext(); - stdx::thread reconfigThread( - stdx::bind(doReplSetReconfig, getReplCoord(), &status, opCtx.get())); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status, opCtx.get()); }); NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); @@ -579,8 +575,7 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi // start reconfigThread Status status(ErrorCodes::InternalError, "Not Set"); const auto opCtx = makeOperationContext(); - stdx::thread reconfigThread( - stdx::bind(doReplSetReconfig, getReplCoord(), &status, opCtx.get())); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status, opCtx.get()); }); // wait for reconfigThread to create network requests to ensure the replication coordinator // is in state kConfigReconfiguring diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 3cf40cda0d4..1d4164876f8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -337,7 +337,7 @@ TEST_F(ReplCoordTest, NodeReturnsNodeNotFoundWhenQuorumCheckFailsWhileInitiating hbArgs.setTerm(0); Status status(ErrorCodes::InternalError, "Not set"); - stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status)); + stdx::thread prsiThread([&] { doReplSetInitiate(getReplCoord(), &status); }); const Date_t startDate = getNet()->now(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); @@ -372,7 +372,7 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) { getReplCoord()->setMyLastAppliedOpTime(OpTime(appliedTS, 1)); Status status(ErrorCodes::InternalError, "Not set"); - stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status)); + stdx::thread prsiThread([&] { doReplSetInitiate(getReplCoord(), &status); }); const Date_t startDate = getNet()->now(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); @@ -1046,7 +1046,7 @@ public: void start() { ASSERT(!_finished); - _thread = stdx::thread(stdx::bind(&ReplicationAwaiter::_awaitReplication, this)); + _thread = stdx::thread([this] { _awaitReplication(); }); } void reset() { @@ -3146,7 +3146,7 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) { // reconfig Status status(ErrorCodes::InternalError, "Not Set"); - stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status)); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status); }); replyToReceivedHeartbeatV1(); reconfigThread.join(); @@ -3236,7 +3236,7 @@ TEST_F( // reconfig to fewer nodes Status status(ErrorCodes::InternalError, "Not Set"); - stdx::thread reconfigThread(stdx::bind(doReplSetReconfigToFewer, getReplCoord(), &status)); + stdx::thread reconfigThread([&] { doReplSetReconfigToFewer(getReplCoord(), &status); }); replyToReceivedHeartbeatV1(); @@ -3321,7 +3321,7 @@ TEST_F(ReplCoordTest, // reconfig to three nodes Status status(ErrorCodes::InternalError, "Not Set"); - stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status)); + stdx::thread reconfigThread([&] { doReplSetReconfig(getReplCoord(), &status); }); replyToReceivedHeartbeatV1(); reconfigThread.join(); diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index b1fe3d50e36..edddf76ade0 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -170,8 +170,10 @@ Status Reporter::trigger() { return Status::OK(); } - auto scheduleResult = _executor->scheduleWork( - stdx::bind(&Reporter::_prepareAndSendCommandCallback, this, stdx::placeholders::_1, true)); + auto scheduleResult = + _executor->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& args) { + _prepareAndSendCommandCallback(args, true); + }); _status = scheduleResult.getStatus(); if (!_status.isOK()) { @@ -212,7 +214,9 @@ void Reporter::_sendCommand_inlock(BSONObj commandRequest) { auto scheduleResult = _executor->scheduleRemoteCommand( executor::RemoteCommandRequest(_target, "admin", commandRequest, nullptr), - stdx::bind(&Reporter::_processResponseCallback, this, stdx::placeholders::_1)); + [this](const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) { + _processResponseCallback(rcbd); + }); _status = scheduleResult.getStatus(); if (!_status.isOK()) { @@ -268,13 +272,10 @@ void Reporter::_processResponseCallback( // triggered. auto when = _executor->now() + _keepAliveInterval; bool fromTrigger = false; - auto scheduleResult = - _executor->scheduleWorkAt(when, - stdx::bind(&Reporter::_prepareAndSendCommandCallback, - this, - stdx::placeholders::_1, - fromTrigger)); - + auto scheduleResult = _executor->scheduleWorkAt( + when, [=](const executor::TaskExecutor::CallbackArgs& args) { + _prepareAndSendCommandCallback(args, fromTrigger); + }); _status = scheduleResult.getStatus(); if (!_status.isOK()) { _onShutdown_inlock(); diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index aec94c02387..a827e1dec7c 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -146,8 +146,9 @@ void ReporterTest::setUp() { posUpdater = stdx::make_unique<MockProgressManager>(); posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); - prepareReplSetUpdatePositionCommandFn = - stdx::bind(&MockProgressManager::prepareReplSetUpdatePositionCommand, posUpdater.get()); + prepareReplSetUpdatePositionCommandFn = [updater = posUpdater.get()] { + return updater->prepareReplSetUpdatePositionCommand(); + }; reporter = stdx::make_unique<Reporter>(_executorProxy.get(), diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index 226b36f46ff..7d9569c4663 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -152,10 +152,9 @@ void ScatterGatherRunner::RunnerImpl::processResponse( void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() { if (_sufficientResponsesReceived.isValid()) { - std::for_each( - _callbacks.begin(), - _callbacks.end(), - stdx::bind(&executor::TaskExecutor::cancel, _executor, stdx::placeholders::_1)); + for (const CallbackHandle& cbh : _callbacks) { + _executor->cancel(cbh); + }; // Clear _callbacks to break the cycle of shared_ptr. _callbacks.clear(); _executor->signalEvent(_sufficientResponsesReceived); diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index cb9fae75ecb..bb4ca86d25c 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -119,8 +119,7 @@ public: } void run() { - _thread.reset( - new stdx::thread(stdx::bind(&ScatterGatherRunnerRunner::_run, this, _executor))); + _thread = stdx::make_unique<stdx::thread>([this] { _run(_executor); }); } private: diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index f95e5ae5aeb..f66f26c4cd4 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -173,11 +173,11 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( << BSON(OplogEntryBase::kTimestampFieldName << 1 << OplogEntryBase::kTermFieldName << 1)), - stdx::bind(&SyncSourceResolver::_firstOplogEntryFetcherCallback, - this, - stdx::placeholders::_1, - candidate, - earliestOpTimeSeen), + [=](const StatusWith<Fetcher::QueryResponse>& response, + Fetcher::NextAction*, + BSONObjBuilder*) { + return _firstOplogEntryFetcherCallback(response, candidate, earliestOpTimeSeen); + }, ReadPreferenceSetting::secondaryPreferredMetadata(), kFetcherTimeout /* find network timeout */, kFetcherTimeout /* getMore network timeout */); @@ -195,12 +195,11 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter" << BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte" << _requiredOpTime.getTimestamp()))), - stdx::bind(&SyncSourceResolver::_requiredOpTimeFetcherCallback, - this, - stdx::placeholders::_1, - candidate, - earliestOpTimeSeen, - rbid), + [=](const StatusWith<Fetcher::QueryResponse>& response, + Fetcher::NextAction*, + BSONObjBuilder*) { + return _requiredOpTimeFetcherCallback(response, candidate, earliestOpTimeSeen, rbid); + }, ReadPreferenceSetting::secondaryPreferredMetadata(), kFetcherTimeout /* find network timeout */, kFetcherTimeout /* getMore network timeout */); @@ -354,12 +353,9 @@ Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime ea invariant(_state == State::kRunning); auto handle = _taskExecutor->scheduleRemoteCommand( {candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout}, - stdx::bind(&SyncSourceResolver::_rbidRequestCallback, - this, - candidate, - earliestOpTimeSeen, - stdx::placeholders::_1)); - + [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { + _rbidRequestCallback(candidate, earliestOpTimeSeen, rbidReply); + }); if (!handle.isOK()) { return handle.getStatus(); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 8f1a1e63795..3fa8f87db9f 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -414,12 +414,10 @@ Status SyncTail::syncApply(OperationContext* opCtx, Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) { - return SyncTail::syncApply(opCtx, - op, - oplogApplicationMode, - applyOperation_inlock, - applyCommand_inlock, - stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL)); + return SyncTail::syncApply( + opCtx, op, oplogApplicationMode, applyOperation_inlock, applyCommand_inlock, [] { + opsAppliedStats.increment(1); + }); } diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index ff3080cf161..909bb594ad2 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -113,7 +113,7 @@ void TaskRunner::schedule(const Task& task) { return; } - _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this)); + _threadPool->schedule([this] { _runTasks(); }); _active = true; _cancelRequested = false; diff --git a/src/mongo/db/repl/topology_coordinator_test.cpp b/src/mongo/db/repl/topology_coordinator_test.cpp index 9f738b17d5a..edbe0f15d9b 100644 --- a/src/mongo/db/repl/topology_coordinator_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_test.cpp @@ -103,9 +103,9 @@ protected: } int64_t countLogLinesContaining(const std::string& needle) { - return std::count_if(getCapturedLogMessages().begin(), - getCapturedLogMessages().end(), - stdx::bind(stringContains, stdx::placeholders::_1, needle)); + const auto& msgs = getCapturedLogMessages(); + return std::count_if( + msgs.begin(), msgs.end(), [&](const auto& s) { return stringContains(s, needle); }); } void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0), diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 88137f5cdc1..bbee05955b0 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -104,9 +104,9 @@ protected: } int64_t countLogLinesContaining(const std::string& needle) { - return std::count_if(getCapturedLogMessages().begin(), - getCapturedLogMessages().end(), - stdx::bind(stringContains, stdx::placeholders::_1, needle)); + const auto& msgs = getCapturedLogMessages(); + return std::count_if( + msgs.begin(), msgs.end(), [&](const auto& s) { return stringContains(s, needle); }); } void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0)) { diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index 04142a340a3..d8fee83598e 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -99,9 +99,9 @@ public: protected: int64_t countLogLinesContaining(const std::string& needle) { - return std::count_if(getCapturedLogMessages().begin(), - getCapturedLogMessages().end(), - stdx::bind(stringContains, stdx::placeholders::_1, needle)); + const auto& msgs = getCapturedLogMessages(); + return std::count_if( + msgs.begin(), msgs.end(), [&](const auto& s) { return stringContains(s, needle); }); } bool hasReceivedSufficientResponses() { diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 53b6f9e4b9a..34a0732625f 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -338,8 +338,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { _isStateChanged.notify_all(); } - _thread = stdx::thread(stdx::bind( - &SessionCatalogMigrationDestination::_retrieveSessionStateFromSource, this, service)); + _thread = stdx::thread([=] { _retrieveSessionStateFromSource(service); }); } void SessionCatalogMigrationDestination::finish() { diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp index 7f2cff63e70..1319fd891df 100644 --- a/src/mongo/db/sessions_collection_mock.cpp +++ b/src/mongo/db/sessions_collection_mock.cpp @@ -33,11 +33,8 @@ namespace mongo { MockSessionsCollectionImpl::MockSessionsCollectionImpl() - : _sessions(), - _refresh( - stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1)), - _remove( - stdx::bind(&MockSessionsCollectionImpl::_removeRecords, this, stdx::placeholders::_1)) {} + : _refresh([=](const LogicalSessionRecordSet& sessions) { return _refreshSessions(sessions); }), + _remove([=](const LogicalSessionIdSet& sessions) { return _removeRecords(sessions); }) {} void MockSessionsCollectionImpl::setRefreshHook(RefreshHook hook) { _refresh = std::move(hook); @@ -48,9 +45,8 @@ void MockSessionsCollectionImpl::setRemoveHook(RemoveHook hook) { } void MockSessionsCollectionImpl::clearHooks() { - _refresh = - stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1); - _remove = stdx::bind(&MockSessionsCollectionImpl::_removeRecords, this, stdx::placeholders::_1); + _refresh = [=](const LogicalSessionRecordSet& sessions) { return _refreshSessions(sessions); }; + _remove = [=](const LogicalSessionIdSet& sessions) { return _removeRecords(sessions); }; } Status MockSessionsCollectionImpl::refreshSessions(const LogicalSessionRecordSet& sessions) { diff --git a/src/mongo/db/storage/mmap_v1/catalog/namespace_index.cpp b/src/mongo/db/storage/mmap_v1/catalog/namespace_index.cpp index 8e5306b6500..a6530e391ba 100644 --- a/src/mongo/db/storage/mmap_v1/catalog/namespace_index.cpp +++ b/src/mongo/db/storage/mmap_v1/catalog/namespace_index.cpp @@ -128,19 +128,14 @@ boost::filesystem::path NamespaceIndex::path() const { return ret; } -static void namespaceGetNamespacesCallback(const Namespace& k, - NamespaceDetails& v, - list<string>* l) { - if (!k.hasDollarSign() || k == "local.oplog.$main") { - // we call out local.oplog.$main specifically as its the only "normal" - // collection that has a $, so we make sure it gets added - l->push_back(k.toString()); - } -} - void NamespaceIndex::getCollectionNamespaces(list<string>* tofill) const { - _ht->iterAll(stdx::bind( - namespaceGetNamespacesCallback, stdx::placeholders::_1, stdx::placeholders::_2, tofill)); + _ht->iterAll([tofill](const Namespace& k, NamespaceDetails& v) { + if (!k.hasDollarSign() || k == "local.oplog.$main") { + // we call out local.oplog.$main specifically as its the only "normal" + // collection that has a $, so we make sure it gets added + tofill->push_back(k.toString()); + } + }); } void NamespaceIndex::maybeMkdir() const { diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp index 985db4864a0..4df45b463ce 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp @@ -140,7 +140,7 @@ void JournalWriter::start() { } // Start the thread - stdx::thread t(stdx::bind(&JournalWriter::_journalWriterThread, this)); + stdx::thread t([this] { _journalWriterThread(); }); _journalWriterThreadHandle.swap(t); } diff --git a/src/mongo/db/storage/mmap_v1/file_allocator.cpp b/src/mongo/db/storage/mmap_v1/file_allocator.cpp index d905adb2855..3f95c013442 100644 --- a/src/mongo/db/storage/mmap_v1/file_allocator.cpp +++ b/src/mongo/db/storage/mmap_v1/file_allocator.cpp @@ -130,7 +130,7 @@ FileAllocator::FileAllocator() : _failed() {} void FileAllocator::start() { - stdx::thread t(stdx::bind(&FileAllocator::run, this)); + stdx::thread t([this] { run(this); }); t.detach(); } diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index e6a5753b0fe..0af7e1c3631 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -77,7 +77,7 @@ private: if (!remaining) return; - stdx::thread athread(stdx::bind(&ThreadedTest::subthread, this, remaining)); + stdx::thread athread([=] { subthread(remaining); }); launch_subthreads(remaining - 1); athread.join(); } diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 048aa895f2f..fb71872ce39 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -161,7 +161,9 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( const CallbackHandle& cbHandle, const std::vector<NetworkOperationList*> queuesToCheck, const ResponseStatus& response) { - auto matchFn = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); + + auto matchFn = [&cbHandle](const auto& ops) { return ops.isForCallback(cbHandle); }; + for (auto list : queuesToCheck) { auto noi = std::find_if(list->begin(), list->end(), matchFn); if (noi == list->end()) { @@ -447,11 +449,9 @@ void NetworkInterfaceMock::_enqueueOperation_inlock( invariant(op.getRequest().timeout >= Milliseconds(0)); ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0)); std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled}; - auto action = stdx::bind(&NetworkInterfaceMock::_interruptWithResponse_inlock, - this, - op.getCallbackHandle(), - queuesToCheck, - rs); + auto action = [ =, cbh = op.getCallbackHandle() ] { + _interruptWithResponse_inlock(cbh, queuesToCheck, rs); + }; _alarms.emplace(_now_inlock() + op.getRequest().timeout, action); } } diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 8cf3521e095..2185c221cf0 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -108,44 +108,62 @@ public: }); \ void CET_##TEST_NAME::_doTest() -void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* target) { - *target = cbData.status; +auto makeSetStatusClosure(Status* target) { + return [target](const TaskExecutor::CallbackArgs& cbData) { *target = cbData.status; }; } -void setStatusAndShutdown(const TaskExecutor::CallbackArgs& cbData, Status* target) { - setStatus(cbData, target); - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); +auto makeSetStatusAndShutdownClosure(Status* target) { + return [target](const TaskExecutor::CallbackArgs& cbData) { + *target = cbData.status; + if (cbData.status != ErrorCodes::CallbackCanceled) { + cbData.executor->shutdown(); + } + }; } -void setStatusAndTriggerEvent(const TaskExecutor::CallbackArgs& cbData, - Status* outStatus, - TaskExecutor::EventHandle event) { - *outStatus = cbData.status; - if (!cbData.status.isOK()) - return; - cbData.executor->signalEvent(event); +auto makeSetStatusAndTriggerEventClosure(Status* target, TaskExecutor::EventHandle event) { + return [=](const TaskExecutor::CallbackArgs& cbData) { + *target = cbData.status; + if (!cbData.status.isOK()) + return; + cbData.executor->signalEvent(event); + }; } -void scheduleSetStatusAndShutdown(const TaskExecutor::CallbackArgs& cbData, - Status* outStatus1, - Status* outStatus2) { - if (!cbData.status.isOK()) { - *outStatus1 = cbData.status; - return; - } - *outStatus1 = - cbData.executor - ->scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, outStatus2)) - .getStatus(); +auto makeScheduleSetStatusAndShutdownClosure(Status* outStatus1, Status* outStatus2) { + return [=](const TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + *outStatus1 = cbData.status; + return; + } + *outStatus1 = + cbData.executor->scheduleWork(makeSetStatusAndShutdownClosure(outStatus2)).getStatus(); + }; +} + +auto makeSetStatusOnRemoteCommandCompletionClosure(const RemoteCommandRequest* expectedRequest, + Status* outStatus) { + return [=](const TaskExecutor::RemoteCommandCallbackArgs& cbData) { + if (cbData.request != *expectedRequest) { + auto desc = [](const RemoteCommandRequest& request) -> std::string { + return mongoutils::str::stream() << "Request(" << request.target.toString() << ", " + << request.dbname << ", " << request.cmdObj << ')'; + }; + *outStatus = + Status(ErrorCodes::BadValue, + mongoutils::str::stream() << "Actual request: " << desc(cbData.request) + << "; expected: " + << desc(*expectedRequest)); + return; + } + *outStatus = cbData.response.status; + }; } COMMON_EXECUTOR_TEST(RunOne) { TaskExecutor& executor = getExecutor(); Status status = getDetectableErrorStatus(); - ASSERT_OK( - executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status)) - .getStatus()); + ASSERT_OK(executor.scheduleWork(makeSetStatusAndShutdownClosure(&status)).getStatus()); launchExecutorThread(); joinExecutorThread(); ASSERT_OK(status); @@ -154,9 +172,7 @@ COMMON_EXECUTOR_TEST(RunOne) { COMMON_EXECUTOR_TEST(Schedule1ButShutdown) { TaskExecutor& executor = getExecutor(); Status status = getDetectableErrorStatus(); - ASSERT_OK( - executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status)) - .getStatus()); + ASSERT_OK(executor.scheduleWork(makeSetStatusAndShutdownClosure(&status)).getStatus()); executor.shutdown(); launchExecutorThread(); joinExecutorThread(); @@ -167,12 +183,10 @@ COMMON_EXECUTOR_TEST(Schedule2Cancel1) { TaskExecutor& executor = getExecutor(); Status status1 = getDetectableErrorStatus(); Status status2 = getDetectableErrorStatus(); - TaskExecutor::CallbackHandle cb = unittest::assertGet( - executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status1))); + TaskExecutor::CallbackHandle cb = + unittest::assertGet(executor.scheduleWork(makeSetStatusAndShutdownClosure(&status1))); executor.cancel(cb); - ASSERT_OK( - executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status2)) - .getStatus()); + ASSERT_OK(executor.scheduleWork(makeSetStatusAndShutdownClosure(&status2)).getStatus()); launchExecutorThread(); joinExecutorThread(); ASSERT_EQUALS(status1, ErrorCodes::CallbackCanceled); @@ -183,9 +197,7 @@ COMMON_EXECUTOR_TEST(OneSchedulesAnother) { TaskExecutor& executor = getExecutor(); Status status1 = getDetectableErrorStatus(); Status status2 = getDetectableErrorStatus(); - ASSERT_OK(executor - .scheduleWork(stdx::bind( - scheduleSetStatusAndShutdown, stdx::placeholders::_1, &status1, &status2)) + ASSERT_OK(executor.scheduleWork(makeScheduleSetStatusAndShutdownClosure(&status1, &status2)) .getStatus()); launchExecutorThread(); joinExecutorThread(); @@ -243,8 +255,8 @@ EventChainAndWaitingTest::EventChainAndWaitingTest(TaskExecutor* exec, status3(ErrorCodes::InternalError, "Not mutated"), status4(ErrorCodes::InternalError, "Not mutated"), status5(ErrorCodes::InternalError, "Not mutated") { - triggered2 = stdx::bind(setStatusAndTriggerEvent, stdx::placeholders::_1, &status2, event2); - triggered3 = stdx::bind(setStatusAndTriggerEvent, stdx::placeholders::_1, &status3, event3); + triggered2 = makeSetStatusAndTriggerEventClosure(&status2, event2); + triggered3 = makeSetStatusAndTriggerEventClosure(&status3, event3); } EventChainAndWaitingTest::~EventChainAndWaitingTest() { @@ -254,9 +266,7 @@ EventChainAndWaitingTest::~EventChainAndWaitingTest() { } void EventChainAndWaitingTest::run() { - executor - ->onEvent(goEvent, - stdx::bind(&EventChainAndWaitingTest::onGo, this, stdx::placeholders::_1)) + executor->onEvent(goEvent, [=](const TaskExecutor::CallbackArgs& cbData) { onGo(cbData); }) .status_with_transitional_ignore(); executor->signalEvent(goEvent); executor->waitForEvent(goEvent); @@ -268,8 +278,8 @@ void EventChainAndWaitingTest::run() { executor->waitForEvent(neverSignaledEvent); }; neverSignaledWaiter = stdx::thread(waitForeverCallback); - TaskExecutor::CallbackHandle shutdownCallback = unittest::assertGet( - executor->scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status5))); + TaskExecutor::CallbackHandle shutdownCallback = + unittest::assertGet(executor->scheduleWork(makeSetStatusAndShutdownClosure(&status5))); executor->wait(shutdownCallback); } @@ -309,8 +319,7 @@ void EventChainAndWaitingTest::onGo(const TaskExecutor::CallbackArgs& cbData) { } cbHandle = executor->onEvent( - goEvent, - stdx::bind(&EventChainAndWaitingTest::onGoAfterTriggered, this, stdx::placeholders::_1)); + goEvent, [=](const TaskExecutor::CallbackArgs& cbData) { onGoAfterTriggered(cbData); }); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); @@ -359,15 +368,14 @@ COMMON_EXECUTOR_TEST(ScheduleWorkAt) { Status status4 = getDetectableErrorStatus(); const Date_t now = net->now(); - const TaskExecutor::CallbackHandle cb1 = unittest::assertGet(executor.scheduleWorkAt( - now + Milliseconds(100), stdx::bind(setStatus, stdx::placeholders::_1, &status1))); - const TaskExecutor::CallbackHandle cb4 = unittest::assertGet(executor.scheduleWorkAt( - now - Milliseconds(50), stdx::bind(setStatus, stdx::placeholders::_1, &status4))); - unittest::assertGet(executor.scheduleWorkAt( - now + Milliseconds(5000), stdx::bind(setStatus, stdx::placeholders::_1, &status3))); + const TaskExecutor::CallbackHandle cb1 = unittest::assertGet( + executor.scheduleWorkAt(now + Milliseconds(100), makeSetStatusClosure(&status1))); + const TaskExecutor::CallbackHandle cb4 = unittest::assertGet( + executor.scheduleWorkAt(now - Milliseconds(50), makeSetStatusClosure(&status4))); + unittest::assertGet( + executor.scheduleWorkAt(now + Milliseconds(5000), makeSetStatusClosure(&status3))); const TaskExecutor::CallbackHandle cb2 = unittest::assertGet(executor.scheduleWorkAt( - now + Milliseconds(200), - stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, &status2))); + now + Milliseconds(200), makeSetStatusAndShutdownClosure(&status2))); executor.wait(cb4); ASSERT_OK(status4); @@ -386,26 +394,6 @@ COMMON_EXECUTOR_TEST(ScheduleWorkAt) { ASSERT_EQUALS(status3, ErrorCodes::CallbackCanceled); } -std::string getRequestDescription(const RemoteCommandRequest& request) { - return mongoutils::str::stream() << "Request(" << request.target.toString() << ", " - << request.dbname << ", " << request.cmdObj << ')'; -} - -static void setStatusOnRemoteCommandCompletion( - const TaskExecutor::RemoteCommandCallbackArgs& cbData, - const RemoteCommandRequest& expectedRequest, - Status* outStatus) { - if (cbData.request != expectedRequest) { - *outStatus = Status(ErrorCodes::BadValue, - mongoutils::str::stream() << "Actual request: " - << getRequestDescription(cbData.request) - << "; expected: " - << getRequestDescription(expectedRequest)); - return; - } - *outStatus = cbData.response.status; -} - COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { NetworkInterfaceMock* net = getNet(); TaskExecutor& executor = getExecutor(); @@ -417,8 +405,7 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { << "doc"), nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( - request, - stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); + request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status1))); net->enterNetwork(); ASSERT(net->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); @@ -441,8 +428,7 @@ COMMON_EXECUTOR_TEST(ScheduleAndCancelRemoteCommand) { << "doc"), nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( - request, - stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); + request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status1))); executor.cancel(cbHandle); launchExecutorThread(); getNet()->enterNetwork(); @@ -463,8 +449,7 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) { const RemoteCommandRequest request( HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), nullptr, Milliseconds(1)); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( - request, - stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status))); + request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status))); net->enterNetwork(); ASSERT(net->hasReadyRequests()); const Date_t startTime = net->now(); @@ -484,11 +469,9 @@ COMMON_EXECUTOR_TEST(CallbackHandleComparison) { const RemoteCommandRequest request( HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1), nullptr); TaskExecutor::CallbackHandle cbHandle1 = unittest::assertGet(executor.scheduleRemoteCommand( - request, - stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); + request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status1))); TaskExecutor::CallbackHandle cbHandle2 = unittest::assertGet(executor.scheduleRemoteCommand( - request, - stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status2))); + request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status2))); // test equality ASSERT_TRUE(cbHandle1 == cbHandle1); diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index 2c4d29d8436..5fae69c33a2 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -54,10 +54,6 @@ MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { return Status::OK(); } -void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* outStatus) { - *outStatus = cbData.status; -} - TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { auto net = getNet(); auto& executor = getExecutor(); @@ -65,7 +61,8 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { auto status1 = getDetectableErrorStatus(); const auto now = net->now(); const auto cb1 = unittest::assertGet(executor.scheduleWorkAt( - now + Milliseconds(5000), stdx::bind(setStatus, stdx::placeholders::_1, &status1))); + now + Milliseconds(5000), + [&](const TaskExecutor::CallbackArgs& cbData) { status1 = cbData.status; })); const auto startTime = net->now(); net->enterNetwork(); diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 0dc955fe119..8cdb7190050 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -241,9 +241,9 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) { *remote.shardHostAndPort, _db, remote.cmdObj, _metadataObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand( - request, - stdx::bind( - &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex)); + request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { + _handleResponse(cbData, remoteIndex); + }); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index 1877ae1d078..5c9472c39c1 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -706,7 +706,7 @@ BenchRunWorker::BenchRunWorker(size_t id, BenchRunWorker::~BenchRunWorker() = default; void BenchRunWorker::start() { - stdx::thread(stdx::bind(&BenchRunWorker::run, this)).detach(); + stdx::thread([this] { run(); }).detach(); } bool BenchRunWorker::shouldStop() const { diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index d675cdf7bff..027d7baa2b1 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -56,7 +56,6 @@ namespace mongo { using namespace transport; -using namespace stdx::placeholders; using TicketCallback = TransportLayer::TicketCallback; using SEPTestSession = ServiceEntryPointTestSuite::SEPTestSession; @@ -102,11 +101,13 @@ const auto kEndConnectionStatus = Status(ErrorCodes::HostUnreachable, "connectio } // namespace ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness() - : _sourceMessage( - stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3)), - _sinkMessage( - stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3)), - _wait(stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1)), + : _sourceMessage([this](const transport::SessionHandle& h, Message * m, Date_t d) { + return _defaultSource(h, m, d); + }), + _sinkMessage([this](const transport::SessionHandle& h, const Message& m, Date_t d) { + return _defaultSink(h, m, d); + }), + _wait([this](transport::Ticket t) { return _defaultWait(std::move(t)); }), _asyncWait(kDefaultAsyncWait), _end(kDefaultEnd) {} @@ -160,7 +161,7 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitError(transport::Ticket t } Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::Ticket ticket) { - _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, this, _1); + _wait = [this](transport::Ticket t) { return _waitError(std::move(t)); }; return _defaultWait(std::move(ticket)); } @@ -179,16 +180,18 @@ Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const SessionHand Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const SessionHandle& s, const Message& m, Date_t d) { - _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1); + _wait = [=](transport::Ticket t) { return _waitOnceThenError(std::move(t)); }; return _defaultSink(s, m, d); } void ServiceEntryPointTestSuite::MockTLHarness::_resetHooks() { - _sourceMessage = - stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSource, this, _1, _2, _3); - _sinkMessage = - stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultSink, this, _1, _2, _3); - _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_defaultWait, this, _1); + _sourceMessage = [this](const transport::SessionHandle& h, Message* m, Date_t d) { + return _defaultSource(h, m, d); + }; + _sinkMessage = [this](const transport::SessionHandle& h, const Message& m, Date_t d) { + return _defaultSink(h, m, d); + }; + _wait = [this](transport::Ticket t) { return _defaultWait(std::move(t)); }; _asyncWait = kDefaultAsyncWait; _end = kDefaultEnd; _destroy_hook = kDefaultDestroyHook; @@ -220,7 +223,9 @@ void ServiceEntryPointTestSuite::noLifeCycleTest() { // Step 1: SEP gets a ticket to source a Message // Step 2: SEP calls wait() on the ticket and receives an error - _tl->_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1); + _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { + return tlp->_waitError(std::move(t)); + }; // Step 3: SEP destroys the session, which calls end() _tl->_destroy_hook = [&testComplete](SEPTestSession&) { testComplete.set_value(); }; @@ -245,8 +250,9 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { _tl->_sinkMessage = [this](const SessionHandle& session, const Message& m, Date_t expiration) { // Step 4: SEP calls wait() on the ticket and receives an error - _tl->_wait = - stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitError, _tl.get(), _1); + _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { + return tlp->_waitError(std::move(t)); + }; return _tl->_defaultSink(session, m, expiration); }; @@ -270,8 +276,9 @@ void ServiceEntryPointTestSuite::fullLifeCycleTest() { // Step 1: SEP gets a ticket to source a Message // Step 2: SEP calls wait() on the ticket and receives a Message - _tl->_sinkMessage = stdx::bind( - &ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait, _tl.get(), _1, _2, _3); + _tl->_sinkMessage = [tlp = _tl.get()](auto&& a1, auto&& a2, auto&& a3) { + return tlp->_sinkThenErrorOnWait(a1, a2, a3); + }; // Step 3: SEP gets a ticket to sink a Message // Step 4: SEP calls wait() on the ticket and receives Status::OK() @@ -325,8 +332,9 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { startB.set_value(); resumeAFuture.wait(); - _tl->_wait = stdx::bind( - &ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, _tl.get(), _1); + _tl->_wait = [tlp = _tl.get()](transport::Ticket t) { + return tlp->_waitOnceThenError(std::move(t)); + }; return Status::OK(); }; @@ -338,7 +346,7 @@ void ServiceEntryPointTestSuite::interruptingSessionTest() { // Step 7: SEP calls sourceMessage() for B, gets tB3 // Step 8: SEP calls wait() for tB3, gets an error // Step 9: SEP calls end(B) - _tl->_destroy_hook = [this, idA, idB, &resumeA, &testComplete](SEPTestSession& session) { + _tl->_destroy_hook = [idA, idB, &resumeA, &testComplete](SEPTestSession& session) { // When end(B) is called, time to resume session A if (session.id() == idB) { // Resume session A @@ -380,8 +388,7 @@ void ServiceEntryPointTestSuite::burstStressTest(int numSessions, _tl->_resetHooks(); // Same wait() callback for all sessions. - _tl->_wait = [this, &completedCycles, &cyclesLock, numSessions, numCycles, &delay]( - Ticket ticket) -> Status { + _tl->_wait = [this, &completedCycles, &cyclesLock, numCycles, &delay](Ticket ticket) -> Status { auto id = ticket.sessionId(); int cycleCount; diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index 0deea151158..6d9a9d8a255 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -66,9 +66,10 @@ void TransportLayerLegacy::ListenerLegacy::accepted(std::unique_ptr<AbstractMess TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts, ServiceEntryPoint* sep) : _sep(sep), - _listener(stdx::make_unique<ListenerLegacy>( - opts, - stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))), + _listener(stdx::make_unique<ListenerLegacy>(opts, + [=](std::unique_ptr<AbstractMessagingPort> port) { + _handleNewConnection(std::move(port)); + })), _running(false), _options(opts) {} |