summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2018-10-31 15:42:17 -0400
committerMathias Stearn <mathias@10gen.com>2018-11-15 17:25:11 -0500
commit3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0 (patch)
tree2bd06655367af012e17bf80947cdb792fa1b9b44 /src/mongo/executor
parent1ded7067e2d1a6161b15e5a462f8cba2d755c9a6 (diff)
downloadmongo-3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0.tar.gz
SERVER-35682 kill existing SharedPromise type
This required plumbing unique_function into many more places.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/connection_pool.cpp4
-rw-r--r--src/mongo/executor/network_interface.h21
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp7
-rw-r--r--src/mongo/executor/network_interface_mock.cpp44
-rw-r--r--src/mongo/executor/network_interface_mock.h15
-rw-r--r--src/mongo/executor/network_interface_tl.cpp70
-rw-r--r--src/mongo/executor/network_interface_tl.h4
-rw-r--r--src/mongo/executor/task_executor.h9
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp5
-rw-r--r--src/mongo/executor/thread_pool_mock.h1
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp17
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h6
12 files changed, 106 insertions, 97 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 63445075780..a6438e3cf7e 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -179,7 +179,7 @@ private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
- using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>;
+ using Request = std::pair<Date_t, Promise<ConnectionHandle>>;
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
return a.first > b.first;
@@ -461,7 +461,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
const auto expiration = _parent->_factory->now() + timeout;
auto pf = makePromiseFuture<ConnectionHandle>();
- _requests.push_back(make_pair(expiration, pf.promise.share()));
+ _requests.push_back(make_pair(expiration, std::move(pf.promise)));
std::push_heap(begin(_requests), end(_requests), RequestComparator{});
updateStateInLock();
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 97320d12bf0..c55a7925bdc 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/functional.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -57,7 +58,7 @@ class NetworkInterface {
public:
using Response = RemoteCommandResponse;
- using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>;
+ using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseStatus&)>;
virtual ~NetworkInterface();
@@ -145,7 +146,7 @@ public:
*/
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr) = 0;
Future<TaskExecutor::ResponseStatus> startCommand(
@@ -154,13 +155,13 @@ public:
const transport::BatonHandle& baton = nullptr) {
auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>();
- auto status =
- startCommand(cbHandle,
- request,
- [sp = pf.promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
- sp.emplaceValue(rs);
- },
- baton);
+ auto status = startCommand(
+ cbHandle,
+ request,
+ [p = std::move(pf.promise)](const TaskExecutor::ResponseStatus& rs) mutable {
+ p.emplaceValue(rs);
+ },
+ baton);
if (!status.isOK()) {
return status;
@@ -190,7 +191,7 @@ public:
* return true. See that method for why.
*/
virtual Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton = nullptr) = 0;
/**
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index bd6cd9bf41e..f43af8bd17a 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -323,9 +323,10 @@ TEST_F(NetworkInterfaceTest, SetAlarm) {
Date_t expiration = net().now() + Milliseconds(100);
auto makeTimerFuture = [&] {
auto pf = makePromiseFuture<Date_t>();
- return std::make_pair(
- [ this, promise = pf.promise.share() ]() mutable { promise.emplaceValue(net().now()); },
- std::move(pf.future));
+ return std::make_pair([ this, promise = std::move(pf.promise) ]() mutable {
+ promise.emplaceValue(net().now());
+ },
+ std::move(pf.future));
};
auto futurePair = makeTimerFuture();
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 258c531707e..242e81126f6 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -115,7 +115,7 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
@@ -124,7 +124,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
stdx::lock_guard<stdx::mutex> lk(_mutex);
const Date_t now = _now_inlock();
- auto op = NetworkOperation(cbHandle, request, now, onFinish);
+ auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish));
// If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
if (!_hook || _connections.count(request.target)) {
@@ -180,7 +180,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
Status NetworkInterfaceMock::setAlarm(const Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
@@ -193,7 +193,7 @@ Status NetworkInterfaceMock::setAlarm(const Date_t when,
action();
return Status::OK();
}
- _alarms.emplace(when, action);
+ _alarms.emplace(when, std::move(action));
return Status::OK();
}
@@ -460,16 +460,21 @@ void NetworkInterfaceMock::_enqueueOperation_inlock(
return a.getNextConsiderationDate() < b.getNextConsiderationDate();
});
+ const auto timeout = op.getRequest().timeout;
+ auto cbh = op.getCallbackHandle();
+
_unscheduled.emplace(insertBefore, std::move(op));
- if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
- invariant(op.getRequest().timeout >= Milliseconds(0));
+ if (timeout != RemoteCommandRequest::kNoTimeout) {
+ invariant(timeout >= Milliseconds(0));
ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0));
std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled};
- auto action = [ =, cbh = op.getCallbackHandle() ] {
- _interruptWithResponse_inlock(cbh, queuesToCheck, rs);
- };
- _alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
+ _alarms.emplace(_now_inlock() + timeout, [
+ this,
+ cbh = std::move(cbh),
+ queuesToCheck = std::move(queuesToCheck),
+ rs = std::move(rs)
+ ] { _interruptWithResponse_inlock(cbh, queuesToCheck, rs); });
}
}
@@ -504,13 +509,14 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
if (!hookPostconnectCommand) {
// If we don't have a post connect command, enqueue the actual command.
- _enqueueOperation_inlock(std::move(op));
_connections.emplace(op.getRequest().target);
+ _enqueueOperation_inlock(std::move(op));
return;
}
+ auto cbh = op.getCallbackHandle();
// The completion handler for the postconnect command schedules the original command.
- auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable {
+ auto postconnectCompletionHandler = [ this, op = std::move(op) ](ResponseStatus rs) mutable {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!rs.isOK()) {
op.setResponse(_now_inlock(), rs);
@@ -526,11 +532,11 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
return;
}
- _enqueueOperation_inlock(std::move(op));
_connections.emplace(op.getRequest().target);
+ _enqueueOperation_inlock(std::move(op));
};
- auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
+ auto postconnectOp = NetworkOperation(cbh,
std::move(*hookPostconnectCommand),
_now_inlock(),
std::move(postconnectCompletionHandler));
@@ -563,7 +569,7 @@ void NetworkInterfaceMock::signalWorkAvailable() {
void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) {
while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) {
- auto fn = _alarms.top().action;
+ auto fn = std::move(_alarms.top().action);
_alarms.pop();
lk->unlock();
fn();
@@ -571,7 +577,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s
}
while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
invariant(_currentlyRunning == kNetworkThread);
- NetworkOperation op = _scheduled.front();
+ NetworkOperation op = std::move(_scheduled.front());
_scheduled.pop_front();
_waitingToRunMask |= kExecutorThread;
lk->unlock();
@@ -637,16 +643,14 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation()
NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle,
const RemoteCommandRequest& theRequest,
Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish)
+ RemoteCommandCompletionFn onFinish)
: _requestDate(theRequestDate),
_nextConsiderationDate(theRequestDate),
_responseDate(),
_cbHandle(cbHandle),
_request(theRequest),
_response(kUnsetResponse),
- _onFinish(onFinish) {}
-
-NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
+ _onFinish(std::move(onFinish)) {}
std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
return str::stream() << "NetworkOperation -- request:'" << _request.toString()
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index f87b18c27b3..7023101fd59 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -113,7 +113,7 @@ public:
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr);
/**
@@ -129,7 +129,7 @@ public:
* Not implemented.
*/
virtual Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton = nullptr);
virtual bool onNetworkThread();
@@ -284,7 +284,7 @@ private:
* Information describing a scheduled alarm.
*/
struct AlarmInfo {
- using AlarmAction = stdx::function<void()>;
+ using AlarmAction = unique_function<void()>;
AlarmInfo(Date_t inWhen, AlarmAction inAction)
: when(inWhen), action(std::move(inAction)) {}
bool operator>(const AlarmInfo& rhs) const {
@@ -292,7 +292,7 @@ private:
}
Date_t when;
- AlarmAction action;
+ mutable AlarmAction action;
};
/**
@@ -435,8 +435,7 @@ public:
NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& theRequest,
Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish);
- ~NetworkOperation();
+ RemoteCommandCompletionFn onFinish);
/**
* Adjusts the stored virtual time at which this entry will be subject to consideration
@@ -556,8 +555,8 @@ public:
Date_t now() override {
return _net->now();
}
- Status setAlarm(Date_t when, stdx::function<void()> action) override {
- return _net->setAlarm(when, action);
+ Status setAlarm(Date_t when, unique_function<void()> action) override {
+ return _net->setAlarm(when, std::move(action));
}
private:
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 74681c42308..a395b1f48bb 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -172,7 +172,7 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
@@ -205,7 +205,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
log() << "Discarding command due to failpoint before acquireConn";
- std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
+ std::move(pf.future).getAsync([onFinish = std::move(onFinish)](
+ StatusWith<RemoteCommandResponse> response) mutable {
onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
});
return Status::OK();
@@ -237,8 +238,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
});
});
- auto remainingWork = [ this, state, future = std::move(pf.future), baton, onFinish ](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ auto remainingWork =
+ [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
return _onAcquireConn(
state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
@@ -251,7 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
return error;
})
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ .getAsync([ this, state, onFinish = std::move(onFinish) ](
+ StatusWith<RemoteCommandResponse> response) {
auto duration = now() - state->start;
if (!response.isOK()) {
onFinish(RemoteCommandResponse(response.getStatus(), duration));
@@ -430,7 +433,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
}
Status NetworkInterfaceTL::setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
@@ -454,37 +457,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
}
alarmTimer->waitUntil(when, baton)
- .getAsync([this, weakTimer, action, when, baton](Status status) {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when
- << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action), baton);
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
+ .getAsync(
+ [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
}
- return;
- }
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action), baton);
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
- if (status.isOK()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ return;
}
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+
+ if (status.isOK()) {
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 048aaba7f5b..1dbcc7a7678 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -67,13 +67,13 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
+ RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const transport::BatonHandle& baton) override;
Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
+ unique_function<void()> action,
const transport::BatonHandle& baton) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 3c845a18824..e8cff8ee0f6 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -94,7 +94,7 @@ public:
* the callback was canceled for any reason (including shutdown). Otherwise, it should have
* Status::OK().
*/
- using CallbackFn = stdx::function<void(const CallbackArgs&)>;
+ using CallbackFn = unique_function<void(const CallbackArgs&)>;
/**
* Type of a callback from a request to run a command on a remote MongoDB node.
@@ -175,8 +175,7 @@ public:
*
* May be called by client threads or callbacks running in the executor.
*/
- virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
- const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) = 0;
/**
* Blocks the calling thread until "event" is signaled. Also returns if the event is never
@@ -209,7 +208,7 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) = 0;
/**
* Schedules "work" to be run by the executor no sooner than "when".
@@ -224,7 +223,7 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0;
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) = 0;
/**
* Schedules "cb" to be run by the executor with the result of executing the remote command
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index a60b115b923..42373c2b47b 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -307,13 +307,14 @@ void EventChainAndWaitingTest::onGo(const TaskExecutor::CallbackArgs& cbData) {
return;
}
triggerEvent = errorOrTriggerEvent.getValue();
- StatusWith<TaskExecutor::CallbackHandle> cbHandle = executor->onEvent(triggerEvent, triggered2);
+ StatusWith<TaskExecutor::CallbackHandle> cbHandle =
+ executor->onEvent(triggerEvent, std::move(triggered2));
if (!cbHandle.isOK()) {
status1 = cbHandle.getStatus();
executor->shutdown();
return;
}
- cbHandle = executor->onEvent(triggerEvent, triggered3);
+ cbHandle = executor->onEvent(triggerEvent, std::move(triggered3));
if (!cbHandle.isOK()) {
status1 = cbHandle.getStatus();
executor->shutdown();
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index 5520de68396..8d596b3d3c5 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -34,6 +34,7 @@
#include <vector>
#include "mongo/platform/random.h"
+#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index d48a0888bc6..94e547b87f3 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -274,11 +274,11 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event,
- const CallbackFn& work) {
+ CallbackFn work) {
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wq = makeSingletonWorkQueue(work, nullptr);
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -323,9 +323,8 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
}
}
-StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
- const CallbackFn& work) {
- auto wq = makeSingletonWorkQueue(work, nullptr);
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn work) {
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -336,12 +335,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
return cbHandle;
}
-StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(Date_t when,
+ CallbackFn work) {
if (when <= now()) {
- return scheduleWork(work);
+ return scheduleWork(std::move(work));
}
- auto wq = makeSingletonWorkQueue(work, nullptr, when);
+ auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 0fd821baa95..5163ad9d10d 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -75,13 +75,13 @@ public:
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
const EventHandle& event,
Date_t deadline) override;
void waitForEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
StatusWith<CallbackHandle> scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,