diff options
Diffstat (limited to 'src/mongo/transport/baton_asio_linux.h')
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 111 |
1 files changed, 61 insertions, 50 deletions
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 40829bc43e7..8f18707c12a 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -54,6 +54,23 @@ namespace transport { * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public Baton { + /** + * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for + * ::poll). + * + * Its methods are all unreachable because we never actually use its timer-ness (we just need + * its address for baton book keeping). + */ + class InternalReactorTimer : public ReactorTimer { + public: + void cancel(const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + + Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + }; /** * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the @@ -138,10 +155,6 @@ public: return std::move(pf.future); } - Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override { - return waitUntil(timer, Date_t::now() + timeout); - } - Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { auto pf = makePromiseFuture<void>(); _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] { @@ -202,7 +215,33 @@ public: } } - bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override { + void notify() noexcept override { + schedule([] {}); + } + + /** + * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we + * create a regular waitUntil baton event off the timer, with the passed deadline). + */ + Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override { + InternalReactorTimer irt; + auto future = waitUntil(irt, deadline); + + run(clkSource); + + // If the future is ready our timer has fired, in which case we timed out + if (future.isReady()) { + future.get(); + + return Waitable::TimeoutState::Timeout; + } else { + cancelTimer(irt); + + return Waitable::TimeoutState::NoTimeout; + } + } + + void run(ClockSource* clkSource) noexcept override { std::vector<SharedPromise<void>> toFulfill; // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks @@ -227,44 +266,18 @@ public: } }); - // Note that it's important to check for interrupt without the lock, because markKilled - // calls schedule, which will deadlock if we're holding the lock when calling this. - if (opCtx) { - opCtx->checkForInterrupt(); - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (opCtx) { - invariant(opCtx == _opCtx); - } - - auto now = Date_t::now(); - - // If our deadline has passed, return that we've already failed - if (deadline && *deadline <= now) { - return false; - } // If anything was scheduled, run it now. No need to poll if (_scheduled.size()) { - return true; + return; } - boost::optional<Milliseconds> timeout; + boost::optional<Date_t> deadline; // If we have a timer, poll no longer than that if (_timers.size()) { - timeout = _timers.begin()->expiration - now; - } - - if (deadline) { - auto deadlineTimeout = *deadline - now; - - // If we didn't have a timer with a deadline, or our deadline is sooner than that - // timer - if (!timeout || (deadlineTimeout < *timeout)) { - timeout = deadlineTimeout; - } + deadline = _timers.begin()->expiration; } std::vector<decltype(_sessions)::iterator> sessions; @@ -282,13 +295,22 @@ public: sessions.push_back(iter); } + auto now = clkSource->now(); + int rval = 0; // If we don't have a timeout, or we have a timeout that's unexpired, run poll. - if (!timeout || (*timeout > Milliseconds(0))) { + if (!deadline || (*deadline > now)) { + if (deadline && !clkSource->tracksSystemClock()) { + invariant(clkSource->setAlarm(*deadline, [this] { notify(); })); + + deadline.reset(); + } + _inPoll = true; lk.unlock(); - rval = - ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count()); + rval = ::poll(pollSet.data(), + pollSet.size(), + deadline ? Milliseconds(*deadline - now).count() : -1); const auto pollGuard = MakeGuard([&] { lk.lock(); @@ -300,20 +322,9 @@ public: severe() << "error in poll: " << errnoWithDescription(errno); fassertFailed(50834); } - - // Note that it's important to check for interrupt without the lock, because markKilled - // calls schedule, which will deadlock if we're holding the lock when calling this. - if (opCtx) { - opCtx->checkForInterrupt(); - } } - now = Date_t::now(); - - // If our deadline passed while in poll, we've failed - if (deadline && now > *deadline) { - return false; - } + now = clkSource->now(); // Fire expired timers for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { @@ -348,7 +359,7 @@ public: invariant(remaining == 0); } - return true; + return; } private: |