summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2022-02-15 18:02:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-15 20:16:44 +0000
commit4850d93a16dde8ae2659d6cd02a90434127e9d93 (patch)
tree6a72e461ee4bdf74c77a0ad48c264ab6226e0d5d /src/mongo/transport
parent9e3c9529f0162e2ad8a36ff56a842fe73143b7bb (diff)
downloadmongo-4850d93a16dde8ae2659d6cd02a90434127e9d93.tar.gz
SERVER-63282 Refactor and simplify `BatonASIO`
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/baton_asio_linux.cpp390
-rw-r--r--src/mongo/transport/baton_asio_linux.h146
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp117
3 files changed, 370 insertions, 283 deletions
diff --git a/src/mongo/transport/baton_asio_linux.cpp b/src/mongo/transport/baton_asio_linux.cpp
index 50e46601102..cc3106a812a 100644
--- a/src/mongo/transport/baton_asio_linux.cpp
+++ b/src/mongo/transport/baton_asio_linux.cpp
@@ -29,9 +29,13 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+#include <sys/eventfd.h>
+
#include "mongo/transport/baton_asio_linux.h"
#include "mongo/base/checked_cast.h"
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status.h"
#include "mongo/db/operation_context.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
@@ -41,11 +45,6 @@
namespace mongo {
namespace transport {
-
-const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder>
- TransportLayerASIO::BatonASIO::EventFDHolder::getForClient =
- Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>();
-
namespace {
MONGO_FAIL_POINT_DEFINE(blockBatonASIOBeforePoll);
@@ -59,13 +58,68 @@ Status getCanceledError() {
}
/**
- * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
- * ::poll).
- *
- * Its methods are all unreachable because we never actually use its timer-ness (we just need its
- * address for baton book keeping).
+ * RAII type that wraps up an `eventfd` and reading/writing to it.
+ * We don't use the counter portion and only use the file descriptor (i.e., `fd`) to notify and
+ * interrupt the client thread blocked polling (see `BatonASIO::run`).
+ */
+struct EventFDHolder {
+ EventFDHolder(const EventFDHolder&) = delete;
+ EventFDHolder& operator=(const EventFDHolder&) = delete;
+
+ EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) {
+ // On error, -1 is returned and `errno` is set
+ if (fd < 0) {
+ const auto savedErrno = errno;
+ const auto errorCode = savedErrno == EMFILE || savedErrno == ENFILE
+ ? ErrorCodes::TooManyFilesOpen
+ : ErrorCodes::UnknownError;
+ Status status(errorCode,
+ fmt::format("error in creating eventfd: {}, errno: {}",
+ errnoWithDescription(savedErrno),
+ savedErrno));
+ LOGV2_ERROR(6328201, "Unable to create eventfd object", "error"_attr = status);
+ iasserted(status);
+ }
+ }
+
+ ~EventFDHolder() {
+ ::close(fd);
+ }
+
+ void notify() {
+ while (::eventfd_write(fd, 1) != 0) {
+ const auto savedErrno = errno;
+ if (savedErrno == EINTR)
+ continue;
+ LOGV2_FATAL(6328202, "eventfd write failed", "fd"_attr = fd, "errno"_attr = savedErrno);
+ }
+ }
+
+ void wait() {
+ // If we have activity on the `eventfd`, pull the count out.
+ ::eventfd_t u;
+ while (::eventfd_read(fd, &u) != 0) {
+ const auto savedErrno = errno;
+ if (savedErrno == EINTR)
+ continue;
+ LOGV2_FATAL(6328203, "eventfd read failed", "fd"_attr = fd, "errno"_attr = savedErrno);
+ }
+ }
+
+ const int fd;
+};
+
+const auto getEventFDForClient = Client::declareDecoration<EventFDHolder>();
+
+EventFDHolder& efd(OperationContext* opCtx) {
+ return getEventFDForClient(opCtx->getClient());
+}
+
+/**
+ * This is only used by `run_until()`, and provides a unique timer id. This unique id is supplied by
+ * `ReactorTimer`, and used by `waitUntil()` for internal bookkeeping.
*/
-class InternalReactorTimer : public ReactorTimer {
+class DummyTimer final : public ReactorTimer {
public:
void cancel(const BatonHandle& baton = nullptr) override {
MONGO_UNREACHABLE;
@@ -79,74 +133,61 @@ public:
} // namespace
void TransportLayerASIO::BatonASIO::schedule(Task func) noexcept {
- stdx::unique_lock lk(_mutex);
-
- if (!_opCtx) {
+ auto task = [this, func = std::move(func)](stdx::unique_lock<Mutex> lk) mutable {
+ auto status = _opCtx ? Status::OK() : getDetachedError();
lk.unlock();
- func(getDetachedError());
+ func(std::move(status));
+ };
+ stdx::unique_lock lk(_mutex);
+ if (!_opCtx) {
+ // Run the task inline if the baton is detached.
+ task(std::move(lk));
return;
}
- _scheduled.push_back(
- [ this, func = std::move(func) ](stdx::unique_lock<Mutex> lk) mutable noexcept {
- auto status = Status::OK();
- if (!_opCtx) {
- status = getDetachedError();
- }
- lk.unlock();
-
- func(status);
- });
-
- if (_inPoll) {
- _efd().notify();
- }
+ _scheduled.push_back(std::move(task));
+ if (_inPoll)
+ notify();
}
void TransportLayerASIO::BatonASIO::notify() noexcept {
- _efd().notify();
+ efd(_opCtx).notify();
}
-/**
- * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we create
- * a regular waitUntil baton event off the timer, with the passed deadline).
- */
Waitable::TimeoutState TransportLayerASIO::BatonASIO::run_until(ClockSource* clkSource,
Date_t deadline) noexcept {
- InternalReactorTimer irt;
- auto future = waitUntil(irt, deadline);
+ // Set up a timer on the baton with the specified deadline. This synthetic timer is used by
+ // `_poll()`, which is called through `run()`, to enforce a deadline for the blocking `::poll`.
+ DummyTimer timer;
+ auto future = waitUntil(timer, deadline);
run(clkSource);
- // If the future is ready our timer has fired, in which case we timed out
+ // If the future is ready, our timer interrupted `run()`, in which case we timed out.
if (future.isReady()) {
future.get();
-
return Waitable::TimeoutState::Timeout;
} else {
- cancelTimer(irt);
-
+ cancelTimer(timer);
return Waitable::TimeoutState::NoTimeout;
}
}
void TransportLayerASIO::BatonASIO::run(ClockSource* clkSource) noexcept {
- std::vector<Promise<void>> toFulfill;
-
- // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
+ // On the way out, fulfill promises and run scheduled jobs without holding the lock.
+ std::list<Promise<void>> toFulfill;
const ScopeGuard guard([&] {
for (auto& promise : toFulfill) {
promise.emplaceValue();
}
auto lk = stdx::unique_lock(_mutex);
- while (_scheduled.size()) {
+ while (!_scheduled.empty()) {
auto scheduled = std::exchange(_scheduled, {});
for (auto& job : scheduled) {
job(std::move(lk));
job = nullptr;
-
lk = stdx::unique_lock(_mutex);
}
}
@@ -154,109 +195,28 @@ void TransportLayerASIO::BatonASIO::run(ClockSource* clkSource) noexcept {
stdx::unique_lock lk(_mutex);
- // If anything was scheduled, run it now. No need to poll
- if (_scheduled.size()) {
+ // If anything was scheduled, run it now and skip polling and processing timers.
+ if (!_scheduled.empty())
return;
- }
- boost::optional<Date_t> deadline;
-
- // If we have a timer, poll no longer than that
- if (_timers.size()) {
- deadline = _timers.begin()->first;
- }
-
- _pollSessions.clear();
- _pollSet.clear();
- _pollSessions.reserve(_sessions.size());
- _pollSet.reserve(_sessions.size() + 1);
-
- _pollSet.push_back(pollfd{_efd().fd, POLLIN, 0});
-
- for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
- _pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0});
- _pollSessions.push_back(iter);
- }
-
- auto now = clkSource->now();
-
- int rval = 0;
- // If we don't have a timeout, or we have a timeout that's unexpired, run poll.
- if (!deadline || (*deadline > now)) {
- if (deadline && !clkSource->tracksSystemClock()) {
- invariant(
- clkSource->setAlarm(*deadline, [this, anchor = shared_from_this()] { notify(); }));
-
- deadline.reset();
- }
-
- _inPoll = true;
- lk.unlock();
- blockBatonASIOBeforePoll.pauseWhileSet();
- rval = ::poll(_pollSet.data(),
- _pollSet.size(),
- deadline ? Milliseconds(*deadline - now).count() : -1);
- auto savedErrno = errno;
- lk.lock();
- _inPoll = false;
-
- // If poll failed, it better be in EINTR
- if (rval < 0 && savedErrno != EINTR) {
- LOGV2_FATAL(50834,
- "error in poll: {error}",
- "error in poll",
- "error"_attr = errnoWithDescription(savedErrno));
- }
- }
-
- now = clkSource->now();
+ toFulfill.splice(toFulfill.end(), _poll(lk, clkSource));
// Fire expired timers
- for (auto iter = _timers.begin(); iter != _timers.end() && iter->first <= now;) {
- toFulfill.push_back(std::move(iter->second.promise));
- _timersById.erase(iter->second.id);
- iter = _timers.erase(iter);
- }
-
- // If poll found some activity
- if (rval > 0) {
- size_t remaining = rval;
-
- auto pollIter = _pollSet.begin();
-
- if (pollIter->revents) {
- _efd().wait();
-
- remaining--;
- }
-
- ++pollIter;
-
- for (auto sessionIter = _pollSessions.begin();
- sessionIter != _pollSessions.end() && remaining;
- ++sessionIter, ++pollIter) {
- if (pollIter->revents) {
- toFulfill.push_back(std::move((*sessionIter)->second.promise));
- _sessions.erase(*sessionIter);
-
- remaining--;
- }
- }
-
- invariant(remaining == 0);
+ const auto now = clkSource->now();
+ for (auto it = _timers.begin(); it != _timers.end() && it->first <= now;
+ it = _timers.erase(it)) {
+ toFulfill.push_back(std::move(it->second.promise));
+ _timersById.erase(it->second.id);
}
-
- return;
}
void TransportLayerASIO::BatonASIO::markKillOnClientDisconnect() noexcept {
- if (_opCtx->getClient() && _opCtx->getClient()->session()) {
- _addSession(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) {
- if (!s.isOK()) {
- return;
- }
-
- _opCtx->markKilled(ErrorCodes::ClientDisconnect);
+ auto client = _opCtx->getClient();
+ invariant(client);
+ if (auto session = client->session()) {
+ _addSession(*session, POLLRDHUP).getAsync([this](Status status) {
+ if (status.isOK())
+ _opCtx->markKilled(ErrorCodes::ClientDisconnect);
});
}
}
@@ -265,19 +225,15 @@ Future<void> TransportLayerASIO::BatonASIO::addSession(Session& session, Type ty
return _addSession(session, type == Type::In ? POLLIN : POLLOUT);
}
-Future<void> TransportLayerASIO::BatonASIO::waitUntil(const ReactorTimer& timer,
+Future<void> TransportLayerASIO::BatonASIO::waitUntil(const ReactorTimer& reactorTimer,
Date_t expiration) noexcept try {
auto pf = makePromiseFuture<void>();
- auto id = timer.id();
-
- stdx::unique_lock lk(_mutex);
-
- _safeExecute(std::move(lk), [ expiration, timer = Timer{id, std::move(pf.promise)},
- this ](stdx::unique_lock<Mutex>) mutable noexcept {
- auto iter = _timers.emplace(expiration, std::move(timer));
- _timersById[iter->second.id] = iter;
- });
-
+ _safeExecute(stdx::unique_lock(_mutex),
+ [this, expiration, timer = Timer{reactorTimer.id(), std::move(pf.promise)}](
+ stdx::unique_lock<Mutex>) mutable {
+ auto iter = _timers.emplace(expiration, std::move(timer));
+ _timersById[iter->second.id] = iter;
+ });
return std::move(pf.future);
} catch (const DBException& ex) {
return ex.toStatus();
@@ -287,17 +243,15 @@ bool TransportLayerASIO::BatonASIO::cancelSession(Session& session) noexcept {
const auto id = session.id();
stdx::unique_lock lk(_mutex);
-
- if (_sessions.find(id) == _sessions.end()) {
+ if (_sessions.find(id) == _sessions.end())
return false;
- }
- _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
+ _safeExecute(std::move(lk), [this, id](stdx::unique_lock<Mutex> lk) {
auto iter = _sessions.find(id);
- if (iter == _sessions.end()) {
+ if (iter == _sessions.end())
return;
- }
- auto session = std::exchange(iter->second, {});
+
+ TransportSession session = std::exchange(iter->second, {});
_sessions.erase(iter);
lk.unlock();
@@ -311,19 +265,15 @@ bool TransportLayerASIO::BatonASIO::cancelTimer(const ReactorTimer& timer) noexc
const auto id = timer.id();
stdx::unique_lock lk(_mutex);
-
- if (_timersById.find(id) == _timersById.end()) {
+ if (_timersById.find(id) == _timersById.end())
return false;
- }
- _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
+ _safeExecute(std::move(lk), [this, id](stdx::unique_lock<Mutex> lk) {
auto iter = _timersById.find(id);
-
- if (iter == _timersById.end()) {
+ if (iter == _timersById.end())
return;
- }
- auto timer = std::exchange(iter->second->second, {});
+ Timer timer = std::exchange(iter->second->second, {});
_timers.erase(iter->second);
_timersById.erase(iter);
lk.unlock();
@@ -339,22 +289,106 @@ bool TransportLayerASIO::BatonASIO::canWait() noexcept {
return _opCtx;
}
-TransportLayerASIO::BatonASIO::EventFDHolder& TransportLayerASIO::BatonASIO::_efd() {
- return EventFDHolder::getForClient(_opCtx->getClient());
+void TransportLayerASIO::BatonASIO::_safeExecute(stdx::unique_lock<Mutex> lk,
+ TransportLayerASIO::BatonASIO::Job job) {
+ if (!_opCtx) {
+ // If we're detached, no job can safely execute.
+ iasserted(getDetachedError());
+ }
+
+ if (_inPoll) {
+ _scheduled.push_back(std::move(job));
+ notify();
+ } else {
+ job(std::move(lk));
+ }
}
-Future<void> TransportLayerASIO::BatonASIO::_addSession(Session& session, short type) noexcept try {
- auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
- auto id = session.id();
- auto pf = makePromiseFuture<void>();
+std::list<Promise<void>> TransportLayerASIO::BatonASIO::_poll(stdx::unique_lock<Mutex>& lk,
+ ClockSource* clkSource) {
+ const auto now = clkSource->now();
- stdx::unique_lock lk(_mutex);
+ // If we have a timer, then use it to enforce a timeout for polling.
+ boost::optional<Date_t> deadline;
+ if (!_timers.empty()) {
+ deadline = _timers.begin()->first;
+
+ // Don't poll if we have already passed the deadline.
+ if (*deadline <= now)
+ return {};
+ }
+
+ if (deadline && !clkSource->tracksSystemClock()) {
+ // The clock source and `::poll` may track time differently, so use the clock source to
+ // enforce the timeout.
+ invariant(clkSource->setAlarm(*deadline, [self = shared_from_this()] { self->notify(); }));
+ deadline.reset();
+ }
+
+ _pollSet.clear();
+ _pollSet.reserve(_sessions.size() + 1);
+ _pollSet.push_back({efd(_opCtx).fd, POLLIN, 0});
+
+ _pollSessions.clear();
+ _pollSessions.reserve(_sessions.size());
+
+ for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
+ _pollSet.push_back({iter->second.fd, iter->second.events, 0});
+ _pollSessions.push_back(iter);
+ }
- _safeExecute(std::move(lk),
- [ id, session = TransportSession{fd, type, std::move(pf.promise)},
- this ](stdx::unique_lock<Mutex>) mutable noexcept {
- auto ret = _sessions.emplace(id, std::move(session));
- invariant(ret.second);
+ int events = [&] {
+ _inPoll = true;
+ lk.unlock();
+
+ const ScopeGuard guard([&] {
+ lk.lock();
+ _inPoll = false;
+ });
+
+ blockBatonASIOBeforePoll.pauseWhileSet();
+ int timeout = deadline ? Milliseconds(*deadline - now).count() : -1;
+ int events = ::poll(_pollSet.data(), _pollSet.size(), timeout);
+ const auto savedErrno = errno;
+ if (events < 0 && savedErrno != EINTR) {
+ LOGV2_FATAL(50834, "error in poll", "error"_attr = errnoWithDescription(savedErrno));
+ }
+ return events;
+ }();
+
+ if (events <= 0)
+ return {}; // Polling was timed out or interrupted.
+
+ auto psit = _pollSet.begin();
+ // Consume the notification on the `eventfd` object if there is any.
+ if (psit->revents) {
+ efd(_opCtx).wait();
+ --events;
+ }
+ ++psit;
+
+ std::list<Promise<void>> promises;
+ for (auto sit = _pollSessions.begin(); events && sit != _pollSessions.end(); ++sit, ++psit) {
+ if (psit->revents) {
+ promises.push_back(std::move((*sit)->second.promise));
+ _sessions.erase(*sit);
+ --events;
+ }
+ }
+
+ invariant(events == 0, "Failed to process all events after going through registered sessions");
+ return promises;
+}
+
+Future<void> TransportLayerASIO::BatonASIO::_addSession(Session& session, short events) try {
+ auto pf = makePromiseFuture<void>();
+ TransportSession ts{checked_cast<ASIOSession&>(session).getSocket().native_handle(),
+ events,
+ std::move(pf.promise)};
+ _safeExecute(stdx::unique_lock(_mutex),
+ [this, id = session.id(), ts = std::move(ts)](stdx::unique_lock<Mutex>) mutable {
+ invariant(_sessions.emplace(id, std::move(ts)).second,
+ "Adding session to baton failed");
});
return std::move(pf.future);
} catch (const DBException& ex) {
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index f6401231f72..f554c43668e 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -29,15 +29,13 @@
#pragma once
+#include <list>
#include <map>
#include <memory>
#include <vector>
#include <poll.h>
-#include <sys/eventfd.h>
-#include "mongo/base/error_codes.h"
-#include "mongo/base/status.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/baton.h"
@@ -56,60 +54,6 @@ namespace transport {
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
class TransportLayerASIO::BatonASIO : public NetworkingBaton {
- /**
- * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
- * counter portion, just the notify/wakeup
- */
- struct EventFDHolder {
- EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) {
- if (fd < 0) {
- auto savedErrno = errno;
- std::string reason = str::stream()
- << "error in creating eventfd: " << errnoWithDescription(savedErrno);
-
- auto code = (savedErrno == EMFILE || savedErrno == ENFILE)
- ? ErrorCodes::TooManyFilesOpen
- : ErrorCodes::UnknownError;
-
- uasserted(code, reason);
- }
- }
-
- ~EventFDHolder() {
- ::close(fd);
- }
-
- EventFDHolder(const EventFDHolder&) = delete;
- EventFDHolder& operator=(const EventFDHolder&) = delete;
-
- // Writes to the underlying eventfd
- void notify() {
- while (true) {
- if (::eventfd_write(fd, 1) == 0) {
- break;
- }
-
- invariant(errno == EINTR);
- }
- }
-
- void wait() {
- while (true) {
- // If we have activity on the eventfd, pull the count out
- uint64_t u;
- if (::eventfd_read(fd, &u) == 0) {
- break;
- }
-
- invariant(errno == EINTR);
- }
- }
-
- const int fd;
-
- static const Client::Decoration<EventFDHolder> getForClient;
- };
-
public:
BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {}
@@ -146,50 +90,50 @@ public:
private:
struct Timer {
- size_t id;
- Promise<void> promise; // Needs to be mutable to move from it while in std::set.
+ size_t id; // Stores the unique identifier for the timer, provided by `ReactorTimer`.
+ Promise<void> promise;
};
struct TransportSession {
int fd;
- short type;
+ short events; // Events to consider while polling for this session (e.g., `POLLIN`).
Promise<void> promise;
};
- // Internally, the BatonASIO thinks in terms of synchronized units of work. This is because
- // a Baton effectively represents a green thread with the potential to add or remove work (i.e.
- // Jobs) at any time. Jobs with external notifications (OutOfLineExecutor::Tasks,
- // TransportSession:promise, ReactorTimer::promise) are expected to release their lock before
- // generating those notifications.
+ /*
+ * Internally, `BatonASIO` thinks in terms of synchronized units of work. This is because a
+ * baton effectively represents a green thread with the potential to add or remove work (i.e.,
+ * jobs) at any time. Thus, scheduled jobs must release their lock before executing any task
+ * external to the baton (e.g., `OutOfLineExecutor::Task`, `TransportSession:promise`, and
+ * `ReactorTimer::promise`).
+ */
using Job = unique_function<void(stdx::unique_lock<Mutex>)>;
/**
- * Invoke a job with exclusive access to the Baton internals.
+ * Invokes a job with exclusive access to the baton's internals.
+ *
+ * If the baton is currently polling (i.e., `_inPoll` is `true`), the polling thread owns the
+ * baton, so we schedule the job and notify the polling thread to wake up and run the job.
+ *
+ * Otherwise, take exclusive access and run the job on the current thread.
+ *
+ * Note that `_safeExecute()` will throw if the baton has been detached.
*
- * If we are currently _inPoll, the polling thread owns the Baton and thus we tell it to wake up
- * and run our job. If we are not _inPoll, take exclusive access and run our job on the local
- * thread. Note that _safeExecute() will throw if the Baton has been detached.
+ * Also note that the job may not run inline, and may get scheduled to run by the baton, so it
+ * should never throw.
*/
- TEMPLATE(typename Callback)
- REQUIRES(std::is_nothrow_invocable_v<Callback, stdx::unique_lock<Mutex>>)
- void _safeExecute(stdx::unique_lock<Mutex> lk, Callback&& job) {
- if (!_opCtx) {
- // If we're detached, no job can safely execute.
- uassertStatusOK({ErrorCodes::ShutdownInProgress, "Baton detached"});
- }
-
- if (_inPoll) {
- _scheduled.push_back(std::forward<Callback>(job));
-
- _efd().notify();
- } else {
- job(std::move(lk));
- }
- }
+ void _safeExecute(stdx::unique_lock<Mutex> lk, Job job);
- EventFDHolder& _efd();
+ /**
+ * Blocks polling on the registered sessions until one of the following happens:
+ * - `notify()` is called, either directly or through other methods (e.g., `schedule()`).
+ * - One of the timers scheduled on this baton times out.
+ * - There is an event for at least one of the registered sessions (e.g., data is available).
+ * Returns the list of promises that must be fulfilled as the result of polling.
+ */
+ std::list<Promise<void>> _poll(stdx::unique_lock<Mutex>&, ClockSource*);
- Future<void> _addSession(Session& session, short type) noexcept;
+ Future<void> _addSession(Session& session, short events);
void detachImpl() noexcept override;
@@ -199,25 +143,27 @@ private:
bool _inPoll = false;
- // This map stores the sessions we need to poll on. We unwind it into a pollset for every
- // blocking call to run
+ // Stores the sessions we need to poll on.
stdx::unordered_map<SessionId, TransportSession> _sessions;
- // The set is used to find the next timer which will fire. The unordered_map looks up the
- // timers so we can remove them in O(1)
+ /**
+ * We use two structures to maintain timers:
+ * - `_timers` keeps a sorted list of timers according to their expiration date.
+ * - `_timersById` allows using the unique timer id to find and cancel a timer in constant time.
+ */
std::multimap<Date_t, Timer> _timers;
- stdx::unordered_map<size_t, decltype(_timers)::iterator> _timersById;
+ stdx::unordered_map<size_t, std::multimap<Date_t, Timer>::iterator> _timersById;
- // For tasks that come in via schedule. Or that were deferred because we were in poll
+ // Tasks scheduled for deferred execution.
std::vector<Job> _scheduled;
- // We hold the two following values at the object level to save on allocations when a baton is
- // waited on many times over the course of its lifetime.
-
- // Holds the pollset for ::poll
- std::vector<pollfd> _pollSet;
-
- // Mirrors the above pollset with mappings back to _sessions
+ /*
+ * We hold the following values at the object level to save on allocations when a baton is
+ * waited on many times over the course of its lifetime:
+ * `_pollSet`: the poll set for `::poll`.
+ * `_pollSessions`: maps members of `_pollSet` to their corresponding session in `_sessions`.
+ */
+ std::vector<::pollfd> _pollSet;
std::vector<decltype(_sessions)::iterator> _pollSessions;
};
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index a3e1c850e16..daf1cd1a256 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -62,6 +62,7 @@
#include "mongo/util/synchronized_value.h"
#include "mongo/util/thread_context.h"
#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
namespace mongo {
namespace {
@@ -788,6 +789,19 @@ class BatonASIOLinuxTest : public LockerNoopServiceContextTest {
Promise<transport::SessionHandle> _promise;
};
+ // Used for setting and canceling timers on the networking baton. Does not offer any timer
+ // functionality, and is only used for its unique id.
+ class DummyTimer final : public transport::ReactorTimer {
+ public:
+ void cancel(const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+ };
+
public:
void setUp() override {
auto pf = makePromiseFuture<transport::SessionHandle>();
@@ -815,6 +829,10 @@ public:
return *_connThread;
}
+ std::unique_ptr<transport::ReactorTimer> makeDummyTimer() const {
+ return std::make_unique<DummyTimer>();
+ }
+
private:
ServiceContext::UniqueClient _client;
std::unique_ptr<ConnectionThread> _connThread;
@@ -897,7 +915,7 @@ TEST_F(BatonASIOLinuxTest, AddAndRemoveSessionWhileInPoll) {
auto opCtx = client().makeOperationContext();
Notification<bool> cancelSessionResult;
- stdx::thread thread([&] {
+ JoinThread thread([&] {
auto baton = opCtx->getBaton()->networking();
auto session = client().session();
@@ -910,14 +928,103 @@ TEST_F(BatonASIOLinuxTest, AddAndRemoveSessionWhileInPoll) {
cancelSessionResult.set(baton->cancelSession(*session));
});
- ScopeGuard joinGuard = [&] { thread.join(); };
-
// TODO SERVER-61192 Change the following to `ASSERT_TRUE` once the underlying issue is fixed.
ASSERT_FALSE(cancelSessionResult.get(opCtx.get()));
}
-// TODO SERVER-61192 Test setting and canceling timers on the baton.
-// TODO SERVER-61192 Test `run`, `notify`, and `run_until` on `BatonASIO`.
+TEST_F(BatonASIOLinuxTest, WaitAndNotify) {
+ // Exercises the underlying `wait` and `notify` functionality through `BatonASIO::run` and
+ // `BatonASIO::schedule`, respectively. Here is how this is done:
+ // 1) The main thread starts polling (from inside `run`) when waiting on the notification.
+ // 2) Once the main thread is ready to poll, `thread` notifies it through `baton->schedule`.
+ // 3) `schedule` calls into `notify` internally, which should interrupt the polling.
+ // 4) Once polling is interrupted, `baton` runs the scheduled job and sets the notification.
+ auto opCtx = client().makeOperationContext();
+
+ Notification<void> notification;
+ JoinThread thread([&] {
+ auto baton = opCtx->getBaton()->networking();
+ FailPointEnableBlock fp("blockBatonASIOBeforePoll");
+ fp->waitForTimesEntered(1);
+ baton->schedule([&](Status) { notification.set(); });
+ });
+
+ notification.get(opCtx.get());
+}
+
+void blockIfBatonPolls(Client& client,
+ std::function<void(const BatonHandle&, Notification<void>&)> modifyBaton) {
+ Notification<void> notification;
+ auto opCtx = client.makeOperationContext();
+
+ FailPointEnableBlock fp("blockBatonASIOBeforePoll");
+
+ modifyBaton(opCtx->getBaton(), notification);
+
+ // This will internally call into `BatonASIO::run()`, which will block forever (since the
+ // failpoint is enabled) if the baton starts polling.
+ notification.get(opCtx.get());
+}
+
+TEST_F(BatonASIOLinuxTest, BatonWithPendingTasksNeverPolls) {
+ blockIfBatonPolls(client(), [](const BatonHandle& baton, Notification<void>& notification) {
+ baton->schedule([&](Status) { notification.set(); });
+ });
+}
+
+TEST_F(BatonASIOLinuxTest, BatonWithAnExpiredTimerNeverPolls) {
+ auto timer = makeDummyTimer();
+ blockIfBatonPolls(client(), [&](const BatonHandle& baton, Notification<void>& notification) {
+ // Batons use the precise clock source internally. We use the current time (i.e., `now()`)
+ // as the deadline to schedule an expired timer on the baton.
+ auto clkSource = getServiceContext()->getPreciseClockSource();
+ baton->networking()->waitUntil(*timer, clkSource->now()).getAsync([&](Status) {
+ notification.set();
+ });
+ });
+}
+
+TEST_F(BatonASIOLinuxTest, NotifyInterruptsRunUntilBeforeTimeout) {
+ auto opCtx = client().makeOperationContext();
+ JoinThread thread([&] {
+ auto baton = opCtx->getBaton();
+ FailPointEnableBlock fp("blockBatonASIOBeforePoll");
+ fp->waitForTimesEntered(1);
+ baton->notify();
+ });
+
+ auto clkSource = getServiceContext()->getPreciseClockSource();
+ const auto state = opCtx->getBaton()->run_until(clkSource, Date_t::max());
+ ASSERT(state == Waitable::TimeoutState::NoTimeout);
+}
+
+TEST_F(BatonASIOLinuxTest, RunUntilProperlyTimesout) {
+ auto opCtx = client().makeOperationContext();
+ auto clkSource = getServiceContext()->getPreciseClockSource();
+ const auto state = opCtx->getBaton()->run_until(clkSource, clkSource->now() + Milliseconds(1));
+ ASSERT(state == Waitable::TimeoutState::Timeout);
+}
+
+TEST_F(BatonASIOLinuxTest, AddAndRemoveTimerWhileInPoll) {
+ auto opCtx = client().makeOperationContext();
+ Notification<bool> cancelTimerResult;
+
+ JoinThread thread([&] {
+ auto baton = opCtx->getBaton()->networking();
+
+ FailPointEnableBlock fp("blockBatonASIOBeforePoll");
+ fp->waitForTimesEntered(1);
+
+ // This thread is an external observer to the baton, so the expected behavior is for
+ // `cancelTimer` to happen after `waitUntil`, thus canceling the timer must return `true`.
+ auto timer = makeDummyTimer();
+ baton->waitUntil(*timer, Date_t::max()).getAsync([](Status) {});
+ cancelTimerResult.set(baton->cancelTimer(*timer));
+ });
+
+ // TODO SERVER-61192 Change the following to `ASSERT_TRUE` once the underlying issue is fixed.
+ ASSERT_FALSE(cancelTimerResult.get(opCtx.get()));
+}
#endif // __linux__