diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-04-27 15:05:28 -0400 |
---|---|---|
committer | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-05-04 14:49:58 -0400 |
commit | 30f602bb9c8799e9b2b0d1c608d13fdfb24d2ce2 (patch) | |
tree | 7a03a34d4fe4589324de2d76fdc82d6df06a35a5 /src/mongo | |
parent | 4fb71c5a1c79b745ef56d53a8264ef5fdd202dda (diff) | |
download | mongo-30f602bb9c8799e9b2b0d1c608d13fdfb24d2ce2.tar.gz |
SERVER-40765 TaskExecutor inherits from OutOfLineExecutor
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/reporter_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/task_executor_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/task_executor_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 22 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor.cpp | 6 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 11 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 35 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 16 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test.cpp | 91 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor.h | 6 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.cpp | 6 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.h | 6 |
14 files changed, 172 insertions, 52 deletions
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 0c28ff717fb..0b105db318e 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -582,7 +582,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork( - CallbackFn work) override { + CallbackFn&& override) { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; @@ -634,7 +634,7 @@ TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) { TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt( - Date_t when, CallbackFn work) override { + Date_t when, CallbackFn&&) override { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } }; diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index fce6381e992..025dc262b76 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -37,7 +37,8 @@ namespace repl { TaskExecutorMock::TaskExecutorMock(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} -StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork(CallbackFn work) { +StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWork( + CallbackFn&& work) { if (shouldFailScheduleWorkRequest()) { return Status(ErrorCodes::OperationFailed, "failed to schedule work"); } @@ -49,7 +50,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWorkAt( - Date_t when, CallbackFn work) { + Date_t when, CallbackFn&& work) { if (shouldFailScheduleWorkAtRequest()) { return Status(ErrorCodes::OperationFailed, str::stream() << "failed to schedule work at " << when.toString()); diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index e72b4c91395..455bab08f19 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -46,8 +46,8 @@ public: explicit TaskExecutorMock(executor::TaskExecutor* executor); - StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) override; StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const BatonHandle& baton = nullptr) override; diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 7bee0b6a789..e95c954e10d 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -98,13 +98,15 @@ public: // Update the shard identy config string void onConfirmedSet(const State& state) final { - auto connStr = state.connStr; - - auto fun = [ serviceContext = _serviceContext, connStr ](auto args) { - if (ErrorCodes::isCancelationError(args.status.code())) { + Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()->schedule([ + serviceContext = _serviceContext, + connStr = state.connStr + ](Status status) { + if (ErrorCodes::isCancelationError(status.code())) { + LOG(2) << "Unable to schedule confirmed set update due to " << status; return; } - uassertStatusOK(args.status); + uassertStatusOK(status); LOG(0) << "Updating config server with confirmed set " << connStr; Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr); @@ -125,15 +127,7 @@ public: auto opCtx = tc->makeOperationContext(); ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), connStr); - }; - - auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); - auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus(); - if (ErrorCodes::isCancelationError(schedStatus.code())) { - LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus; - return; - } - uassertStatusOK(schedStatus); + }); } void onPossibleSet(const State& state) final { Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index c8663025e12..ba7c1fc7f3b 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -105,7 +105,7 @@ public: return _executor->signalEvent(event); } - StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override { + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn&& work) override { return _wrapCallback([&](auto&& x) { return _executor->onEvent(event, std::move(x)); }, std::move(work)); } @@ -120,12 +120,12 @@ public: return _executor->waitForEvent(opCtx, event, deadline); } - StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override { + StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) override { return _wrapCallback([&](auto&& x) { return _executor->scheduleWork(std::move(x)); }, std::move(work)); } - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override { + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) override { return _wrapCallback( [&](auto&& x) { return _executor->scheduleWorkAt(when, std::move(x)); }, std::move(work)); diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index 7ad6c3355fd..dea064860c7 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -37,6 +37,17 @@ namespace executor { TaskExecutor::TaskExecutor() = default; TaskExecutor::~TaskExecutor() = default; +void TaskExecutor::schedule(OutOfLineExecutor::Task func) { + auto cb = CallbackFn([func = std::move(func)](const CallbackArgs& args) { func(args.status); }); + auto statusWithCallback = scheduleWork(std::move(cb)); + if (!statusWithCallback.isOK()) { + // The callback was not scheduled or moved from, it is still valid. Run it inline to inform + // it of the error. Construct a CallbackArgs for it, only CallbackArgs::status matters here. + CallbackArgs args(this, {}, statusWithCallback.getStatus(), nullptr); + cb(args); + } +} + TaskExecutor::CallbackState::CallbackState() = default; TaskExecutor::CallbackState::~CallbackState() = default; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 22082712da1..e06048e7269 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -42,6 +42,7 @@ #include "mongo/stdx/functional.h" #include "mongo/transport/baton.h" #include "mongo/util/future.h" +#include "mongo/util/out_of_line_executor.h" #include "mongo/util/time_support.h" namespace mongo { @@ -71,7 +72,7 @@ struct ConnectionPoolStats; * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. */ -class TaskExecutor { +class TaskExecutor : public OutOfLineExecutor { TaskExecutor(const TaskExecutor&) = delete; TaskExecutor& operator=(const TaskExecutor&) = delete; @@ -168,12 +169,19 @@ public: * Schedules a callback, "work", to run after "event" is signaled. If "event" * has already been signaled, marks "work" as immediately runnable. * + * On success, returns a handle for waiting on or canceling the callback. The provided "work" + * argument is moved from and invalid for use in the caller. On error, returns + * ErrorCodes::ShutdownInProgress, and "work" is still valid. If you intend to call "work" after + * error, make sure it is an actual CallbackFn, not a lambda or other value that implicitly + * converts to CallbackFn, since such a value would be moved from and invalidated during + * conversion with no way to recover it. + * * If "event" has yet to be signaled when "shutdown()" is called, "work" will * be scheduled with a status of ErrorCodes::CallbackCanceled. * * May be called by client threads or callbacks running in the executor. */ - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) = 0; + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn&& work) = 0; /** * Blocks the calling thread until "event" is signaled. Also returns if the event is never @@ -195,33 +203,44 @@ public: const EventHandle& event, Date_t deadline = Date_t::max()) = 0; + + void schedule(OutOfLineExecutor::Task func) final override; + /** * Schedules "work" to be run by the executor ASAP. * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. + * On success, returns a handle for waiting on or canceling the callback. The provided "work" + * argument is moved from and invalid for use in the caller. On error, returns + * ErrorCodes::ShutdownInProgress, and "work" is still valid. If you intend to call "work" after + * error, make sure it is an actual CallbackFn, not a lambda or other value that implicitly + * converts to CallbackFn, since such a value would be moved from and invalidated during + * conversion with no way to recover it. * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) = 0; + virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) = 0; /** * Schedules "work" to be run by the executor no sooner than "when". * * If "when" is <= now(), then it schedules the "work" to be run ASAP. * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. + * On success, returns a handle for waiting on or canceling the callback. The provided "work" + * argument is moved from and invalid for use in the caller. On error, returns + * ErrorCodes::ShutdownInProgress, and "work" is still valid. If you intend to call "work" after + * error, make sure it is an actual CallbackFn, not a lambda or other value that implicitly + * converts to CallbackFn, since such a value would be moved from and invalidated during + * conversion with no way to recover it. * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) = 0; + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) = 0; /** * Schedules "cb" to be run by the executor with the result of executing the remote command diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index e02f80882c2..73fca1533d9 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -278,17 +278,20 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) { } StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event, - CallbackFn work) { + CallbackFn&& work) { if (!event.isValid()) { return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; } - auto wq = makeSingletonWorkQueue(std::move(work), nullptr); + // Unsure if we'll succeed yet, so pass an empty CallbackFn. + auto wq = makeSingletonWorkQueue({}, nullptr); stdx::unique_lock<stdx::mutex> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); if (!cbHandle.isOK()) { return cbHandle; } + // Success, invalidate "work" by moving it into the queue. + eventState->waiters.back()->callback = std::move(work); if (eventState->isSignaledFlag) { scheduleIntoPool_inlock(&eventState->waiters, std::move(lk)); } @@ -327,20 +330,23 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { } } -StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn work) { - auto wq = makeSingletonWorkQueue(std::move(work), nullptr); +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(CallbackFn&& work) { + // Unsure if we'll succeed yet, so pass an empty CallbackFn. + auto wq = makeSingletonWorkQueue({}, nullptr); WorkQueue temp; stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); if (!cbHandle.isOK()) { return cbHandle; } + // Success, invalidate "work" by moving it into the queue. + temp.back()->callback = std::move(work); scheduleIntoPool_inlock(&temp, std::move(lk)); return cbHandle; } StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(Date_t when, - CallbackFn work) { + CallbackFn&& work) { if (when <= now()) { return scheduleWork(std::move(work)); } diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 099746c8e13..89010b451e1 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -77,13 +77,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn&& work) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; void waitForEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) override; StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const BatonHandle& baton = nullptr) override; diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index ce63f225038..b487cc87a41 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -75,6 +75,63 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { ASSERT_EQUALS(startTime + Milliseconds(200), net->now()); } +TEST_F(ThreadPoolExecutorTest, Schedule) { + auto& executor = getExecutor(); + launchExecutorThread(); + auto status1 = getDetectableErrorStatus(); + unittest::Barrier barrier{2}; + executor.schedule([&](Status status) { + status1 = status; + barrier.countDownAndWait(); + }); + barrier.countDownAndWait(); + ASSERT_OK(status1); +} + +TEST_F(ThreadPoolExecutorTest, ScheduleAfterShutdown) { + auto& executor = getExecutor(); + auto status1 = getDetectableErrorStatus(); + executor.shutdown(); + executor.schedule([&](Status status) { status1 = status; }); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status1); +} + +TEST_F(ThreadPoolExecutorTest, OnEvent) { + auto& executor = getExecutor(); + launchExecutorThread(); + auto status1 = getDetectableErrorStatus(); + auto event = executor.makeEvent().getValue(); + unittest::Barrier barrier{2}; + TaskExecutor::CallbackFn cb = [&](const TaskExecutor::CallbackArgs& args) { + status1 = args.status; + barrier.countDownAndWait(); + }; + ASSERT_OK(executor.onEvent(event, std::move(cb)).getStatus()); + // Callback was moved from. + ASSERT(!cb); + executor.signalEvent(event); + barrier.countDownAndWait(); + ASSERT_OK(status1); +} + +TEST_F(ThreadPoolExecutorTest, OnEventAfterShutdown) { + auto& executor = getExecutor(); + auto status1 = getDetectableErrorStatus(); + auto event = executor.makeEvent().getValue(); + TaskExecutor::CallbackFn cb = [&](const TaskExecutor::CallbackArgs& args) { + status1 = args.status; + }; + executor.shutdown(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, + executor.onEvent(event, std::move(cb)).getStatus()); + + // Callback was not moved from, it is still valid and we can call it to set status1. + ASSERT(static_cast<bool>(cb)); + TaskExecutor::CallbackArgs args(&executor, {}, Status::OK()); + cb(args); + ASSERT_OK(status1); +} + bool sharedCallbackStateDestroyed = false; class SharedCallbackState { SharedCallbackState(const SharedCallbackState&) = delete; @@ -121,7 +178,7 @@ TEST_F(ThreadPoolExecutorTest, thread_local bool amRunningRecursively = false; -TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { +TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleWorkRaceDoesNotCrash) { // This test works by scheduling a work item in the ThreadPoolTaskExecutor that blocks waiting // to be signaled by this thread. Once that work item is scheduled, this thread enables a // FailPoint that causes future calls of ThreadPoolTaskExecutor::scheduleIntoPool_inlock to spin @@ -168,6 +225,38 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2); } +TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { + // Same as above, with schedule() instead of scheduleWork(). + unittest::Barrier barrier{2}; + auto status1 = getDetectableErrorStatus(); + auto status2 = getDetectableErrorStatus(); + auto& executor = getExecutor(); + launchExecutorThread(); + + executor.schedule([&](Status status) { + status1 = status; + if (!status1.isOK()) + return; + barrier.countDownAndWait(); + amRunningRecursively = true; + executor.schedule([&status2](Status status) { + ASSERT_FALSE(amRunningRecursively); + status2 = status; + }); + amRunningRecursively = false; + }); + + auto fpTPTE1 = getGlobalFailPointRegistry()->getFailPoint( + "scheduleIntoPoolSpinsUntilThreadPoolTaskExecutorShutsDown"); + fpTPTE1->setMode(FailPoint::alwaysOn); + barrier.countDownAndWait(); + MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1)); + executor.shutdown(); + executor.join(); + ASSERT_OK(status1); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2); +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index 1f723c2b1d5..d8327cfc6ed 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -87,7 +87,7 @@ void ShardingTaskExecutor::signalEvent(const EventHandle& event) { } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event, - CallbackFn work) { + CallbackFn&& work) { return _executor->onEvent(event, std::move(work)); } @@ -101,12 +101,12 @@ StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext* return _executor->waitForEvent(opCtx, event, deadline); } -StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(CallbackFn work) { +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(CallbackFn&& work) { return _executor->scheduleWork(std::move(work)); } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Date_t when, - CallbackFn work) { + CallbackFn&& work) { return _executor->scheduleWorkAt(when, std::move(work)); } diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index b9d879070c6..a7d487506c6 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -61,13 +61,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn&& work) override; void waitForEvent(const EventHandle& event) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; - StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) override; StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const BatonHandle& baton = nullptr) override; diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index e092e8645bd..5cae0ade222 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -75,7 +75,7 @@ void TaskExecutorProxy::signalEvent(const EventHandle& event) { } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::onEvent( - const EventHandle& event, CallbackFn work) { + const EventHandle& event, CallbackFn&& work) { return _executor->onEvent(event, std::move(work)); } @@ -90,12 +90,12 @@ StatusWith<stdx::cv_status> TaskExecutorProxy::waitForEvent(OperationContext* op } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork( - CallbackFn work) { + CallbackFn&& work) { return _executor->scheduleWork(std::move(work)); } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWorkAt( - Date_t when, CallbackFn work) { + Date_t when, CallbackFn&& work) { return _executor->scheduleWorkAt(when, std::move(work)); } diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index 436c3a72216..17b2d59b746 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -58,13 +58,13 @@ public: Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn&& work) override; void waitForEvent(const EventHandle& event) override; StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) override; - StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn&& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) override; StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const BatonHandle& baton = nullptr) override; |