diff options
author | Mathias Stearn <mathias@10gen.com> | 2018-10-31 15:42:17 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2018-11-15 17:25:11 -0500 |
commit | 3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0 (patch) | |
tree | 2bd06655367af012e17bf80947cdb792fa1b9b44 /src/mongo/executor | |
parent | 1ded7067e2d1a6161b15e5a462f8cba2d755c9a6 (diff) | |
download | mongo-3d7ed4fdd5e4840e9599a74ec92e16bc619bebf0.tar.gz |
SERVER-35682 kill existing SharedPromise type
This required plumbing unique_function into many more places.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 21 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 44 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 15 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 70 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 9 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 5 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.h | 1 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 17 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 6 |
12 files changed, 106 insertions, 97 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 63445075780..a6438e3cf7e 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -179,7 +179,7 @@ private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>; - using Request = std::pair<Date_t, SharedPromise<ConnectionHandle>>; + using Request = std::pair<Date_t, Promise<ConnectionHandle>>; struct RequestComparator { bool operator()(const Request& a, const Request& b) { return a.first > b.first; @@ -461,7 +461,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec const auto expiration = _parent->_factory->now() + timeout; auto pf = makePromiseFuture<ConnectionHandle>(); - _requests.push_back(make_pair(expiration, pf.promise.share())); + _requests.push_back(make_pair(expiration, std::move(pf.promise))); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); updateStateInLock(); diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 97320d12bf0..c55a7925bdc 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -38,6 +38,7 @@ #include "mongo/stdx/functional.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/functional.h" #include "mongo/util/future.h" namespace mongo { @@ -57,7 +58,7 @@ class NetworkInterface { public: using Response = RemoteCommandResponse; - using RemoteCommandCompletionFn = stdx::function<void(const TaskExecutor::ResponseStatus&)>; + using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseStatus&)>; virtual ~NetworkInterface(); @@ -145,7 +146,7 @@ public: */ virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton = nullptr) = 0; Future<TaskExecutor::ResponseStatus> startCommand( @@ -154,13 +155,13 @@ public: const transport::BatonHandle& baton = nullptr) { auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>(); - auto status = - startCommand(cbHandle, - request, - [sp = pf.promise.share()](const TaskExecutor::ResponseStatus& rs) mutable { - sp.emplaceValue(rs); - }, - baton); + auto status = startCommand( + cbHandle, + request, + [p = std::move(pf.promise)](const TaskExecutor::ResponseStatus& rs) mutable { + p.emplaceValue(rs); + }, + baton); if (!status.isOK()) { return status; @@ -190,7 +191,7 @@ public: * return true. See that method for why. */ virtual Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton = nullptr) = 0; /** diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index bd6cd9bf41e..f43af8bd17a 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -323,9 +323,10 @@ TEST_F(NetworkInterfaceTest, SetAlarm) { Date_t expiration = net().now() + Milliseconds(100); auto makeTimerFuture = [&] { auto pf = makePromiseFuture<Date_t>(); - return std::make_pair( - [ this, promise = pf.promise.share() ]() mutable { promise.emplaceValue(net().now()); }, - std::move(pf.future)); + return std::make_pair([ this, promise = std::move(pf.promise) ]() mutable { + promise.emplaceValue(net().now()); + }, + std::move(pf.future)); }; auto futurePair = makeTimerFuture(); diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 258c531707e..242e81126f6 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -115,7 +115,7 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; @@ -124,7 +124,7 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, stdx::lock_guard<stdx::mutex> lk(_mutex); const Date_t now = _now_inlock(); - auto op = NetworkOperation(cbHandle, request, now, onFinish); + auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish)); // If we don't have a hook, or we have already 'connected' to this host, enqueue the op. if (!_hook || _connections.count(request.target)) { @@ -180,7 +180,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } Status NetworkInterfaceMock::setAlarm(const Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; @@ -193,7 +193,7 @@ Status NetworkInterfaceMock::setAlarm(const Date_t when, action(); return Status::OK(); } - _alarms.emplace(when, action); + _alarms.emplace(when, std::move(action)); return Status::OK(); } @@ -460,16 +460,21 @@ void NetworkInterfaceMock::_enqueueOperation_inlock( return a.getNextConsiderationDate() < b.getNextConsiderationDate(); }); + const auto timeout = op.getRequest().timeout; + auto cbh = op.getCallbackHandle(); + _unscheduled.emplace(insertBefore, std::move(op)); - if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) { - invariant(op.getRequest().timeout >= Milliseconds(0)); + if (timeout != RemoteCommandRequest::kNoTimeout) { + invariant(timeout >= Milliseconds(0)); ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0)); std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_scheduled}; - auto action = [ =, cbh = op.getCallbackHandle() ] { - _interruptWithResponse_inlock(cbh, queuesToCheck, rs); - }; - _alarms.emplace(_now_inlock() + op.getRequest().timeout, action); + _alarms.emplace(_now_inlock() + timeout, [ + this, + cbh = std::move(cbh), + queuesToCheck = std::move(queuesToCheck), + rs = std::move(rs) + ] { _interruptWithResponse_inlock(cbh, queuesToCheck, rs); }); } } @@ -504,13 +509,14 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort if (!hookPostconnectCommand) { // If we don't have a post connect command, enqueue the actual command. - _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); + _enqueueOperation_inlock(std::move(op)); return; } + auto cbh = op.getCallbackHandle(); // The completion handler for the postconnect command schedules the original command. - auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable { + auto postconnectCompletionHandler = [ this, op = std::move(op) ](ResponseStatus rs) mutable { stdx::lock_guard<stdx::mutex> lk(_mutex); if (!rs.isOK()) { op.setResponse(_now_inlock(), rs); @@ -526,11 +532,11 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort return; } - _enqueueOperation_inlock(std::move(op)); _connections.emplace(op.getRequest().target); + _enqueueOperation_inlock(std::move(op)); }; - auto postconnectOp = NetworkOperation(op.getCallbackHandle(), + auto postconnectOp = NetworkOperation(cbh, std::move(*hookPostconnectCommand), _now_inlock(), std::move(postconnectCompletionHandler)); @@ -563,7 +569,7 @@ void NetworkInterfaceMock::signalWorkAvailable() { void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk) { while (!_alarms.empty() && _now_inlock() >= _alarms.top().when) { - auto fn = _alarms.top().action; + auto fn = std::move(_alarms.top().action); _alarms.pop(); lk->unlock(); fn(); @@ -571,7 +577,7 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s } while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) { invariant(_currentlyRunning == kNetworkThread); - NetworkOperation op = _scheduled.front(); + NetworkOperation op = std::move(_scheduled.front()); _scheduled.pop_front(); _waitingToRunMask |= kExecutorThread; lk->unlock(); @@ -637,16 +643,14 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation() NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish) + RemoteCommandCompletionFn onFinish) : _requestDate(theRequestDate), _nextConsiderationDate(theRequestDate), _responseDate(), _cbHandle(cbHandle), _request(theRequest), _response(kUnsetResponse), - _onFinish(onFinish) {} - -NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} + _onFinish(std::move(onFinish)) {} std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const { return str::stream() << "NetworkOperation -- request:'" << _request.toString() diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index f87b18c27b3..7023101fd59 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -113,7 +113,7 @@ public: virtual std::string getHostName(); virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton = nullptr); /** @@ -129,7 +129,7 @@ public: * Not implemented. */ virtual Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton = nullptr); virtual bool onNetworkThread(); @@ -284,7 +284,7 @@ private: * Information describing a scheduled alarm. */ struct AlarmInfo { - using AlarmAction = stdx::function<void()>; + using AlarmAction = unique_function<void()>; AlarmInfo(Date_t inWhen, AlarmAction inAction) : when(inWhen), action(std::move(inAction)) {} bool operator>(const AlarmInfo& rhs) const { @@ -292,7 +292,7 @@ private: } Date_t when; - AlarmAction action; + mutable AlarmAction action; }; /** @@ -435,8 +435,7 @@ public: NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& theRequest, Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish); - ~NetworkOperation(); + RemoteCommandCompletionFn onFinish); /** * Adjusts the stored virtual time at which this entry will be subject to consideration @@ -556,8 +555,8 @@ public: Date_t now() override { return _net->now(); } - Status setAlarm(Date_t when, stdx::function<void()> action) override { - return _net->setAlarm(when, action); + Status setAlarm(Date_t when, unique_function<void()> action) override { + return _net->setAlarm(when, std::move(action)); } private: diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 74681c42308..a395b1f48bb 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -172,7 +172,7 @@ Date_t NetworkInterfaceTL::now() { Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; @@ -205,7 +205,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) { log() << "Discarding command due to failpoint before acquireConn"; - std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) { + std::move(pf.future).getAsync([onFinish = std::move(onFinish)]( + StatusWith<RemoteCommandResponse> response) mutable { onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0})); }); return Status::OK(); @@ -237,8 +238,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa }); }); - auto remainingWork = [ this, state, future = std::move(pf.future), baton, onFinish ]( - StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + auto remainingWork = + [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ]( + StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { makeReadyFutureWith([&] { return _onAcquireConn( state, std::move(future), std::move(*uassertStatusOK(swConn)), baton); @@ -251,7 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa } return error; }) - .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) { + .getAsync([ this, state, onFinish = std::move(onFinish) ]( + StatusWith<RemoteCommandResponse> response) { auto duration = now() - state->start; if (!response.isOK()) { onFinish(RemoteCommandResponse(response.getStatus(), duration)); @@ -430,7 +433,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } Status NetworkInterfaceTL::setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; @@ -454,37 +457,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, } alarmTimer->waitUntil(when, baton) - .getAsync([this, weakTimer, action, when, baton](Status status) { - auto alarmTimer = weakTimer.lock(); - if (!alarmTimer) { - return; - } else { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgressAlarms.erase(alarmTimer); - } - - auto nowVal = now(); - if (nowVal < when) { - warning() << "Alarm returned early. Expected at: " << when - << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action), baton); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); + .getAsync( + [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); } - return; - } + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action), baton); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); + } - if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); + return; } - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + + if (status.isOK()) { + if (baton) { + baton->schedule(std::move(action)); + } else { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 048aaba7f5b..1dbcc7a7678 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -67,13 +67,13 @@ public: Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, + RemoteCommandCompletionFn&& onFinish, const transport::BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, const transport::BatonHandle& baton) override; Status setAlarm(Date_t when, - const stdx::function<void()>& action, + unique_function<void()> action, const transport::BatonHandle& baton) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 3c845a18824..e8cff8ee0f6 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -94,7 +94,7 @@ public: * the callback was canceled for any reason (including shutdown). Otherwise, it should have * Status::OK(). */ - using CallbackFn = stdx::function<void(const CallbackArgs&)>; + using CallbackFn = unique_function<void(const CallbackArgs&)>; /** * Type of a callback from a request to run a command on a remote MongoDB node. @@ -175,8 +175,7 @@ public: * * May be called by client threads or callbacks running in the executor. */ - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) = 0; /** * Blocks the calling thread until "event" is signaled. Also returns if the event is never @@ -209,7 +208,7 @@ public: * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) = 0; /** * Schedules "work" to be run by the executor no sooner than "when". @@ -224,7 +223,7 @@ public: * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) = 0; /** * Schedules "cb" to be run by the executor with the result of executing the remote command diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index a60b115b923..42373c2b47b 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -307,13 +307,14 @@ void EventChainAndWaitingTest::onGo(const TaskExecutor::CallbackArgs& cbData) { return; } triggerEvent = errorOrTriggerEvent.getValue(); - StatusWith<TaskExecutor::CallbackHandle> cbHandle = executor->onEvent(triggerEvent, triggered2); + StatusWith<TaskExecutor::CallbackHandle> cbHandle = + executor->onEvent(triggerEvent, std::move(triggered2)); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); return; } - cbHandle = executor->onEvent(triggerEvent, triggered3); + cbHandle = executor->onEvent(triggerEvent, std::move(triggered3)); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index 5520de68396..8d596b3d3c5 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -34,6 +34,7 @@ #include <vector> #include "mongo/platform/random.h" +#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/thread_pool_interface.h" diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index d48a0888bc6..94e547b87f3 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -274,11 +274,11 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) { } StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event, - const CallbackFn& work) { + CallbackFn work) { if (!event.isValid()) { return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; } - auto wq = makeSingletonWorkQueue(work, nullptr); + auto wq = makeSingletonWorkQueue(std::move(work), nullptr); stdx::unique_lock<stdx::mutex> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); @@ -323,9 +323,8 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { } } -StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( - const CallbackFn& work) { - auto wq = makeSingletonWorkQueue(work, nullptr); +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn work) { + auto wq = makeSingletonWorkQueue(std::move(work), nullptr); WorkQueue temp; stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); @@ -336,12 +335,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( return cbHandle; } -StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( - Date_t when, const CallbackFn& work) { +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(Date_t when, + CallbackFn work) { if (when <= now()) { - return scheduleWork(work); + return scheduleWork(std::move(work)); } - auto wq = makeSingletonWorkQueue(work, nullptr, when); + auto wq = makeSingletonWorkQueue(std::move(work), nullptr, when); stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq); if (!cbHandle.isOK()) { diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 0fd821baa95..5163ad9d10d 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -75,13 +75,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; void waitForEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, |