summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2018-10-31 15:42:17 -0400
committerMathias Stearn <mathias@10gen.com>2018-11-15 17:25:11 -0500
commit3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0 (patch)
tree2bd06655367af012e17bf80947cdb792fa1b9b44 /src/mongo/db/repl
parent1ded7067e2d1a6161b15e5a462f8cba2d755c9a6 (diff)
downloadmongo-3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0.tar.gz
SERVER-35682 kill existing SharedPromise type
This required plumbing unique_function into many more places.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/abstract_async_component.cpp8
-rw-r--r--src/mongo/db/repl/abstract_async_component.h4
-rw-r--r--src/mongo/db/repl/abstract_async_component_test.cpp12
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp19
-rw-r--r--src/mongo/db/repl/collection_cloner.h8
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp18
-rw-r--r--src/mongo/db/repl/database_cloner.cpp8
-rw-r--r--src/mongo/db/repl/database_cloner.h9
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp7
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp2
-rw-r--r--src/mongo/db/repl/databases_cloner.h12
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp6
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp10
-rw-r--r--src/mongo/db/repl/initial_syncer.h10
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp6
-rw-r--r--src/mongo/db/repl/multiapplier.cpp6
-rw-r--r--src/mongo/db/repl/multiapplier.h4
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp17
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/reporter_test.cpp4
-rw-r--r--src/mongo/db/repl/task_executor_mock.cpp11
-rw-r--r--src/mongo/db/repl/task_executor_mock.h4
-rw-r--r--src/mongo/db/repl/task_runner.cpp46
-rw-r--r--src/mongo/db/repl/task_runner.h15
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp8
27 files changed, 113 insertions, 153 deletions
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp
index cf8941a05f3..9967dd8012a 100644
--- a/src/mongo/db/repl/abstract_async_component.cpp
+++ b/src/mongo/db/repl/abstract_async_component.cpp
@@ -166,7 +166,7 @@ Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock(
}
Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -175,7 +175,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name << ": " << _componentName
<< " is shutting down");
}
- auto result = _executor->scheduleWork(work);
+ auto result = _executor->scheduleWork(std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
}
@@ -185,7 +185,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -196,7 +196,7 @@ Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock(
<< _componentName
<< " is shutting down");
}
- auto result = _executor->scheduleWorkAt(when, work);
+ auto result = _executor->scheduleWorkAt(when, std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(
str::stream() << "failed to schedule work " << name << " at " << when.toString());
diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h
index e7f14121c9a..63fa5db7e46 100644
--- a/src/mongo/db/repl/abstract_async_component.h
+++ b/src/mongo/db/repl/abstract_async_component.h
@@ -148,11 +148,11 @@ protected:
* Saves handle if work was successfully scheduled.
* Returns scheduleWork status (without the handle).
*/
- Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp
index 691c417ad2b..4dd1183b2b4 100644
--- a/src/mongo/db/repl/abstract_async_component_test.cpp
+++ b/src/mongo/db/repl/abstract_async_component_test.cpp
@@ -65,7 +65,7 @@ public:
* Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and
* _scheduleWorkAtAndSaveHandle_inlock() for testing.
*/
- Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work,
+ Status scheduleWorkAndSaveHandle_forTest(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -73,7 +73,7 @@ public:
* Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing.
*/
Status scheduleWorkAtAndSaveHandle_forTest(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -122,20 +122,20 @@ Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status
}
Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _scheduleWorkAndSaveHandle_inlock(work, handle, name);
+ return _scheduleWorkAndSaveHandle_inlock(std::move(work), handle, name);
}
Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name);
+ return _scheduleWorkAtAndSaveHandle_inlock(when, std::move(work), handle, name);
}
void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) {
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 6876e88f5aa..6038ec6f544 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -109,7 +109,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& onCompletion,
+ CallbackFn onCompletion,
StorageInterface* storageInterface,
const int batchSize)
: _executor(executor),
@@ -118,7 +118,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_sourceNss(sourceNss),
_destNss(_sourceNss),
_options(options),
- _onCompletion(onCompletion),
+ _onCompletion(std::move(onCompletion)),
_storageInterface(storageInterface),
_countScheduler(_executor,
RemoteCommandRequest(
@@ -155,9 +155,10 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_indexSpecs(),
_documentsToInsert(),
_dbWorkTaskRunner(_dbWorkThreadPool),
- _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
- auto task = [ this, work ](OperationContext * opCtx,
- const Status& status) noexcept->TaskRunner::NextAction {
+ _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
+ auto task = [ this, work = std::move(work) ](
+ OperationContext * opCtx,
+ const Status& status) mutable noexcept->TaskRunner::NextAction {
try {
work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
} catch (...) {
@@ -165,7 +166,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
}
return TaskRunner::NextAction::kDisposeOperationContext;
};
- _dbWorkTaskRunner.schedule(task);
+ _dbWorkTaskRunner.schedule(std::move(task));
return executor::TaskExecutor::CallbackHandle();
}),
_createClientFn([] { return stdx::make_unique<DBClientConnection>(); }),
@@ -184,7 +185,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
uassert(50953,
"Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(),
_options.uuid);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
uassert(
50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0);
@@ -292,9 +293,9 @@ void CollectionCloner::waitForDbWorker() {
_dbWorkTaskRunner.join();
}
-void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn) {
+void CollectionCloner::setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) {
LockGuard lk(_mutex);
- _scheduleDbWorkFn = scheduleDbWorkFn;
+ _scheduleDbWorkFn = std::move(scheduleDbWorkFn);
}
void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index c7149970f5e..4ed39cc5512 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -93,8 +93,8 @@ public:
*
* Used for testing only.
*/
- using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
- const executor::TaskExecutor::CallbackFn&)>;
+ using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
/**
* Type of function to create a database client
@@ -117,7 +117,7 @@ public:
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& onCompletion,
+ CallbackFn onCompletion,
StorageInterface* storageInterface,
const int batchSize);
@@ -152,7 +152,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn);
/**
* Allows a different client class to be injected.
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 05c36c41cd6..c17cff0cc60 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -668,12 +668,13 @@ TEST_F(CollectionClonerTest,
// status.
auto exec = &getExecutor();
collectionCloner->setScheduleDbWorkFn_forTest([exec](
- const executor::TaskExecutor::CallbackFn& workFn) {
- auto wrappedTask = [workFn](const executor::TaskExecutor::CallbackArgs& cbd) {
+ executor::TaskExecutor::CallbackFn workFn) {
+ auto wrappedTask = [workFn = std::move(workFn)](
+ const executor::TaskExecutor::CallbackArgs& cbd) {
workFn(executor::TaskExecutor::CallbackArgs(
cbd.executor, cbd.myHandle, Status(ErrorCodes::CallbackCanceled, ""), cbd.opCtx));
};
- return exec->scheduleWork(wrappedTask);
+ return exec->scheduleWork(std::move(wrappedTask));
});
bool collectionCreated = false;
@@ -1177,12 +1178,11 @@ TEST_F(CollectionClonerTest,
// Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet.
executor::TaskExecutor::CallbackFn insertDocumentsFn;
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- insertDocumentsFn = workFn;
- executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
- return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
- });
+ collectionCloner->setScheduleDbWorkFn_forTest([&](executor::TaskExecutor::CallbackFn workFn) {
+ insertDocumentsFn = std::move(workFn);
+ executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
+ });
ASSERT_FALSE(insertDocumentsFn);
// Return first batch of collection documents from remote server for the getMore request.
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index a4648bc8e9a..b9c1e23edea 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -116,7 +116,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
const ListCollectionsPredicateFn& listCollectionsPred,
StorageInterface* si,
const CollectionCallbackFn& collWork,
- const CallbackFn& onCompletion)
+ CallbackFn onCompletion)
: _executor(executor),
_dbWorkThreadPool(dbWorkThreadPool),
_source(source),
@@ -128,7 +128,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
_listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred),
_storageInterface(si),
_collectionWork(collWork),
- _onCompletion(onCompletion),
+ _onCompletion(std::move(onCompletion)),
_listCollectionsFetcher(_executor,
_source,
_dbname,
@@ -152,7 +152,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty());
uassert(ErrorCodes::BadValue, "storage interface cannot be null", si);
uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
_stats.dbname = _dbname;
}
@@ -259,7 +259,7 @@ void DatabaseCloner::join() {
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
-void DatabaseCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void DatabaseCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index 0a5b893b294..7b2cd152592 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -97,6 +97,9 @@ public:
*/
using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>;
+ using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
+
/**
* Creates DatabaseCloner task in inactive state. Use start() to activate cloner.
*
@@ -116,7 +119,7 @@ public:
const ListCollectionsPredicateFn& listCollectionsPredicate,
StorageInterface* storageInterface,
const CollectionCallbackFn& collectionWork,
- const CallbackFn& onCompletion);
+ CallbackFn onCompletion);
virtual ~DatabaseCloner();
@@ -146,7 +149,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -234,7 +237,7 @@ private:
std::vector<NamespaceString> _collectionNamespaces; // (M)
std::list<CollectionCloner> _collectionCloners; // (M)
std::list<CollectionCloner>::iterator _currentCollectionClonerIter; // (M)
- CollectionCloner::ScheduleDbWorkFn
+ ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
StartCollectionClonerFn _startCollectionCloner; // (RT)
Stats _stats; // (M) Stats about what this instance did.
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 65a0e054b9a..83c428340a7 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -104,10 +104,9 @@ void DatabaseClonerTest::setUp() {
storageInterface.get(),
makeCollectionWorkClosure(),
makeSetStatusClosure());
- _databaseCloner->setScheduleDbWorkFn_forTest(
- [this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
- });
+ _databaseCloner->setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
+ });
_mockServer = stdx::make_unique<MockRemoteDBServer>(target.toString());
_mockServer->assignCollectionUuid("db.a", *_options1.uuid);
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 960d84fe960..ac3bc398059 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -227,7 +227,7 @@ Status DatabasesCloner::startup() noexcept {
return Status::OK();
}
-void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index 6af3b927229..970abc5da60 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -69,6 +69,8 @@ public:
using IncludeDbFilterFn = stdx::function<bool(const BSONObj& dbInfo)>;
using OnFinishFn = stdx::function<void(const Status&)>;
using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
+ using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
@@ -98,7 +100,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -178,10 +180,10 @@ private:
mutable stdx::mutex _mutex; // (S)
Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything.
executor::TaskExecutor* _exec; // (R) executor to schedule things with
- ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
- const HostAndPort _source; // (R) The source to use.
- CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
- StartCollectionClonerFn _startCollectionClonerFn; // (M)
+ ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
+ const HostAndPort _source; // (R) The source to use.
+ ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned.
OnFinishFn _finishFn; // (M) function called when finished.
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 47bec4f18ce..40a3b39a744 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -134,7 +134,7 @@ public:
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
log() << "The network has unexpected requests to process, next req:";
- NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest();
log() << req.getDiagnosticString();
}
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -301,8 +301,8 @@ protected:
result = status;
cvDone.notify_all();
}};
- cloner.setScheduleDbWorkFn_forTest([this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
+ cloner.setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
});
cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) {
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 6b68c13a7ea..28d3266ca82 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -378,7 +378,7 @@ BSONObj InitialSyncer::_getInitialSyncProgress_inlock() const {
return bob.obj();
}
-void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void InitialSyncer::setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
@@ -1431,7 +1431,7 @@ Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(const Status& sta
}
Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -1440,7 +1440,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name
<< ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWork(work);
+ auto result = _exec->scheduleWork(std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
}
@@ -1450,7 +1450,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -1460,7 +1460,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
<< when.toString()
<< ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWorkAt(when, work);
+ auto result = _exec->scheduleWorkAt(when, std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(
str::stream() << "failed to schedule work " << name << " at " << when.toString());
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 0ead52f6f46..929643116c5 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -222,7 +222,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -540,11 +540,11 @@ private:
* Saves handle if work was successfully scheduled.
* Returns scheduleWork status (without the handle).
*/
- Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -633,8 +633,8 @@ private:
State _state = State::kPreStart; // (M)
// Passed to CollectionCloner via DatabasesCloner.
- CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
- StartCollectionClonerFn _startCollectionClonerFn; // (M)
+ DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
// Contains stats on the current initial sync request (includes all attempts).
// To access these stats in a user-readable format, use getInitialSyncProgress().
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 6bc02d4d15a..620d9c5abff 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -215,7 +215,7 @@ public:
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
log() << "The network has unexpected requests to process, next req:";
- NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest();
log() << req.getDiagnosticString();
}
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -404,8 +404,8 @@ protected:
_onCompletion(lastApplied);
});
_initialSyncer->setScheduleDbWorkFn_forTest(
- [this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
+ [this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
});
_initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) {
cloner.setCreateClientFn_forTest([&cloner, this]() {
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 655426a5ce3..e568bb3f8ca 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -45,15 +45,15 @@ namespace repl {
MultiApplier::MultiApplier(executor::TaskExecutor* executor,
const Operations& operations,
const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion)
+ CallbackFn onCompletion)
: _executor(executor),
_operations(operations),
_multiApply(multiApply),
- _onCompletion(onCompletion) {
+ _onCompletion(std::move(onCompletion)) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
}
MultiApplier::~MultiApplier() {
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index 3b2a59f7fc8..79fec694b98 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -67,7 +67,7 @@ public:
/**
* Callback function to report final status of applying operations.
*/
- using CallbackFn = stdx::function<void(const Status&)>;
+ using CallbackFn = unique_function<void(const Status&)>;
using MultiApplyFn =
stdx::function<StatusWith<OpTime>(OperationContext*, MultiApplier::Operations)>;
@@ -88,7 +88,7 @@ public:
MultiApplier(executor::TaskExecutor* executor,
const Operations& operations,
const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion);
+ CallbackFn onCompletion);
/**
* Blocks while applier is active.
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 676d9c1f787..0eb44197c76 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -120,14 +120,14 @@ OplogBuffer* OplogApplier::getBuffer() const {
Future<void> OplogApplier::startup() {
auto pf = makePromiseFuture<void>();
auto callback =
- [ this, promise = pf.promise.share() ](const CallbackArgs& args) mutable noexcept {
+ [ this, promise = std::move(pf.promise) ](const CallbackArgs& args) mutable noexcept {
invariant(args.status);
log() << "Starting oplog application";
_run(_oplogBuffer);
log() << "Finished oplog application";
promise.setWith([] {});
};
- invariant(_executor->scheduleWork(callback).getStatus());
+ invariant(_executor->scheduleWork(std::move(callback)).getStatus());
return std::move(pf.future);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 18c9ccac0a8..f5b1cbb55db 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -191,9 +191,9 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) {
* Schedules a task using the executor. This task is always run unless the task executor is shutting
* down.
*/
-void scheduleWork(executor::TaskExecutor* executor,
- const executor::TaskExecutor::CallbackFn& work) {
- auto cbh = executor->scheduleWork([work](const executor::TaskExecutor::CallbackArgs& args) {
+void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) {
+ auto cbh = executor->scheduleWork([work = std::move(work)](
+ const executor::TaskExecutor::CallbackArgs& args) {
if (args.status == ErrorCodes::CallbackCanceled) {
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 824db78c888..e151e0b2482 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2726,7 +2726,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
}
// Schedule timeout callback.
auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
- auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
+ auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, std::move(timeoutCB));
if (!status.isOK()) {
log() << "Failed to schedule catchup timeout work.";
abort_inlock();
@@ -3512,13 +3512,14 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
}
}
-CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
- auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) {
- if (args.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- work(args);
- });
+CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, CallbackFn work) {
+ auto cbh =
+ _replExecutor->scheduleWorkAt(when, [work = std::move(work)](const CallbackArgs& args) {
+ if (args.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ work(args);
+ });
if (cbh == ErrorCodes::ShutdownInProgress) {
return {};
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index cff7179d1ce..233c0bf88e0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1034,7 +1034,7 @@ private:
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
- CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);
+ CallbackHandle _scheduleWorkAt(Date_t when, CallbackFn work);
/**
* Creates an event.
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index f11ef6c3d94..e5b7607390d 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -579,7 +579,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp,
TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork(
- const CallbackFn& work) override {
+ CallbackFn work) override {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
};
@@ -631,7 +631,7 @@ TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) {
TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt(
- Date_t when, const CallbackFn& work) override {
+ Date_t when, CallbackFn work) override {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
};
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index d3e14a7d8f1..727014a5cc2 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -38,25 +38,24 @@ namespace repl {
TaskExecutorMock::TaskExecutorMock(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
-StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(
- const CallbackFn& work) {
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(CallbackFn work) {
if (shouldFailScheduleWorkRequest()) {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
if (shouldDeferScheduleWorkRequestByOneSecond()) {
auto when = now() + Seconds(1);
- return getExecutor()->scheduleWorkAt(when, work);
+ return getExecutor()->scheduleWorkAt(when, std::move(work));
}
- return getExecutor()->scheduleWork(work);
+ return getExecutor()->scheduleWork(std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
+ Date_t when, CallbackFn work) {
if (shouldFailScheduleWorkAtRequest()) {
return Status(ErrorCodes::OperationFailed,
str::stream() << "failed to schedule work at " << when.toString());
}
- return getExecutor()->scheduleWorkAt(when, work);
+ return getExecutor()->scheduleWorkAt(when, std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand(
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index 41740b74433..e5e92e0f0e1 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -47,8 +47,8 @@ public:
explicit TaskExecutorMock(executor::TaskExecutor* executor);
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 94bd65d1d8d..43f0b735789 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -102,12 +102,12 @@ bool TaskRunner::isActive() const {
return _active;
}
-void TaskRunner::schedule(const Task& task) {
+void TaskRunner::schedule(Task task) {
invariant(task);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _tasks.push_back(task);
+ _tasks.push_back(std::move(task));
_condition.notify_all();
if (_active) {
@@ -174,8 +174,8 @@ void TaskRunner::_runTasks() {
tasks.swap(_tasks);
lk.unlock();
// Cancel remaining tasks with a CallbackCanceled status.
- for (auto task : tasks) {
- runSingleTask(task,
+ for (auto&& task : tasks) {
+ runSingleTask(std::move(task),
nullptr,
Status(ErrorCodes::CallbackCanceled,
"this task has been canceled by a previously invoked task"));
@@ -207,46 +207,10 @@ TaskRunner::Task TaskRunner::_waitForNextTask() {
return Task();
}
- Task task = _tasks.front();
+ Task task = std::move(_tasks.front());
_tasks.pop_front();
return task;
}
-Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) {
- // Setup cond_var for signaling when done.
- bool done = false;
- stdx::mutex mutex;
- stdx::condition_variable waitTillDoneCond;
-
- Status returnStatus{Status::OK()};
- this->schedule([&](OperationContext* opCtx, const Status taskStatus) {
- if (!taskStatus.isOK()) {
- returnStatus = taskStatus;
- } else {
- // Run supplied function.
- try {
- returnStatus = func(opCtx);
- } catch (...) {
- returnStatus = exceptionToStatus();
- error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus);
- }
- }
-
- // Signal done.
- LockGuard lk2{mutex};
- done = true;
- waitTillDoneCond.notify_all();
-
- // return nextAction based on status from supplied function.
- if (returnStatus.isOK()) {
- return nextAction;
- }
- return TaskRunner::NextAction::kCancel;
- });
-
- UniqueLock lk{mutex};
- waitTillDoneCond.wait(lk, [&done] { return done; });
- return returnStatus;
-}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
index 6875229bb9e..85313cc4ce9 100644
--- a/src/mongo/db/repl/task_runner.h
+++ b/src/mongo/db/repl/task_runner.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/functional.h"
namespace mongo {
@@ -60,17 +61,7 @@ public:
kCancel = 3,
};
- using Task = stdx::function<NextAction(OperationContext*, const Status&)>;
- using SynchronousTask = stdx::function<Status(OperationContext* opCtx)>;
-
- /**
- * Returns the Status from the supplied function after running it..
- *
- * Note: TaskRunner::NextAction controls when the operation context and thread will be released.
- */
- Status runSynchronousTask(
- SynchronousTask func,
- TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
+ using Task = unique_function<NextAction(OperationContext*, const Status&)>;
/**
* Creates a Task returning kCancel. This is useful in shutting down the task runner after
@@ -126,7 +117,7 @@ public:
* immediately. This is usually the case when the task runner is canceled. Accessing the
* operation context in the task will result in undefined behavior.
*/
- void schedule(const Task& task);
+ void schedule(Task task);
/**
* If there is a task that is already running, allows the task to run to completion.
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index b01885ba332..17771287f90 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -83,7 +83,7 @@ using OpIdVector = std::vector<unsigned int>;
OpIdVector _testRunTaskTwice(TaskRunnerTest& test,
TaskRunner::NextAction nextAction,
- stdx::function<void(const Task& task)> schedule) {
+ unique_function<void(Task task)> schedule) {
unittest::Barrier barrier(2U);
stdx::mutex mutex;
std::vector<OperationContext*> txns;
@@ -121,7 +121,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test,
std::vector<unsigned int> _testRunTaskTwice(TaskRunnerTest& test,
TaskRunner::NextAction nextAction) {
- auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); };
+ auto schedule = [&](Task task) { test.getTaskRunner().schedule(std::move(task)); };
return _testRunTaskTwice(test, nextAction, schedule);
}
@@ -134,9 +134,9 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
// Joining thread pool before scheduling second task ensures that task runner releases
// thread back to pool after disposing of operation context.
TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
- auto schedule = [this](const Task& task) {
+ auto schedule = [this](Task task) {
getThreadPool().waitForIdle();
- getTaskRunner().schedule(task);
+ getTaskRunner().schedule(std::move(task));
};
auto txnId =
_testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule);