summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/authenticate.cpp5
-rw-r--r--src/mongo/client/fetcher.cpp6
-rw-r--r--src/mongo/client/fetcher.h2
-rw-r--r--src/mongo/db/free_mon/free_mon_controller_test.cpp4
-rw-r--r--src/mongo/db/free_mon/free_mon_mongod.cpp25
-rw-r--r--src/mongo/db/repl/abstract_async_component.cpp8
-rw-r--r--src/mongo/db/repl/abstract_async_component.h4
-rw-r--r--src/mongo/db/repl/abstract_async_component_test.cpp12
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp19
-rw-r--r--src/mongo/db/repl/collection_cloner.h8
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp18
-rw-r--r--src/mongo/db/repl/database_cloner.cpp8
-rw-r--r--src/mongo/db/repl/database_cloner.h9
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp7
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp2
-rw-r--r--src/mongo/db/repl/databases_cloner.h12
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp6
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp10
-rw-r--r--src/mongo/db/repl/initial_syncer.h10
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp6
-rw-r--r--src/mongo/db/repl/multiapplier.cpp6
-rw-r--r--src/mongo/db/repl/multiapplier.h4
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp17
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/reporter_test.cpp4
-rw-r--r--src/mongo/db/repl/task_executor_mock.cpp11
-rw-r--r--src/mongo/db/repl/task_executor_mock.h4
-rw-r--r--src/mongo/db/repl/task_runner.cpp46
-rw-r--r--src/mongo/db/repl/task_runner.h15
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp8
-rw-r--r--src/mongo/executor/connection_pool.cpp4
-rw-r--r--src/mongo/executor/network_interface.h21
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp7
-rw-r--r--src/mongo/executor/network_interface_mock.cpp44
-rw-r--r--src/mongo/executor/network_interface_mock.h15
-rw-r--r--src/mongo/executor/network_interface_tl.cpp70
-rw-r--r--src/mongo/executor/network_interface_tl.h4
-rw-r--r--src/mongo/executor/task_executor.h9
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp5
-rw-r--r--src/mongo/executor/thread_pool_mock.h1
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp17
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h6
-rw-r--r--src/mongo/s/sharding_task_executor.cpp15
-rw-r--r--src/mongo/s/sharding_task_executor.h6
-rw-r--r--src/mongo/transport/asio_utils.h12
-rw-r--r--src/mongo/transport/baton.h4
-rw-r--r--src/mongo/transport/baton_asio_linux.h25
-rw-r--r--src/mongo/transport/service_executor_test.cpp4
-rw-r--r--src/mongo/transport/transport_layer.h7
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp10
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp12
-rw-r--r--src/mongo/unittest/task_executor_proxy.h7
-rw-r--r--src/mongo/util/background_thread_clock_source.cpp4
-rw-r--r--src/mongo/util/background_thread_clock_source.h2
-rw-r--r--src/mongo/util/clock_source.h4
-rw-r--r--src/mongo/util/clock_source_mock.cpp2
-rw-r--r--src/mongo/util/clock_source_mock.h6
-rw-r--r--src/mongo/util/concurrency/thread_pool_interface.h4
-rw-r--r--src/mongo/util/functional.h4
-rw-r--r--src/mongo/util/future.h75
-rw-r--r--src/mongo/util/future_test_future_int.cpp4
-rw-r--r--src/mongo/util/future_test_future_move_only.cpp22
-rw-r--r--src/mongo/util/future_test_future_void.cpp4
-rw-r--r--src/mongo/util/keyed_executor.h4
-rw-r--r--src/mongo/util/keyed_executor_test.cpp6
-rw-r--r--src/mongo/util/out_of_line_executor.h6
68 files changed, 328 insertions, 432 deletions
diff --git a/src/mongo/client/authenticate.cpp b/src/mongo/client/authenticate.cpp
index 6dc0337d627..8931c5de702 100644
--- a/src/mongo/client/authenticate.cpp
+++ b/src/mongo/client/authenticate.cpp
@@ -137,7 +137,7 @@ Future<void> authX509(RunCommandHook runCommand, const BSONObj& params, StringDa
// The runCommand hook checks whether the command returned { ok: 1.0 }, and we don't need to
// extract anything from the command payload, so this is just turning a Future<BSONObj>
// into a Future<void>
- return runCommand(authRequest.getValue()).then([](BSONObj obj) { return Status::OK(); });
+ return runCommand(authRequest.getValue()).ignoreValue();
}
} // namespace
@@ -149,7 +149,6 @@ Future<void> authenticateClient(const BSONObj& params,
const HostAndPort& hostname,
const std::string& clientName,
RunCommandHook runCommand) {
- std::string mechanism;
auto errorHandler = [](Status status) {
if (serverGlobalParams.transitionToAuth && !status.isA<ErrorCategory::NetworkError>()) {
// If auth failed in transitionToAuth, just pretend it succeeded.
@@ -161,6 +160,8 @@ Future<void> authenticateClient(const BSONObj& params,
return status;
};
+
+ std::string mechanism;
auto response = bsonExtractStringField(params, saslCommandMechanismFieldName, &mechanism);
if (!response.isOK())
return response;
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index f321b956c20..65540e088c4 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -172,7 +172,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& findCmdObj,
- const CallbackFn& work,
+ CallbackFn work,
const BSONObj& metadata,
Milliseconds findNetworkTimeout,
Milliseconds getMoreNetworkTimeout,
@@ -182,7 +182,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
_dbname(dbname),
_cmdObj(findCmdObj.getOwned()),
_metadata(metadata.getOwned()),
- _work(work),
+ _work(std::move(work)),
_findNetworkTimeout(findNetworkTimeout),
_getMoreNetworkTimeout(getMoreNetworkTimeout),
_firstRemoteCommandScheduler(
@@ -190,7 +190,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _findNetworkTimeout),
[this](const auto& x) { return this->_callback(x, kFirstBatchFieldName); },
std::move(firstCommandRetryPolicy)) {
- uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _work);
}
Fetcher::~Fetcher() {
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index e917e9f3f30..e7672b382f0 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -127,7 +127,7 @@ public:
const HostAndPort& source,
const std::string& dbname,
const BSONObj& cmdObj,
- const CallbackFn& work,
+ CallbackFn work,
const BSONObj& metadata = ReadPreferenceSetting::secondaryPreferredMetadata(),
Milliseconds findNetworkTimeout = RemoteCommandRequest::kNoTimeout,
Milliseconds getMoreNetworkTimeout = RemoteCommandRequest::kNoTimeout,
diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp
index e1d8e3610ac..9d277fe5fb9 100644
--- a/src/mongo/db/free_mon/free_mon_controller_test.cpp
+++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp
@@ -250,7 +250,7 @@ public:
pf.promise.setFrom(doRegister(req));
} else {
auto swSchedule =
- _threadPool->scheduleWork([ sharedPromise = pf.promise.share(), req, this ](
+ _threadPool->scheduleWork([ sharedPromise = std::move(pf.promise), req, this ](
const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
sharedPromise.setWith([&] { return doRegister(req); });
@@ -297,7 +297,7 @@ public:
pf.promise.setFrom(doMetrics(req));
} else {
auto swSchedule =
- _threadPool->scheduleWork([ sharedPromise = pf.promise.share(), req, this ](
+ _threadPool->scheduleWork([ sharedPromise = std::move(pf.promise), req, this ](
const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
sharedPromise.setWith([&] { return doMetrics(req); });
diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp
index 9708a1071ef..38c71e6f543 100644
--- a/src/mongo/db/free_mon/free_mon_mongod.cpp
+++ b/src/mongo/db/free_mon/free_mon_mongod.cpp
@@ -180,20 +180,17 @@ private:
auto pf = makePromiseFuture<DataBuilder>();
std::string url(exportedExportedFreeMonEndpointURL.getLocked() + path.toString());
- auto status = _executor->scheduleWork([
- shared_promise = pf.promise.share(),
- url = std::move(url),
- data = std::move(data),
- this
- ](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
- ConstDataRange cdr(reinterpret_cast<char*>(data->data()), data->size());
- try {
- auto result = this->_client->post(url, cdr);
- shared_promise.emplaceValue(std::move(result));
- } catch (...) {
- shared_promise.setError(exceptionToStatus());
- }
- });
+ auto status = _executor->scheduleWork(
+ [ promise = std::move(pf.promise), url = std::move(url), data = std::move(data), this ](
+ const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+ ConstDataRange cdr(reinterpret_cast<char*>(data->data()), data->size());
+ try {
+ auto result = this->_client->post(url, cdr);
+ promise.emplaceValue(std::move(result));
+ } catch (...) {
+ promise.setError(exceptionToStatus());
+ }
+ });
uassertStatusOK(status);
return std::move(pf.future);
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp
index cf8941a05f3..9967dd8012a 100644
--- a/src/mongo/db/repl/abstract_async_component.cpp
+++ b/src/mongo/db/repl/abstract_async_component.cpp
@@ -166,7 +166,7 @@ Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock(
}
Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -175,7 +175,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name << ": " << _componentName
<< " is shutting down");
}
- auto result = _executor->scheduleWork(work);
+ auto result = _executor->scheduleWork(std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
}
@@ -185,7 +185,7 @@ Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -196,7 +196,7 @@ Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock(
<< _componentName
<< " is shutting down");
}
- auto result = _executor->scheduleWorkAt(when, work);
+ auto result = _executor->scheduleWorkAt(when, std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(
str::stream() << "failed to schedule work " << name << " at " << when.toString());
diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h
index e7f14121c9a..63fa5db7e46 100644
--- a/src/mongo/db/repl/abstract_async_component.h
+++ b/src/mongo/db/repl/abstract_async_component.h
@@ -148,11 +148,11 @@ protected:
* Saves handle if work was successfully scheduled.
* Returns scheduleWork status (without the handle).
*/
- Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp
index 691c417ad2b..4dd1183b2b4 100644
--- a/src/mongo/db/repl/abstract_async_component_test.cpp
+++ b/src/mongo/db/repl/abstract_async_component_test.cpp
@@ -65,7 +65,7 @@ public:
* Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and
* _scheduleWorkAtAndSaveHandle_inlock() for testing.
*/
- Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work,
+ Status scheduleWorkAndSaveHandle_forTest(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -73,7 +73,7 @@ public:
* Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing.
*/
Status scheduleWorkAtAndSaveHandle_forTest(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -122,20 +122,20 @@ Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status
}
Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _scheduleWorkAndSaveHandle_inlock(work, handle, name);
+ return _scheduleWorkAndSaveHandle_inlock(std::move(work), handle, name);
}
Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name);
+ return _scheduleWorkAtAndSaveHandle_inlock(when, std::move(work), handle, name);
}
void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) {
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 6876e88f5aa..6038ec6f544 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -109,7 +109,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& onCompletion,
+ CallbackFn onCompletion,
StorageInterface* storageInterface,
const int batchSize)
: _executor(executor),
@@ -118,7 +118,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_sourceNss(sourceNss),
_destNss(_sourceNss),
_options(options),
- _onCompletion(onCompletion),
+ _onCompletion(std::move(onCompletion)),
_storageInterface(storageInterface),
_countScheduler(_executor,
RemoteCommandRequest(
@@ -155,9 +155,10 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_indexSpecs(),
_documentsToInsert(),
_dbWorkTaskRunner(_dbWorkThreadPool),
- _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
- auto task = [ this, work ](OperationContext * opCtx,
- const Status& status) noexcept->TaskRunner::NextAction {
+ _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
+ auto task = [ this, work = std::move(work) ](
+ OperationContext * opCtx,
+ const Status& status) mutable noexcept->TaskRunner::NextAction {
try {
work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
} catch (...) {
@@ -165,7 +166,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
}
return TaskRunner::NextAction::kDisposeOperationContext;
};
- _dbWorkTaskRunner.schedule(task);
+ _dbWorkTaskRunner.schedule(std::move(task));
return executor::TaskExecutor::CallbackHandle();
}),
_createClientFn([] { return stdx::make_unique<DBClientConnection>(); }),
@@ -184,7 +185,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
uassert(50953,
"Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(),
_options.uuid);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
uassert(
50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0);
@@ -292,9 +293,9 @@ void CollectionCloner::waitForDbWorker() {
_dbWorkTaskRunner.join();
}
-void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn) {
+void CollectionCloner::setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) {
LockGuard lk(_mutex);
- _scheduleDbWorkFn = scheduleDbWorkFn;
+ _scheduleDbWorkFn = std::move(scheduleDbWorkFn);
}
void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index c7149970f5e..4ed39cc5512 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -93,8 +93,8 @@ public:
*
* Used for testing only.
*/
- using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
- const executor::TaskExecutor::CallbackFn&)>;
+ using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
/**
* Type of function to create a database client
@@ -117,7 +117,7 @@ public:
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& onCompletion,
+ CallbackFn onCompletion,
StorageInterface* storageInterface,
const int batchSize);
@@ -152,7 +152,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn);
/**
* Allows a different client class to be injected.
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 05c36c41cd6..c17cff0cc60 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -668,12 +668,13 @@ TEST_F(CollectionClonerTest,
// status.
auto exec = &getExecutor();
collectionCloner->setScheduleDbWorkFn_forTest([exec](
- const executor::TaskExecutor::CallbackFn& workFn) {
- auto wrappedTask = [workFn](const executor::TaskExecutor::CallbackArgs& cbd) {
+ executor::TaskExecutor::CallbackFn workFn) {
+ auto wrappedTask = [workFn = std::move(workFn)](
+ const executor::TaskExecutor::CallbackArgs& cbd) {
workFn(executor::TaskExecutor::CallbackArgs(
cbd.executor, cbd.myHandle, Status(ErrorCodes::CallbackCanceled, ""), cbd.opCtx));
};
- return exec->scheduleWork(wrappedTask);
+ return exec->scheduleWork(std::move(wrappedTask));
});
bool collectionCreated = false;
@@ -1177,12 +1178,11 @@ TEST_F(CollectionClonerTest,
// Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet.
executor::TaskExecutor::CallbackFn insertDocumentsFn;
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- insertDocumentsFn = workFn;
- executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
- return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
- });
+ collectionCloner->setScheduleDbWorkFn_forTest([&](executor::TaskExecutor::CallbackFn workFn) {
+ insertDocumentsFn = std::move(workFn);
+ executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
+ });
ASSERT_FALSE(insertDocumentsFn);
// Return first batch of collection documents from remote server for the getMore request.
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index a4648bc8e9a..b9c1e23edea 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -116,7 +116,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
const ListCollectionsPredicateFn& listCollectionsPred,
StorageInterface* si,
const CollectionCallbackFn& collWork,
- const CallbackFn& onCompletion)
+ CallbackFn onCompletion)
: _executor(executor),
_dbWorkThreadPool(dbWorkThreadPool),
_source(source),
@@ -128,7 +128,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
_listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred),
_storageInterface(si),
_collectionWork(collWork),
- _onCompletion(onCompletion),
+ _onCompletion(std::move(onCompletion)),
_listCollectionsFetcher(_executor,
_source,
_dbname,
@@ -152,7 +152,7 @@ DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty());
uassert(ErrorCodes::BadValue, "storage interface cannot be null", si);
uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
_stats.dbname = _dbname;
}
@@ -259,7 +259,7 @@ void DatabaseCloner::join() {
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
-void DatabaseCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void DatabaseCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index 0a5b893b294..7b2cd152592 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -97,6 +97,9 @@ public:
*/
using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>;
+ using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
+
/**
* Creates DatabaseCloner task in inactive state. Use start() to activate cloner.
*
@@ -116,7 +119,7 @@ public:
const ListCollectionsPredicateFn& listCollectionsPredicate,
StorageInterface* storageInterface,
const CollectionCallbackFn& collectionWork,
- const CallbackFn& onCompletion);
+ CallbackFn onCompletion);
virtual ~DatabaseCloner();
@@ -146,7 +149,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -234,7 +237,7 @@ private:
std::vector<NamespaceString> _collectionNamespaces; // (M)
std::list<CollectionCloner> _collectionCloners; // (M)
std::list<CollectionCloner>::iterator _currentCollectionClonerIter; // (M)
- CollectionCloner::ScheduleDbWorkFn
+ ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
StartCollectionClonerFn _startCollectionCloner; // (RT)
Stats _stats; // (M) Stats about what this instance did.
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 65a0e054b9a..83c428340a7 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -104,10 +104,9 @@ void DatabaseClonerTest::setUp() {
storageInterface.get(),
makeCollectionWorkClosure(),
makeSetStatusClosure());
- _databaseCloner->setScheduleDbWorkFn_forTest(
- [this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
- });
+ _databaseCloner->setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
+ });
_mockServer = stdx::make_unique<MockRemoteDBServer>(target.toString());
_mockServer->assignCollectionUuid("db.a", *_options1.uuid);
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 960d84fe960..ac3bc398059 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -227,7 +227,7 @@ Status DatabasesCloner::startup() noexcept {
return Status::OK();
}
-void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index 6af3b927229..970abc5da60 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -69,6 +69,8 @@ public:
using IncludeDbFilterFn = stdx::function<bool(const BSONObj& dbInfo)>;
using OnFinishFn = stdx::function<void(const Status&)>;
using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
+ using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
@@ -98,7 +100,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -178,10 +180,10 @@ private:
mutable stdx::mutex _mutex; // (S)
Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything.
executor::TaskExecutor* _exec; // (R) executor to schedule things with
- ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
- const HostAndPort _source; // (R) The source to use.
- CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
- StartCollectionClonerFn _startCollectionClonerFn; // (M)
+ ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
+ const HostAndPort _source; // (R) The source to use.
+ ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned.
OnFinishFn _finishFn; // (M) function called when finished.
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 47bec4f18ce..40a3b39a744 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -134,7 +134,7 @@ public:
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
log() << "The network has unexpected requests to process, next req:";
- NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest();
log() << req.getDiagnosticString();
}
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -301,8 +301,8 @@ protected:
result = status;
cvDone.notify_all();
}};
- cloner.setScheduleDbWorkFn_forTest([this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
+ cloner.setScheduleDbWorkFn_forTest([this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
});
cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) {
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 6b68c13a7ea..28d3266ca82 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -378,7 +378,7 @@ BSONObj InitialSyncer::_getInitialSyncProgress_inlock() const {
return bob.obj();
}
-void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
+void InitialSyncer::setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& work) {
LockGuard lk(_mutex);
_scheduleDbWorkFn = work;
}
@@ -1431,7 +1431,7 @@ Status InitialSyncer::_checkForShutdownAndConvertStatus_inlock(const Status& sta
}
Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -1440,7 +1440,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name
<< ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWork(work);
+ auto result = _exec->scheduleWork(std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
}
@@ -1450,7 +1450,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name) {
invariant(handle);
@@ -1460,7 +1460,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
<< when.toString()
<< ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWorkAt(when, work);
+ auto result = _exec->scheduleWorkAt(when, std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(
str::stream() << "failed to schedule work " << name << " at " << when.toString());
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 0ead52f6f46..929643116c5 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -222,7 +222,7 @@ public:
*
* For testing only.
*/
- void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ void setScheduleDbWorkFn_forTest(const DatabaseCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
/**
* Overrides how executor starts a collection cloner.
@@ -540,11 +540,11 @@ private:
* Saves handle if work was successfully scheduled.
* Returns scheduleWork status (without the handle).
*/
- Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
- const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackFn work,
executor::TaskExecutor::CallbackHandle* handle,
const std::string& name);
@@ -633,8 +633,8 @@ private:
State _state = State::kPreStart; // (M)
// Passed to CollectionCloner via DatabasesCloner.
- CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
- StartCollectionClonerFn _startCollectionClonerFn; // (M)
+ DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
// Contains stats on the current initial sync request (includes all attempts).
// To access these stats in a user-readable format, use getInitialSyncProgress().
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 6bc02d4d15a..620d9c5abff 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -215,7 +215,7 @@ public:
getNet()->runReadyNetworkOperations();
if (getNet()->hasReadyRequests()) {
log() << "The network has unexpected requests to process, next req:";
- NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperation& req = *getNet()->getNextReadyRequest();
log() << req.getDiagnosticString();
}
ASSERT_FALSE(getNet()->hasReadyRequests());
@@ -404,8 +404,8 @@ protected:
_onCompletion(lastApplied);
});
_initialSyncer->setScheduleDbWorkFn_forTest(
- [this](const executor::TaskExecutor::CallbackFn& work) {
- return getExecutor().scheduleWork(work);
+ [this](executor::TaskExecutor::CallbackFn work) {
+ return getExecutor().scheduleWork(std::move(work));
});
_initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) {
cloner.setCreateClientFn_forTest([&cloner, this]() {
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 655426a5ce3..e568bb3f8ca 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -45,15 +45,15 @@ namespace repl {
MultiApplier::MultiApplier(executor::TaskExecutor* executor,
const Operations& operations,
const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion)
+ CallbackFn onCompletion)
: _executor(executor),
_operations(operations),
_multiApply(multiApply),
- _onCompletion(onCompletion) {
+ _onCompletion(std::move(onCompletion)) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion);
}
MultiApplier::~MultiApplier() {
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index 3b2a59f7fc8..79fec694b98 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -67,7 +67,7 @@ public:
/**
* Callback function to report final status of applying operations.
*/
- using CallbackFn = stdx::function<void(const Status&)>;
+ using CallbackFn = unique_function<void(const Status&)>;
using MultiApplyFn =
stdx::function<StatusWith<OpTime>(OperationContext*, MultiApplier::Operations)>;
@@ -88,7 +88,7 @@ public:
MultiApplier(executor::TaskExecutor* executor,
const Operations& operations,
const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion);
+ CallbackFn onCompletion);
/**
* Blocks while applier is active.
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 676d9c1f787..0eb44197c76 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -120,14 +120,14 @@ OplogBuffer* OplogApplier::getBuffer() const {
Future<void> OplogApplier::startup() {
auto pf = makePromiseFuture<void>();
auto callback =
- [ this, promise = pf.promise.share() ](const CallbackArgs& args) mutable noexcept {
+ [ this, promise = std::move(pf.promise) ](const CallbackArgs& args) mutable noexcept {
invariant(args.status);
log() << "Starting oplog application";
_run(_oplogBuffer);
log() << "Finished oplog application";
promise.setWith([] {});
};
- invariant(_executor->scheduleWork(callback).getStatus());
+ invariant(_executor->scheduleWork(std::move(callback)).getStatus());
return std::move(pf.future);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 18c9ccac0a8..f5b1cbb55db 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -191,9 +191,9 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) {
* Schedules a task using the executor. This task is always run unless the task executor is shutting
* down.
*/
-void scheduleWork(executor::TaskExecutor* executor,
- const executor::TaskExecutor::CallbackFn& work) {
- auto cbh = executor->scheduleWork([work](const executor::TaskExecutor::CallbackArgs& args) {
+void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) {
+ auto cbh = executor->scheduleWork([work = std::move(work)](
+ const executor::TaskExecutor::CallbackArgs& args) {
if (args.status == ErrorCodes::CallbackCanceled) {
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 824db78c888..e151e0b2482 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2726,7 +2726,7 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
}
// Schedule timeout callback.
auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
- auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
+ auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, std::move(timeoutCB));
if (!status.isOK()) {
log() << "Failed to schedule catchup timeout work.";
abort_inlock();
@@ -3512,13 +3512,14 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
}
}
-CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
- auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) {
- if (args.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- work(args);
- });
+CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, CallbackFn work) {
+ auto cbh =
+ _replExecutor->scheduleWorkAt(when, [work = std::move(work)](const CallbackArgs& args) {
+ if (args.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ work(args);
+ });
if (cbh == ErrorCodes::ShutdownInProgress) {
return {};
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index cff7179d1ce..233c0bf88e0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1034,7 +1034,7 @@ private:
* All other non-shutdown scheduling failures will abort the process.
* Does not run 'work' if callback is canceled.
*/
- CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);
+ CallbackHandle _scheduleWorkAt(Date_t when, CallbackFn work);
/**
* Creates an event.
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index f11ef6c3d94..e5b7607390d 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -579,7 +579,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp,
TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork(
- const CallbackFn& work) override {
+ CallbackFn work) override {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
};
@@ -631,7 +631,7 @@ TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) {
TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt(
- Date_t when, const CallbackFn& work) override {
+ Date_t when, CallbackFn work) override {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
};
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index d3e14a7d8f1..727014a5cc2 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -38,25 +38,24 @@ namespace repl {
TaskExecutorMock::TaskExecutorMock(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
-StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(
- const CallbackFn& work) {
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(CallbackFn work) {
if (shouldFailScheduleWorkRequest()) {
return Status(ErrorCodes::OperationFailed, "failed to schedule work");
}
if (shouldDeferScheduleWorkRequestByOneSecond()) {
auto when = now() + Seconds(1);
- return getExecutor()->scheduleWorkAt(when, work);
+ return getExecutor()->scheduleWorkAt(when, std::move(work));
}
- return getExecutor()->scheduleWork(work);
+ return getExecutor()->scheduleWork(std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
+ Date_t when, CallbackFn work) {
if (shouldFailScheduleWorkAtRequest()) {
return Status(ErrorCodes::OperationFailed,
str::stream() << "failed to schedule work at " << when.toString());
}
- return getExecutor()->scheduleWorkAt(when, work);
+ return getExecutor()->scheduleWorkAt(when, std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand(
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index 41740b74433..e5e92e0f0e1 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -47,8 +47,8 @@ public:
explicit TaskExecutorMock(executor::TaskExecutor* executor);
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 94bd65d1d8d..43f0b735789 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -102,12 +102,12 @@ bool TaskRunner::isActive() const {
return _active;
}
-void TaskRunner::schedule(const Task& task) {
+void TaskRunner::schedule(Task task) {
invariant(task);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _tasks.push_back(task);
+ _tasks.push_back(std::move(task));
_condition.notify_all();
if (_active) {
@@ -174,8 +174,8 @@ void TaskRunner::_runTasks() {
tasks.swap(_tasks);
lk.unlock();
// Cancel remaining tasks with a CallbackCanceled status.
- for (auto task : tasks) {
- runSingleTask(task,
+ for (auto&& task : tasks) {
+ runSingleTask(std::move(task),
nullptr,
Status(ErrorCodes::CallbackCanceled,
"this task has been canceled by a previously invoked task"));
@@ -207,46 +207,10 @@ TaskRunner::Task TaskRunner::_waitForNextTask() {
return Task();
}
- Task task = _tasks.front();
+ Task task = std::move(_tasks.front());
_tasks.pop_front();
return task;
}
-Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) {
- // Setup cond_var for signaling when done.
- bool done = false;
- stdx::mutex mutex;
- stdx::condition_variable waitTillDoneCond;
-
- Status returnStatus{Status::OK()};
- this->schedule([&](OperationContext* opCtx, const Status taskStatus) {
- if (!taskStatus.isOK()) {
- returnStatus = taskStatus;
- } else {
- // Run supplied function.
- try {
- returnStatus = func(opCtx);
- } catch (...) {
- returnStatus = exceptionToStatus();
- error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus);
- }
- }
-
- // Signal done.
- LockGuard lk2{mutex};
- done = true;
- waitTillDoneCond.notify_all();
-
- // return nextAction based on status from supplied function.
- if (returnStatus.isOK()) {
- return nextAction;
- }
- return TaskRunner::NextAction::kCancel;
- });
-
- UniqueLock lk{mutex};
- waitTillDoneCond.wait(lk, [&done] { return done; });
- return returnStatus;
-}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
index 6875229bb9e..85313cc4ce9 100644
--- a/src/mongo/db/repl/task_runner.h
+++ b/src/mongo/db/repl/task_runner.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/functional.h"
namespace mongo {
@@ -60,17 +61,7 @@ public:
kCancel = 3,
};
- using Task = stdx::function<NextAction(OperationContext*, const Status&)>;
- using SynchronousTask = stdx::function<Status(OperationContext* opCtx)>;
-
- /**
- * Returns the Status from the supplied function after running it..
- *
- * Note: TaskRunner::NextAction controls when the operation context and thread will be released.
- */
- Status runSynchronousTask(
- SynchronousTask func,
- TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
+ using Task = unique_function<NextAction(OperationContext*, const Status&)>;
/**
* Creates a Task returning kCancel. This is useful in shutting down the task runner after
@@ -126,7 +117,7 @@ public:
* immediately. This is usually the case when the task runner is canceled. Accessing the
* operation context in the task will result in undefined behavior.
*/
- void schedule(const Task& task);
+ void schedule(Task task);
/**
* If there is a task that is already running, allows the task to run to completion.
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index b01885ba332..17771287f90 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -83,7 +83,7 @@ using OpIdVector = std::vector<unsigned int>;
OpIdVector _testRunTaskTwice(TaskRunnerTest& test,
TaskRunner::NextAction nextAction,
- stdx::function<void(const Task& task)> schedule) {
+ unique_function<void(Task task)> schedule) {
unittest::Barrier barrier(2U);
stdx::mutex mutex;
std::vector<OperationContext*> txns;
@@ -121,7 +121,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test,
std::vector<unsigned int> _testRunTaskTwice(TaskRunnerTest& test,
TaskRunner::NextAction nextAction) {
- auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); };
+ auto schedule = [&](Task task) { test.getTaskRunner().schedule(std::move(task)); };
return _testRunTaskTwice(test, nextAction, schedule);
}
@@ -134,9 +134,9 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
// Joining thread pool before scheduling second task ensures that task runner releases
// thread back to pool after disposing of operation context.
TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
- auto schedule = [this](const Task& task) {
+ auto schedule = [this](Task task) {
getThreadPool().waitForIdle();
- getTaskRunner().schedule(task);
+ getTaskRunner().schedule(std::move(task));
};
auto txnId =
_testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule);
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 63445075780..a6438e3cf7e 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -179,7 +179,7 @@ private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
- using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>;
+ using Request = std::pair<Date_t, Promise<ConnectionHandle>>;
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
return a.first > b.first;
@@ -461,7 +461,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
const auto expiration = _parent->_factory->now() + timeout;
auto pf = makePromiseFuture<ConnectionHandle>();
- _requests.push_back(make_pair(expiration, pf.promise.share()));
+ _requests.push_back(make_pair(expiration, std::move(pf.promise)));
std::push_heap(begin(_requests), end(_requests), RequestComparator{});
updateStateInLock();
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 97320d12bf0..c55a7925bdc 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/functional.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -57,7 +58,7 @@ class NetworkInterface {
public:
using Response = RemoteCommandResponse;
- using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>;
+ using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseStatus&)>;
virtual ~NetworkInterface();
@@ -145,7 +146,7 @@ public:
*/
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr) = 0;
Future<TaskExecutor::ResponseStatus> startCommand(
@@ -154,13 +155,13 @@ public:
const transport::BatonHandle& baton = nullptr) {
auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>();
- auto status =
- startCommand(cbHandle,
- request,
- [sp = pf.promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
- sp.emplaceValue(rs);
- },
- baton);
+ auto status = startCommand(
+ cbHandle,
+ request,
+ [p = std::move(pf.promise)](const TaskExecutor::ResponseStatus& rs) mutable {
+ p.emplaceValue(rs);
+ },
+ baton);
if (!status.isOK()) {
return status;
@@ -190,7 +191,7 @@ public:
* return true. See that method for why.
*/
virtual Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton = nullptr) = 0;
/**
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index bd6cd9bf41e..f43af8bd17a 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -323,9 +323,10 @@ TEST_F(NetworkInterfaceTest, SetAlarm) {
Date_t expiration = net().now() + Milliseconds(100);
auto makeTimerFuture = [&] {
auto pf = makePromiseFuture<Date_t>();
- return std::make_pair(
- [ this, promise = pf.promise.share() ]() mutable { promise.emplaceValue(net().now()); },
- std::move(pf.future));
+ return std::make_pair([ this, promise = std::move(pf.promise) ]() mutable {
+ promise.emplaceValue(net().now());
+ },
+ std::move(pf.future));
};
auto futurePair = makeTimerFuture();
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 258c531707e..242e81126f6 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -115,7 +115,7 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
@@ -124,7 +124,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
stdx::lock_guard<stdx::mutex> lk(_mutex);
const Date_t now = _now_inlock();
- auto op = NetworkOperation(cbHandle, request, now, onFinish);
+ auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish));
// If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
if (!_hook || _connections.count(request.target)) {
@@ -180,7 +180,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
Status NetworkInterfaceMock::setAlarm(const Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
@@ -193,7 +193,7 @@ Status NetworkInterfaceMock::setAlarm(const Date_t when,
action();
return Status::OK();
}
- _alarms.emplace(when, action);
+ _alarms.emplace(when, std::move(action));
return Status::OK();
}
@@ -460,16 +460,21 @@ void NetworkInterfaceMock::_enqueueOperation_inlock(
return a.getNextConsiderationDate() < b.getNextConsiderationDate();
});
+ const auto timeout = op.getRequest().timeout;
+ auto cbh = op.getCallbackHandle();
+
_unscheduled.emplace(insertBefore, std::move(op));
- if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
- invariant(op.getRequest().timeout >= Milliseconds(0));
+ if (timeout != RemoteCommandRequest::kNoTimeout) {
+ invariant(timeout >= Milliseconds(0));
ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0));
std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled};
- auto action = [ =, cbh = op.getCallbackHandle() ] {
- _interruptWithResponse_inlock(cbh, queuesToCheck, rs);
- };
- _alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
+ _alarms.emplace(_now_inlock() + timeout, [
+ this,
+ cbh = std::move(cbh),
+ queuesToCheck = std::move(queuesToCheck),
+ rs = std::move(rs)
+ ] { _interruptWithResponse_inlock(cbh, queuesToCheck, rs); });
}
}
@@ -504,13 +509,14 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
if (!hookPostconnectCommand) {
// If we don't have a post connect command, enqueue the actual command.
- _enqueueOperation_inlock(std::move(op));
_connections.emplace(op.getRequest().target);
+ _enqueueOperation_inlock(std::move(op));
return;
}
+ auto cbh = op.getCallbackHandle();
// The completion handler for the postconnect command schedules the original command.
- auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable {
+ auto postconnectCompletionHandler = [ this, op = std::move(op) ](ResponseStatus rs) mutable {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!rs.isOK()) {
op.setResponse(_now_inlock(), rs);
@@ -526,11 +532,11 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
return;
}
- _enqueueOperation_inlock(std::move(op));
_connections.emplace(op.getRequest().target);
+ _enqueueOperation_inlock(std::move(op));
};
- auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
+ auto postconnectOp = NetworkOperation(cbh,
std::move(*hookPostconnectCommand),
_now_inlock(),
std::move(postconnectCompletionHandler));
@@ -563,7 +569,7 @@ void NetworkInterfaceMock::signalWorkAvailable() {
void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
- auto fn = _alarms.top().action;
+ auto fn = std::move(_alarms.top().action);
_alarms.pop();
lk->unlock();
fn();
@@ -571,7 +577,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s
}
while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
invariant(_currentlyRunning == kNetworkThread);
- NetworkOperation op = _scheduled.front();
+ NetworkOperation op = std::move(_scheduled.front());
_scheduled.pop_front();
_waitingToRunMask |= kExecutorThread;
lk->unlock();
@@ -637,16 +643,14 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation()
NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle,
const RemoteCommandRequest& theRequest,
Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish)
+ RemoteCommandCompletionFn onFinish)
: _requestDate(theRequestDate),
_nextConsiderationDate(theRequestDate),
_responseDate(),
_cbHandle(cbHandle),
_request(theRequest),
_response(kUnsetResponse),
- _onFinish(onFinish) {}
-
-NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
+ _onFinish(std::move(onFinish)) {}
std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
return str::stream() << "NetworkOperation -- request:'" << _request.toString()
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index f87b18c27b3..7023101fd59 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -113,7 +113,7 @@ public:
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr);
/**
@@ -129,7 +129,7 @@ public:
* Not implemented.
*/
virtual Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton = nullptr);
virtual bool onNetworkThread();
@@ -284,7 +284,7 @@ private:
* Information describing a scheduled alarm.
*/
struct AlarmInfo {
- using AlarmAction = stdx::function<void()>;
+ using AlarmAction = unique_function<void()>;
AlarmInfo(Date_t inWhen, AlarmAction inAction)
: when(inWhen), action(std::move(inAction)) {}
bool operator>(const AlarmInfo& rhs) const {
@@ -292,7 +292,7 @@ private:
}
Date_t when;
- AlarmAction action;
+ mutable AlarmAction action;
};
/**
@@ -435,8 +435,7 @@ public:
NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& theRequest,
Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish);
- ~NetworkOperation();
+ RemoteCommandCompletionFn onFinish);
/**
* Adjusts the stored virtual time at which this entry will be subject to consideration
@@ -556,8 +555,8 @@ public:
Date_t now() override {
return _net->now();
}
- Status setAlarm(Date_t when, stdx::function<void()> action) override {
- return _net->setAlarm(when, action);
+ Status setAlarm(Date_t when, unique_function<void()> action) override {
+ return _net->setAlarm(when, std::move(action));
}
private:
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 74681c42308..a395b1f48bb 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -172,7 +172,7 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
@@ -205,7 +205,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
log() << "Discarding command due to failpoint before acquireConn";
- std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
+ std::move(pf.future).getAsync([onFinish = std::move(onFinish)](
+ StatusWith<RemoteCommandResponse> response) mutable {
onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
});
return Status::OK();
@@ -237,8 +238,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
});
});
- auto remainingWork = [ this, state, future = std::move(pf.future), baton, onFinish ](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ auto remainingWork =
+ [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
return _onAcquireConn(
state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
@@ -251,7 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
return error;
})
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ .getAsync([ this, state, onFinish = std::move(onFinish) ](
+ StatusWith<RemoteCommandResponse> response) {
auto duration = now() - state->start;
if (!response.isOK()) {
onFinish(RemoteCommandResponse(response.getStatus(), duration));
@@ -430,7 +433,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
}
Status NetworkInterfaceTL::setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
@@ -454,37 +457,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
}
alarmTimer->waitUntil(when, baton)
- .getAsync([this, weakTimer, action, when, baton](Status status) {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when
- << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action), baton);
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
+ .getAsync(
+ [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
}
- return;
- }
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action), baton);
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
- if (status.isOK()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ return;
}
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+
+ if (status.isOK()) {
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 048aaba7f5b..1dbcc7a7678 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -67,13 +67,13 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const transport::BatonHandle& baton) override;
Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 3c845a18824..e8cff8ee0f6 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -94,7 +94,7 @@ public:
* the callback was canceled for any reason (including shutdown). Otherwise, it should have
* Status::OK().
*/
- using CallbackFn = stdx::function<void(const CallbackArgs&)>;
+ using CallbackFn = unique_function<void(const CallbackArgs&)>;
/**
* Type of a callback from a request to run a command on a remote MongoDB node.
@@ -175,8 +175,7 @@ public:
*
* May be called by client threads or callbacks running in the executor.
*/
- virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
- const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) = 0;
/**
* Blocks the calling thread until "event" is signaled. Also returns if the event is never
@@ -209,7 +208,7 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) = 0;
/**
* Schedules "work" to be run by the executor no sooner than "when".
@@ -224,7 +223,7 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) = 0;
/**
* Schedules "cb" to be run by the executor with the result of executing the remote command
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index a60b115b923..42373c2b47b 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -307,13 +307,14 @@ void EventChainAndWaitingTest::onGo(const TaskExecutor::CallbackArgs& cbData) {
return;
}
triggerEvent = errorOrTriggerEvent.getValue();
- StatusWith<TaskExecutor::CallbackHandle> cbHandle = executor->onEvent(triggerEvent, triggered2);
+ StatusWith<TaskExecutor::CallbackHandle> cbHandle =
+ executor->onEvent(triggerEvent, std::move(triggered2));
if (!cbHandle.isOK()) {
status1 = cbHandle.getStatus();
executor->shutdown();
return;
}
- cbHandle = executor->onEvent(triggerEvent, triggered3);
+ cbHandle = executor->onEvent(triggerEvent, std::move(triggered3));
if (!cbHandle.isOK()) {
status1 = cbHandle.getStatus();
executor->shutdown();
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index 5520de68396..8d596b3d3c5 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -34,6 +34,7 @@
#include <vector>
#include "mongo/platform/random.h"
+#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index d48a0888bc6..94e547b87f3 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -274,11 +274,11 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event,
- const CallbackFn& work) {
+ CallbackFn work) {
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wq = makeSingletonWorkQueue(work, nullptr);
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -323,9 +323,8 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
}
}
-StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
- const CallbackFn& work) {
- auto wq = makeSingletonWorkQueue(work, nullptr);
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn work) {
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -336,12 +335,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
return cbHandle;
}
-StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(Date_t when,
+ CallbackFn work) {
if (when <= now()) {
- return scheduleWork(work);
+ return scheduleWork(std::move(work));
}
- auto wq = makeSingletonWorkQueue(work, nullptr, when);
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 0fd821baa95..5163ad9d10d 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -75,13 +75,13 @@ public:
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
const EventHandle& event,
Date_t deadline) override;
void waitForEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index dd9bde0518a..d11de192011 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -89,8 +89,8 @@ void ShardingTaskExecutor::signalEvent(const EventHandle& event) {
}
StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event,
- const CallbackFn& work) {
- return _executor->onEvent(event, work);
+ CallbackFn work) {
+ return _executor->onEvent(event, std::move(work));
}
void ShardingTaskExecutor::waitForEvent(const EventHandle& event) {
@@ -103,14 +103,13 @@ StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext*
return _executor->waitForEvent(opCtx, event, deadline);
}
-StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(
- const CallbackFn& work) {
- return _executor->scheduleWork(work);
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(CallbackFn work) {
+ return _executor->scheduleWork(std::move(work));
}
-StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
- return _executor->scheduleWorkAt(when, work);
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Date_t when,
+ CallbackFn work) {
+ return _executor->scheduleWorkAt(when, std::move(work));
}
StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand(
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 9e018f497aa..3a0df5fb57e 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -62,13 +62,13 @@ public:
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
void waitForEvent(const EventHandle& event) override;
StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
const EventHandle& event,
Date_t deadline) override;
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h
index a48a1a7043b..2a8a2310a6b 100644
--- a/src/mongo/transport/asio_utils.h
+++ b/src/mongo/transport/asio_utils.h
@@ -350,7 +350,7 @@ struct AsyncHandlerHelper {
template <>
struct AsyncHandlerHelper<> {
using Result = void;
- static void complete(SharedPromise<Result>* promise) {
+ static void complete(Promise<Result>* promise) {
promise->emplaceValue();
}
};
@@ -358,7 +358,7 @@ struct AsyncHandlerHelper<> {
template <typename Arg>
struct AsyncHandlerHelper<Arg> {
using Result = Arg;
- static void complete(SharedPromise<Result>* promise, Arg arg) {
+ static void complete(Promise<Result>* promise, Arg arg) {
promise->emplaceValue(arg);
}
};
@@ -369,7 +369,7 @@ struct AsyncHandlerHelper<std::error_code, Args...> {
using Result = typename Helper::Result;
template <typename... Args2>
- static void complete(SharedPromise<Result>* promise, std::error_code ec, Args2&&... args) {
+ static void complete(Promise<Result>* promise, std::error_code ec, Args2&&... args) {
if (ec) {
promise->setError(errorCodeToStatus(ec));
} else {
@@ -381,7 +381,7 @@ struct AsyncHandlerHelper<std::error_code, Args...> {
template <>
struct AsyncHandlerHelper<std::error_code> {
using Result = void;
- static void complete(SharedPromise<Result>* promise, std::error_code ec) {
+ static void complete(Promise<Result>* promise, std::error_code ec) {
if (ec) {
promise->setError(errorCodeToStatus(ec));
} else {
@@ -402,7 +402,7 @@ struct AsyncHandler {
Helper::complete(&promise, std::forward<Args2>(args)...);
}
- SharedPromise<Result> promise;
+ Promise<Result> promise;
};
template <typename... Args>
@@ -414,7 +414,7 @@ struct AsyncResult {
explicit AsyncResult(completion_handler_type& handler) {
auto pf = makePromiseFuture<RealResult>();
fut = std::move(pf.future);
- handler.promise = pf.promise.share();
+ handler.promise = std::move(pf.promise);
}
auto get() {
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index 6440b7d4e39..9dbf570ede2 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -75,8 +75,8 @@ public:
Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule([ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable {
- sp.setWith(std::move(cb));
+ schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
+ p.setWith(std::move(cb));
});
return std::move(pf.future);
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index f96e73b67a3..e8dcf79deeb 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -150,8 +150,8 @@ public:
auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
auto pf = makePromiseFuture<void>();
- _safeExecute([ fd, type, sp = pf.promise.share(), this ] {
- _sessions[fd] = TransportSession{type, sp};
+ _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable {
+ _sessions[fd] = TransportSession{type, std::move(promise)};
});
return std::move(pf.future);
@@ -159,13 +159,14 @@ public:
Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
auto pf = makePromiseFuture<void>();
- _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] {
- auto pair = _timers.insert({
- timerPtr, expiration, sp,
+ _safeExecute(
+ [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable {
+ auto pair = _timers.insert({
+ timerPtr, expiration, std::move(promise),
+ });
+ invariant(pair.second);
+ _timersById[pair.first->id] = pair.first;
});
- invariant(pair.second);
- _timersById[pair.first->id] = pair.first;
- });
return std::move(pf.future);
}
@@ -244,7 +245,7 @@ public:
}
void run(ClockSource* clkSource) noexcept override {
- std::vector<SharedPromise<void>> toFulfill;
+ std::vector<Promise<void>> toFulfill;
// We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
const auto guard = MakeGuard([&] {
@@ -368,7 +369,7 @@ private:
struct Timer {
const ReactorTimer* id;
Date_t expiration;
- SharedPromise<void> promise;
+ mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set.
struct LessThan {
bool operator()(const Timer& lhs, const Timer& rhs) const {
@@ -379,7 +380,7 @@ private:
struct TransportSession {
Type type;
- SharedPromise<void> promise;
+ Promise<void> promise;
};
template <typename Callback>
@@ -394,7 +395,7 @@ private:
template <typename Callback>
void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
if (_inPoll) {
- _scheduled.push_back([cb, this] {
+ _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable {
stdx::lock_guard<stdx::mutex> lk(_mutex);
cb();
});
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index f13b8ec5c5f..f6c66ec98b7 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -130,9 +130,9 @@ public:
void schedule(ScheduleMode mode, Task task) final {
if (mode == kDispatch) {
- _ioContext.dispatch(std::move(task));
+ asio::dispatch(_ioContext, std::move(task));
} else {
- _ioContext.post(std::move(task));
+ asio::post(_ioContext, std::move(task));
}
}
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index d41d48a8658..ca1453cb3a4 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/session.h"
+#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
@@ -158,7 +159,7 @@ public:
virtual void stop() = 0;
virtual void drain() = 0;
- using Task = stdx::function<void()>;
+ using Task = unique_function<void()>;
enum ScheduleMode { kDispatch, kPost };
virtual void schedule(ScheduleMode mode, Task task) = 0;
@@ -166,8 +167,8 @@ public:
template <typename Callback>
Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule(kPost, [ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable {
- sp.setWith(cb);
+ schedule(kPost, [ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
+ p.setWith(cb);
});
return std::move(pf.future);
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 800263a4485..c41c5d6be68 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -120,11 +120,11 @@ private:
cancel(baton);
auto pf = makePromiseFuture<void>();
- armTimer().getAsync([sp = pf.promise.share()](Status status) mutable {
+ armTimer().getAsync([p = std::move(pf.promise)](Status status) mutable {
if (status.isOK()) {
- sp.emplaceValue();
+ p.emplaceValue();
} else {
- sp.setError(status);
+ p.setError(status);
}
});
@@ -182,9 +182,9 @@ public:
void schedule(ScheduleMode mode, Task task) override {
if (mode == kDispatch) {
- _ioContext.dispatch(std::move(task));
+ asio::dispatch(_ioContext, std::move(task));
} else {
- _ioContext.post(std::move(task));
+ asio::post(_ioContext, std::move(task));
}
}
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index 9860c968102..2cf2886afa0 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -76,8 +76,8 @@ void TaskExecutorProxy::signalEvent(const EventHandle& event) {
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::onEvent(
- const EventHandle& event, const CallbackFn& work) {
- return _executor->onEvent(event, work);
+ const EventHandle& event, CallbackFn work) {
+ return _executor->onEvent(event, std::move(work));
}
void TaskExecutorProxy::waitForEvent(const EventHandle& event) {
@@ -91,13 +91,13 @@ StatusWith<stdx::cv_status> TaskExecutorProxy::waitForEvent(OperationContext* op
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork(
- const CallbackFn& work) {
- return _executor->scheduleWork(work);
+ CallbackFn work) {
+ return _executor->scheduleWork(std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
- return _executor->scheduleWorkAt(when, work);
+ Date_t when, CallbackFn work) {
+ return _executor->scheduleWorkAt(when, std::move(work));
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand(
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index 2087ed1e6ad..e03e2a6955e 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -59,14 +59,13 @@ public:
virtual Date_t now() override;
virtual StatusWith<EventHandle> makeEvent() override;
virtual void signalEvent(const EventHandle& event) override;
- virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
- const CallbackFn& work) override;
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
virtual void waitForEvent(const EventHandle& event) override;
virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
const EventHandle& event,
Date_t deadline) override;
- virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
diff --git a/src/mongo/util/background_thread_clock_source.cpp b/src/mongo/util/background_thread_clock_source.cpp
index 4233b016477..89df9e7e970 100644
--- a/src/mongo/util/background_thread_clock_source.cpp
+++ b/src/mongo/util/background_thread_clock_source.cpp
@@ -66,8 +66,8 @@ Milliseconds BackgroundThreadClockSource::getPrecision() {
return _granularity;
}
-Status BackgroundThreadClockSource::setAlarm(Date_t when, stdx::function<void()> action) {
- return _clockSource->setAlarm(when, action);
+Status BackgroundThreadClockSource::setAlarm(Date_t when, unique_function<void()> action) {
+ return _clockSource->setAlarm(when, std::move(action));
}
Date_t BackgroundThreadClockSource::now() {
diff --git a/src/mongo/util/background_thread_clock_source.h b/src/mongo/util/background_thread_clock_source.h
index a251ba4f2ce..6bfb5fa8f92 100644
--- a/src/mongo/util/background_thread_clock_source.h
+++ b/src/mongo/util/background_thread_clock_source.h
@@ -58,7 +58,7 @@ public:
~BackgroundThreadClockSource() override;
Milliseconds getPrecision() override;
Date_t now() override;
- Status setAlarm(Date_t when, stdx::function<void()> action) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
/**
* Doesn't count as a call to now() for determining whether this ClockSource is idle.
diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h
index e8fc2d06d4c..4043c166dbf 100644
--- a/src/mongo/util/clock_source.h
+++ b/src/mongo/util/clock_source.h
@@ -33,8 +33,8 @@
#include <type_traits>
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/functional.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -74,7 +74,7 @@ public:
* Returns InternalError if this clock source does not implement setAlarm. May also
* return ShutdownInProgress during shutdown. Other errors are also allowed.
*/
- virtual Status setAlarm(Date_t when, stdx::function<void()> action) {
+ virtual Status setAlarm(Date_t when, unique_function<void()> action) {
return {ErrorCodes::InternalError, "This clock source does not implement setAlarm."};
}
diff --git a/src/mongo/util/clock_source_mock.cpp b/src/mongo/util/clock_source_mock.cpp
index ad4aa90802c..3a84e43932c 100644
--- a/src/mongo/util/clock_source_mock.cpp
+++ b/src/mongo/util/clock_source_mock.cpp
@@ -57,7 +57,7 @@ void ClockSourceMock::reset(Date_t newNow) {
_processAlarms(std::move(lk));
}
-Status ClockSourceMock::setAlarm(Date_t when, stdx::function<void()> action) {
+Status ClockSourceMock::setAlarm(Date_t when, unique_function<void()> action) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (when <= _now) {
lk.unlock();
diff --git a/src/mongo/util/clock_source_mock.h b/src/mongo/util/clock_source_mock.h
index a257775bb7c..ac220f912cd 100644
--- a/src/mongo/util/clock_source_mock.h
+++ b/src/mongo/util/clock_source_mock.h
@@ -54,7 +54,7 @@ public:
Milliseconds getPrecision() override;
Date_t now() override;
- Status setAlarm(Date_t when, stdx::function<void()> action) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
/**
* Advances the current time by the given value.
@@ -67,7 +67,7 @@ public:
void reset(Date_t newNow);
private:
- using Alarm = std::pair<Date_t, stdx::function<void()>>;
+ using Alarm = std::pair<Date_t, unique_function<void()>>;
void _processAlarms(stdx::unique_lock<stdx::mutex> lk);
stdx::mutex _mutex;
@@ -106,7 +106,7 @@ public:
return _source->now();
}
- Status setAlarm(Date_t when, stdx::function<void()> action) override {
+ Status setAlarm(Date_t when, unique_function<void()> action) override {
return _source->setAlarm(when, std::move(action));
}
diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h
index 6bca39c3aaa..e558a8cf701 100644
--- a/src/mongo/util/concurrency/thread_pool_interface.h
+++ b/src/mongo/util/concurrency/thread_pool_interface.h
@@ -31,7 +31,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
-#include "mongo/stdx/functional.h"
+#include "mongo/util/functional.h"
namespace mongo {
@@ -44,7 +44,7 @@ class ThreadPoolInterface {
MONGO_DISALLOW_COPYING(ThreadPoolInterface);
public:
- using Task = stdx::function<void()>;
+ using Task = unique_function<void()>;
/**
* Destroys a thread pool.
diff --git a/src/mongo/util/functional.h b/src/mongo/util/functional.h
index 31e57d0868a..975e974e762 100644
--- a/src/mongo/util/functional.h
+++ b/src/mongo/util/functional.h
@@ -153,7 +153,7 @@ private:
template <typename Functor>
static auto makeImpl(Functor&& functor) {
struct SpecificImpl : Impl {
- explicit SpecificImpl(Functor&& func) : f(std::move(func)) {}
+ explicit SpecificImpl(Functor&& func) : f(std::forward<Functor>(func)) {}
RetType call(Args&&... args) override {
return callRegularVoid(std::is_void<RetType>(), f, std::forward<Args>(args)...);
@@ -162,7 +162,7 @@ private:
std::decay_t<Functor> f;
};
- return std::make_unique<SpecificImpl>(std::move(functor));
+ return std::make_unique<SpecificImpl>(std::forward<Functor>(functor));
}
std::unique_ptr<Impl> impl;
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index 855ea76416d..04d154502e3 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -52,9 +52,6 @@
namespace mongo {
-template <typename T>
-class SharedPromise;
-
namespace future_details {
template <typename T>
class Promise;
@@ -482,10 +479,8 @@ using future_details::Future;
* destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered
* a programmer error, and should not be relied upon. We may make it debug-fatal in the future.
*
- * Only one thread can use a given Promise at a time. It is legal to have different threads setting
- * the value/error and extracting the Future, but it is the user's responsibility to ensure that
- * those calls are strictly synchronized. This is usually easiest to achieve by calling
- * makePromiseFuture<T>() then passing a SharedPromise to the completing threads.
+ * Only one thread can use a given Promise at a time, but another thread may be using the associated
+ * Future object.
*
* If the result is ready when producing the Future, it is more efficient to use
* makeReadyFutureWith() or Future<T>::makeReady() than to use a Promise<T>.
@@ -569,18 +564,6 @@ public:
});
}
- /**
- * Get a copyable SharedPromise that can be used to complete this Promise's Future.
- *
- * Callers are required to extract the Future before calling share() to prevent race conditions.
- * Even with a SharedPromise, callers must ensure it is only completed at most once. Copyability
- * is primarily to allow capturing lambdas to be put in std::functions which don't support
- * move-only types.
- *
- * It is safe to destroy the original Promise as soon as this call returns.
- */
- SharedPromise<T> share() noexcept;
-
static auto makePromiseFutureImpl() {
struct PromiseAndFuture {
Promise<T> promise{make_intrusive<SharedState<T>>()};
@@ -621,54 +604,6 @@ private:
};
/**
- * A SharedPromise is a copyable object that can be used to complete a Promise.
- *
- * All copies derived from the same call to Promise::share() will complete the same shared state.
- * Callers must ensure that the shared state is only completed at most once. Copyability is
- * primarily to allow capturing lambdas to be put in std::functions which don't support move-only
- * types. If the final derived SharedPromise is destroyed without completion, the Promise will be
- * broken.
- *
- * All methods behave the same as on the underlying Promise.
- */
-template <typename T>
-class SharedPromise {
-public:
- SharedPromise() = default;
-
- template <typename Func>
- void setWith(Func&& func) noexcept {
- _promise->setWith(std::forward<Func>(func));
- }
-
- void setFrom(Future<T>&& future) noexcept {
- _promise->setFrom(std::move(future));
- }
-
- template <typename... Args>
- void emplaceValue(Args&&... args) noexcept {
- _promise->emplaceValue(std::forward<Args>(args)...);
- }
-
- void setError(Status status) noexcept {
- _promise->setError(std::move(status));
- }
-
-private:
- // Only Promise<T> needs to be a friend, but MSVC2015 doesn't respect that friendship.
- // TODO see if this is still needed on MSVC2017+
- template <typename T2>
- friend class Promise;
-
- explicit SharedPromise(std::shared_ptr<Promise<T>>&& promise) : _promise(std::move(promise)) {}
-
- // TODO consider adding a SharedPromise refcount to SharedStateBase to avoid the extra
- // allocation. The tricky part will be ensuring that BrokenPromise is set when the last copy is
- // destroyed.
- std::shared_ptr<Promise<T>> _promise;
-};
-
-/**
* Future<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void).
*
* As is usual for rvalue-qualified methods, you may call at most one of them on a given Future.
@@ -1431,12 +1366,6 @@ inline Future<T> Promise<T>::getFuture() noexcept {
}
template <typename T>
-inline SharedPromise<T> Promise<T>::share() noexcept {
- invariant(_sharedState);
- return SharedPromise<T>(std::make_shared<Promise<T>>(std::move(*this)));
-}
-
-template <typename T>
inline void Promise<T>::setFrom(Future<T>&& future) noexcept {
setImpl([&](boost::intrusive_ptr<SharedState<T>>&& sharedState) {
future.propagateResultTo(sharedState.get());
diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp
index aec0a556d87..92955d367fb 100644
--- a/src/mongo/util/future_test_future_int.cpp
+++ b/src/mongo/util/future_test_future_int.cpp
@@ -94,7 +94,7 @@ TEST(Future, Success_getAsync) {
[] { return 1; },
[](Future<int>&& fut) {
auto pf = makePromiseFuture<int>();
- std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable {
+ std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<int> sw) mutable {
ASSERT_OK(sw);
outside.emplaceValue(sw.getValue());
});
@@ -132,7 +132,7 @@ TEST(Future, Fail_getNothrowRvalue) {
TEST(Future, Fail_getAsync) {
FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
auto pf = makePromiseFuture<int>();
- std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable {
+ std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<int> sw) mutable {
ASSERT(!sw.isOK());
outside.setError(sw.getStatus());
});
diff --git a/src/mongo/util/future_test_future_move_only.cpp b/src/mongo/util/future_test_future_move_only.cpp
index 75bda9505c8..b3d6dcc9730 100644
--- a/src/mongo/util/future_test_future_move_only.cpp
+++ b/src/mongo/util/future_test_future_move_only.cpp
@@ -110,16 +110,16 @@ TEST(Future_MoveOnly, Success_getNothrowRvalue) {
}
TEST(Future_MoveOnly, Success_getAsync) {
- FUTURE_SUCCESS_TEST(
- [] { return Widget(1); },
- [](Future<Widget>&& fut) {
- auto pf = makePromiseFuture<Widget>();
- std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable {
- ASSERT_OK(sw);
- outside.emplaceValue(std::move(sw.getValue()));
- });
- ASSERT_EQ(std::move(pf.future).get(), 1);
- });
+ FUTURE_SUCCESS_TEST([] { return Widget(1); },
+ [](Future<Widget>&& fut) {
+ auto pf = makePromiseFuture<Widget>();
+ std::move(fut).getAsync([outside = std::move(pf.promise)](
+ StatusWith<Widget> sw) mutable {
+ ASSERT_OK(sw);
+ outside.emplaceValue(std::move(sw.getValue()));
+ });
+ ASSERT_EQ(std::move(pf.future).get(), 1);
+ });
}
TEST(Future_MoveOnly, Fail_getLvalue) {
@@ -156,7 +156,7 @@ TEST(Future_MoveOnly, Fail_getNothrowRvalue) {
TEST(Future_MoveOnly, Fail_getAsync) {
FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) {
auto pf = makePromiseFuture<Widget>();
- std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable {
+ std::move(fut).getAsync([outside = std::move(pf.promise)](StatusWith<Widget> sw) mutable {
ASSERT(!sw.isOK());
outside.setError(sw.getStatus());
});
diff --git a/src/mongo/util/future_test_future_void.cpp b/src/mongo/util/future_test_future_void.cpp
index 02ab0e73e76..4095706739f 100644
--- a/src/mongo/util/future_test_future_void.cpp
+++ b/src/mongo/util/future_test_future_void.cpp
@@ -73,7 +73,7 @@ TEST(Future_Void, Success_getAsync) {
[] {},
[](Future<void>&& fut) {
auto pf = makePromiseFuture<void>();
- std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable {
+ std::move(fut).getAsync([outside = std::move(pf.promise)](Status status) mutable {
ASSERT_OK(status);
outside.emplaceValue();
});
@@ -111,7 +111,7 @@ TEST(Future_Void, Fail_getNothrowRvalue) {
TEST(Future_Void, Fail_getAsync) {
FUTURE_FAIL_TEST<void>([](Future<void>&& fut) {
auto pf = makePromiseFuture<void>();
- std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable {
+ std::move(fut).getAsync([outside = std::move(pf.promise)](Status status) mutable {
ASSERT(!status.isOK());
outside.setError(status);
});
diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h
index 093d3a30b7c..2f0938c907c 100644
--- a/src/mongo/util/keyed_executor.h
+++ b/src/mongo/util/keyed_executor.h
@@ -77,7 +77,7 @@ template <typename Key, typename... MapArgs>
class KeyedExecutor {
// We hold a deque per key. Each entry in the deque represents a task we'll eventually execute
// and a list of callers who need to be notified after it completes.
- using Deque = std::deque<std::vector<SharedPromise<void>>>;
+ using Deque = std::deque<std::vector<Promise<void>>>;
using Map = stdx::unordered_map<Key, Deque, MapArgs...>;
@@ -232,7 +232,7 @@ private:
Future<void> _onCleared(WithLock, Deque& deque) {
invariant(deque.size());
auto pf = makePromiseFuture<void>();
- deque.back().push_back(pf.promise.share());
+ deque.back().push_back(std::move(pf.promise));
return std::move(pf.future);
}
diff --git a/src/mongo/util/keyed_executor_test.cpp b/src/mongo/util/keyed_executor_test.cpp
index e44bf934d7f..9332891b7e7 100644
--- a/src/mongo/util/keyed_executor_test.cpp
+++ b/src/mongo/util/keyed_executor_test.cpp
@@ -46,7 +46,7 @@ namespace {
class MockExecutor : public OutOfLineExecutor {
public:
- void schedule(stdx::function<void()> func) override {
+ void schedule(unique_function<void()> func) override {
_deque.push_front(std::move(func));
}
@@ -72,7 +72,7 @@ public:
}
private:
- std::deque<stdx::function<void()>> _deque;
+ std::deque<unique_function<void()>> _deque;
};
class ThreadPoolExecutor : public OutOfLineExecutor {
@@ -87,7 +87,7 @@ public:
_threadPool.shutdown();
}
- void schedule(stdx::function<void()> func) override {
+ void schedule(unique_function<void()> func) override {
ASSERT_OK(_threadPool.schedule(std::move(func)));
}
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index 1f2b006c1b1..310b26cdd21 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -59,8 +59,8 @@ public:
Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule([ cb = std::forward<Callback>(cb), sp = pf.promise.share() ]() mutable {
- sp.setWith(std::move(cb));
+ schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
+ p.setWith(std::move(cb));
});
return std::move(pf.future);
@@ -69,7 +69,7 @@ public:
/**
* Invokes the callback on the executor. This never happens immediately on the caller's stack.
*/
- virtual void schedule(stdx::function<void()> func) = 0;
+ virtual void schedule(unique_function<void()> func) = 0;
protected:
~OutOfLineExecutor() noexcept {}