diff options
author | Jason Carey <jcarey@argv.me> | 2019-01-23 13:18:49 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-02-05 22:41:49 -0500 |
commit | a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81 (patch) | |
tree | 1adc2fdb36e6c8babaab134d53f84de3020c2404 /src/mongo/executor | |
parent | 5fd66f15797c45c9bab7b59f9e55e0a2f7ad5cd0 (diff) | |
download | mongo-a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81.tar.gz |
SERVER-39146 Refactor Baton
Refactor the baton into regular and networking batons while also
cleaning up the basic baton implementation.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/network_interface.h | 15 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 9 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 90 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 54 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 11 |
8 files changed, 97 insertions, 102 deletions
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 62012162bf6..34685392f1e 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -144,12 +144,11 @@ public: virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; - Future<TaskExecutor::ResponseStatus> startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - const transport::BatonHandle& baton = nullptr) { + Future<TaskExecutor::ResponseStatus> startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + const BatonHandle& baton = nullptr) { auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>(); auto status = startCommand( @@ -171,7 +170,7 @@ public: * completed. */ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * Sets an alarm, which schedules "action" to run no sooner than "when". @@ -187,9 +186,7 @@ public: * Any callbacks invoked from setAlarm must observe onNetworkThread to * return true. See that method for why. */ - virtual Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) = 0; + virtual Status setAlarm(Date_t when, unique_function<void()> action) = 0; /** * Returns true if called from a thread dedicated to networking. I.e. not a diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index f700f6c211e..641f1095530 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -108,7 +108,7 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } @@ -140,8 +140,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( } } -void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { +void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -171,9 +170,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } } -Status NetworkInterfaceMock::setAlarm(const Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceMock::setAlarm(const Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 937bed41525..c0f1693a660 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -114,7 +114,7 @@ public: Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * If the network operation is in the _unscheduled or _processing queues, moves the operation @@ -123,14 +123,12 @@ public: * called after the task has already completed, but its callback has not yet been run. */ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * Not implemented. */ - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 512b3fee58b..5ca24991e8c 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -173,7 +173,7 @@ Date_t NetworkInterfaceTL::now() { Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } @@ -280,14 +280,19 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (baton) { // If we have a baton, we want to get back to the baton thread immediately after we get a // connection - std::move(connFuture).getAsync([ - baton, - rw = std::move(remainingWork) - ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { - baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable { - std::move(rw)(std::move(swConn)); + std::move(connFuture) + .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ]( + StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + baton->schedule([ rw = std::move(rw), + swConn = std::move(swConn) ](OperationContext * opCtx) mutable { + if (opCtx) { + std::move(rw)(std::move(swConn)); + } else { + std::move(rw)(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, failing operation")); + } + }); }); - }); } else { // otherwise we're happy to run inline std::move(connFuture) @@ -306,7 +311,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) { conn->indicateSuccess(); return future; @@ -416,7 +421,7 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH } void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); auto it = _inProgress.find(cbHandle); if (it == _inProgress.end()) { @@ -445,19 +450,13 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } } -Status NetworkInterfaceTL::setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } if (when <= now()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } + _reactor->schedule(transport::Reactor::kPost, std::move(action)); return Status::OK(); } @@ -469,39 +468,34 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, _inProgressAlarms.insert(alarmTimer); } - alarmTimer->waitUntil(when, baton) - .getAsync( - [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable { - auto alarmTimer = weakTimer.lock(); - if (!alarmTimer) { - return; - } else { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgressAlarms.erase(alarmTimer); - } - - auto nowVal = now(); - if (nowVal < when) { - warning() << "Alarm returned early. Expected at: " << when - << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action), baton); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); - } + alarmTimer->waitUntil(when, nullptr) + .getAsync([ this, weakTimer, action = std::move(action), when ](Status status) mutable { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); + } - return; + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action)); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); } - if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + return; + } + + if (status.isOK()) { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 0b0c99cd03e..5cd5dd6115b 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -68,13 +68,11 @@ public: Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) override; - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; @@ -117,7 +115,7 @@ private: Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton); + const BatonHandle& baton); std::string _instanceName; ServiceContext* _svcCtx; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 1e3051e17b4..37c2669f306 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -239,7 +239,7 @@ public: virtual StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 1af20aa8692..4800de47db1 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -67,14 +67,14 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState public: static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return std::make_shared<CallbackState>(std::move(cb), readyDate, baton); } /** * Do not call directly. Use make. */ - CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton) + CallbackState(CallbackFn&& cb, Date_t theReadyDate, const BatonHandle& baton) : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {} virtual ~CallbackState() = default; @@ -102,7 +102,7 @@ public: bool isNetworkOperation = false; AtomicWord<bool> isFinished{false}; boost::optional<stdx::condition_variable> finishedCondition; - transport::BatonHandle baton; + BatonHandle baton; }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -350,21 +350,23 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( return cbHandle; } lk.unlock(); - _net->setAlarm(when, - [this, cbHandle] { - auto cbState = - checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); - if (cbState->canceled.load()) { - return; - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (cbState->canceled.load()) { - return; - } - scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); - }, - nullptr) - .transitional_ignore(); + + auto status = _net->setAlarm(when, [this, cbHandle] { + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); + if (cbState->canceled.load()) { + return; + } + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (cbState->canceled.load()) { + return; + } + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); + }); + + if (!status.isOK()) { + cancel(cbHandle.getValue()); + return status; + } return cbHandle; } @@ -406,7 +408,7 @@ const auto initialSyncPauseCmds = StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(initialSyncFuzzerSynchronizationPoint1)) { // We are only going to pause on these failpoints if the command issued is for the @@ -537,7 +539,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback } ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue( - CallbackFn work, const transport::BatonHandle& baton, Date_t when) { + CallbackFn work, const BatonHandle& baton, Date_t when) { WorkQueue result; result.emplace_front(CallbackState::make(std::move(work), when, baton)); result.front()->iter = result.begin(); @@ -592,7 +594,17 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, for (const auto& cbState : todo) { if (cbState->baton) { - cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); }); + cbState->baton->schedule([this, cbState](OperationContext* opCtx) { + if (opCtx) { + runCallback(std::move(cbState)); + return; + } + + cbState->canceled.store(1); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); + }); } else { const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 1558eaf7f01..1832a021d2c 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -74,7 +74,7 @@ public: void startup() override; void shutdown() override; void join() override; - void appendDiagnosticBSON(BSONObjBuilder* b) const; + void appendDiagnosticBSON(BSONObjBuilder* b) const override; Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; @@ -85,10 +85,9 @@ public: void waitForEvent(const EventHandle& event) override; StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; @@ -138,7 +137,7 @@ private: * called outside of _mutex. */ static WorkQueue makeSingletonWorkQueue(CallbackFn work, - const transport::BatonHandle& baton, + const BatonHandle& baton, Date_t when = {}); /** |