summaryrefslogtreecommitdiff
path: root/src/mongo/transport/baton_asio_linux.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/baton_asio_linux.h')
-rw-r--r--src/mongo/transport/baton_asio_linux.h111
1 files changed, 61 insertions, 50 deletions
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 40829bc43e7..8f18707c12a 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -54,6 +54,23 @@ namespace transport {
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
class TransportLayerASIO::BatonASIO : public Baton {
+ /**
+ * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
+ * ::poll).
+ *
+ * Its methods are all unreachable because we never actually use its timer-ness (we just need
+ * its address for baton book keeping).
+ */
+ class InternalReactorTimer : public ReactorTimer {
+ public:
+ void cancel(const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+ };
/**
* RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
@@ -138,10 +155,6 @@ public:
return std::move(pf.future);
}
- Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override {
- return waitUntil(timer, Date_t::now() + timeout);
- }
-
Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
auto pf = makePromiseFuture<void>();
_safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] {
@@ -202,7 +215,33 @@ public:
}
}
- bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override {
+ void notify() noexcept override {
+ schedule([] {});
+ }
+
+ /**
+ * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we
+ * create a regular waitUntil baton event off the timer, with the passed deadline).
+ */
+ Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override {
+ InternalReactorTimer irt;
+ auto future = waitUntil(irt, deadline);
+
+ run(clkSource);
+
+ // If the future is ready our timer has fired, in which case we timed out
+ if (future.isReady()) {
+ future.get();
+
+ return Waitable::TimeoutState::Timeout;
+ } else {
+ cancelTimer(irt);
+
+ return Waitable::TimeoutState::NoTimeout;
+ }
+ }
+
+ void run(ClockSource* clkSource) noexcept override {
std::vector<SharedPromise<void>> toFulfill;
// We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
@@ -227,44 +266,18 @@ public:
}
});
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
-
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (opCtx) {
- invariant(opCtx == _opCtx);
- }
-
- auto now = Date_t::now();
-
- // If our deadline has passed, return that we've already failed
- if (deadline && *deadline <= now) {
- return false;
- }
// If anything was scheduled, run it now. No need to poll
if (_scheduled.size()) {
- return true;
+ return;
}
- boost::optional<Milliseconds> timeout;
+ boost::optional<Date_t> deadline;
// If we have a timer, poll no longer than that
if (_timers.size()) {
- timeout = _timers.begin()->expiration - now;
- }
-
- if (deadline) {
- auto deadlineTimeout = *deadline - now;
-
- // If we didn't have a timer with a deadline, or our deadline is sooner than that
- // timer
- if (!timeout || (deadlineTimeout < *timeout)) {
- timeout = deadlineTimeout;
- }
+ deadline = _timers.begin()->expiration;
}
std::vector<decltype(_sessions)::iterator> sessions;
@@ -282,13 +295,22 @@ public:
sessions.push_back(iter);
}
+ auto now = clkSource->now();
+
int rval = 0;
// If we don't have a timeout, or we have a timeout that's unexpired, run poll.
- if (!timeout || (*timeout > Milliseconds(0))) {
+ if (!deadline || (*deadline > now)) {
+ if (deadline && !clkSource->tracksSystemClock()) {
+ invariant(clkSource->setAlarm(*deadline, [this] { notify(); }));
+
+ deadline.reset();
+ }
+
_inPoll = true;
lk.unlock();
- rval =
- ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count());
+ rval = ::poll(pollSet.data(),
+ pollSet.size(),
+ deadline ? Milliseconds(*deadline - now).count() : -1);
const auto pollGuard = MakeGuard([&] {
lk.lock();
@@ -300,20 +322,9 @@ public:
severe() << "error in poll: " << errnoWithDescription(errno);
fassertFailed(50834);
}
-
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
}
- now = Date_t::now();
-
- // If our deadline passed while in poll, we've failed
- if (deadline && now > *deadline) {
- return false;
- }
+ now = clkSource->now();
// Fire expired timers
for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
@@ -348,7 +359,7 @@ public:
invariant(remaining == 0);
}
- return true;
+ return;
}
private: