diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2022-02-15 18:02:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-15 20:16:44 +0000 |
commit | 4850d93a16dde8ae2659d6cd02a90434127e9d93 (patch) | |
tree | 6a72e461ee4bdf74c77a0ad48c264ab6226e0d5d /src/mongo/transport | |
parent | 9e3c9529f0162e2ad8a36ff56a842fe73143b7bb (diff) | |
download | mongo-4850d93a16dde8ae2659d6cd02a90434127e9d93.tar.gz |
SERVER-63282 Refactor and simplify `BatonASIO`
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/baton_asio_linux.cpp | 390 | ||||
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 146 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 117 |
3 files changed, 370 insertions, 283 deletions
diff --git a/src/mongo/transport/baton_asio_linux.cpp b/src/mongo/transport/baton_asio_linux.cpp index 50e46601102..cc3106a812a 100644 --- a/src/mongo/transport/baton_asio_linux.cpp +++ b/src/mongo/transport/baton_asio_linux.cpp @@ -29,9 +29,13 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork +#include <sys/eventfd.h> + #include "mongo/transport/baton_asio_linux.h" #include "mongo/base/checked_cast.h" +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" #include "mongo/db/operation_context.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" @@ -41,11 +45,6 @@ namespace mongo { namespace transport { - -const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder> - TransportLayerASIO::BatonASIO::EventFDHolder::getForClient = - Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>(); - namespace { MONGO_FAIL_POINT_DEFINE(blockBatonASIOBeforePoll); @@ -59,13 +58,68 @@ Status getCanceledError() { } /** - * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for - * ::poll). - * - * Its methods are all unreachable because we never actually use its timer-ness (we just need its - * address for baton book keeping). + * RAII type that wraps up an `eventfd` and reading/writing to it. + * We don't use the counter portion and only use the file descriptor (i.e., `fd`) to notify and + * interrupt the client thread blocked polling (see `BatonASIO::run`). + */ +struct EventFDHolder { + EventFDHolder(const EventFDHolder&) = delete; + EventFDHolder& operator=(const EventFDHolder&) = delete; + + EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) { + // On error, -1 is returned and `errno` is set + if (fd < 0) { + const auto savedErrno = errno; + const auto errorCode = savedErrno == EMFILE || savedErrno == ENFILE + ? ErrorCodes::TooManyFilesOpen + : ErrorCodes::UnknownError; + Status status(errorCode, + fmt::format("error in creating eventfd: {}, errno: {}", + errnoWithDescription(savedErrno), + savedErrno)); + LOGV2_ERROR(6328201, "Unable to create eventfd object", "error"_attr = status); + iasserted(status); + } + } + + ~EventFDHolder() { + ::close(fd); + } + + void notify() { + while (::eventfd_write(fd, 1) != 0) { + const auto savedErrno = errno; + if (savedErrno == EINTR) + continue; + LOGV2_FATAL(6328202, "eventfd write failed", "fd"_attr = fd, "errno"_attr = savedErrno); + } + } + + void wait() { + // If we have activity on the `eventfd`, pull the count out. + ::eventfd_t u; + while (::eventfd_read(fd, &u) != 0) { + const auto savedErrno = errno; + if (savedErrno == EINTR) + continue; + LOGV2_FATAL(6328203, "eventfd read failed", "fd"_attr = fd, "errno"_attr = savedErrno); + } + } + + const int fd; +}; + +const auto getEventFDForClient = Client::declareDecoration<EventFDHolder>(); + +EventFDHolder& efd(OperationContext* opCtx) { + return getEventFDForClient(opCtx->getClient()); +} + +/** + * This is only used by `run_until()`, and provides a unique timer id. This unique id is supplied by + * `ReactorTimer`, and used by `waitUntil()` for internal bookkeeping. */ -class InternalReactorTimer : public ReactorTimer { +class DummyTimer final : public ReactorTimer { public: void cancel(const BatonHandle& baton = nullptr) override { MONGO_UNREACHABLE; @@ -79,74 +133,61 @@ public: } // namespace void TransportLayerASIO::BatonASIO::schedule(Task func) noexcept { - stdx::unique_lock lk(_mutex); - - if (!_opCtx) { + auto task = [this, func = std::move(func)](stdx::unique_lock<Mutex> lk) mutable { + auto status = _opCtx ? Status::OK() : getDetachedError(); lk.unlock(); - func(getDetachedError()); + func(std::move(status)); + }; + stdx::unique_lock lk(_mutex); + if (!_opCtx) { + // Run the task inline if the baton is detached. + task(std::move(lk)); return; } - _scheduled.push_back( - [ this, func = std::move(func) ](stdx::unique_lock<Mutex> lk) mutable noexcept { - auto status = Status::OK(); - if (!_opCtx) { - status = getDetachedError(); - } - lk.unlock(); - - func(status); - }); - - if (_inPoll) { - _efd().notify(); - } + _scheduled.push_back(std::move(task)); + if (_inPoll) + notify(); } void TransportLayerASIO::BatonASIO::notify() noexcept { - _efd().notify(); + efd(_opCtx).notify(); } -/** - * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we create - * a regular waitUntil baton event off the timer, with the passed deadline). - */ Waitable::TimeoutState TransportLayerASIO::BatonASIO::run_until(ClockSource* clkSource, Date_t deadline) noexcept { - InternalReactorTimer irt; - auto future = waitUntil(irt, deadline); + // Set up a timer on the baton with the specified deadline. This synthetic timer is used by + // `_poll()`, which is called through `run()`, to enforce a deadline for the blocking `::poll`. + DummyTimer timer; + auto future = waitUntil(timer, deadline); run(clkSource); - // If the future is ready our timer has fired, in which case we timed out + // If the future is ready, our timer interrupted `run()`, in which case we timed out. if (future.isReady()) { future.get(); - return Waitable::TimeoutState::Timeout; } else { - cancelTimer(irt); - + cancelTimer(timer); return Waitable::TimeoutState::NoTimeout; } } void TransportLayerASIO::BatonASIO::run(ClockSource* clkSource) noexcept { - std::vector<Promise<void>> toFulfill; - - // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks + // On the way out, fulfill promises and run scheduled jobs without holding the lock. + std::list<Promise<void>> toFulfill; const ScopeGuard guard([&] { for (auto& promise : toFulfill) { promise.emplaceValue(); } auto lk = stdx::unique_lock(_mutex); - while (_scheduled.size()) { + while (!_scheduled.empty()) { auto scheduled = std::exchange(_scheduled, {}); for (auto& job : scheduled) { job(std::move(lk)); job = nullptr; - lk = stdx::unique_lock(_mutex); } } @@ -154,109 +195,28 @@ void TransportLayerASIO::BatonASIO::run(ClockSource* clkSource) noexcept { stdx::unique_lock lk(_mutex); - // If anything was scheduled, run it now. No need to poll - if (_scheduled.size()) { + // If anything was scheduled, run it now and skip polling and processing timers. + if (!_scheduled.empty()) return; - } - boost::optional<Date_t> deadline; - - // If we have a timer, poll no longer than that - if (_timers.size()) { - deadline = _timers.begin()->first; - } - - _pollSessions.clear(); - _pollSet.clear(); - _pollSessions.reserve(_sessions.size()); - _pollSet.reserve(_sessions.size() + 1); - - _pollSet.push_back(pollfd{_efd().fd, POLLIN, 0}); - - for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { - _pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0}); - _pollSessions.push_back(iter); - } - - auto now = clkSource->now(); - - int rval = 0; - // If we don't have a timeout, or we have a timeout that's unexpired, run poll. - if (!deadline || (*deadline > now)) { - if (deadline && !clkSource->tracksSystemClock()) { - invariant( - clkSource->setAlarm(*deadline, [this, anchor = shared_from_this()] { notify(); })); - - deadline.reset(); - } - - _inPoll = true; - lk.unlock(); - blockBatonASIOBeforePoll.pauseWhileSet(); - rval = ::poll(_pollSet.data(), - _pollSet.size(), - deadline ? Milliseconds(*deadline - now).count() : -1); - auto savedErrno = errno; - lk.lock(); - _inPoll = false; - - // If poll failed, it better be in EINTR - if (rval < 0 && savedErrno != EINTR) { - LOGV2_FATAL(50834, - "error in poll: {error}", - "error in poll", - "error"_attr = errnoWithDescription(savedErrno)); - } - } - - now = clkSource->now(); + toFulfill.splice(toFulfill.end(), _poll(lk, clkSource)); // Fire expired timers - for (auto iter = _timers.begin(); iter != _timers.end() && iter->first <= now;) { - toFulfill.push_back(std::move(iter->second.promise)); - _timersById.erase(iter->second.id); - iter = _timers.erase(iter); - } - - // If poll found some activity - if (rval > 0) { - size_t remaining = rval; - - auto pollIter = _pollSet.begin(); - - if (pollIter->revents) { - _efd().wait(); - - remaining--; - } - - ++pollIter; - - for (auto sessionIter = _pollSessions.begin(); - sessionIter != _pollSessions.end() && remaining; - ++sessionIter, ++pollIter) { - if (pollIter->revents) { - toFulfill.push_back(std::move((*sessionIter)->second.promise)); - _sessions.erase(*sessionIter); - - remaining--; - } - } - - invariant(remaining == 0); + const auto now = clkSource->now(); + for (auto it = _timers.begin(); it != _timers.end() && it->first <= now; + it = _timers.erase(it)) { + toFulfill.push_back(std::move(it->second.promise)); + _timersById.erase(it->second.id); } - - return; } void TransportLayerASIO::BatonASIO::markKillOnClientDisconnect() noexcept { - if (_opCtx->getClient() && _opCtx->getClient()->session()) { - _addSession(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) { - if (!s.isOK()) { - return; - } - - _opCtx->markKilled(ErrorCodes::ClientDisconnect); + auto client = _opCtx->getClient(); + invariant(client); + if (auto session = client->session()) { + _addSession(*session, POLLRDHUP).getAsync([this](Status status) { + if (status.isOK()) + _opCtx->markKilled(ErrorCodes::ClientDisconnect); }); } } @@ -265,19 +225,15 @@ Future<void> TransportLayerASIO::BatonASIO::addSession(Session& session, Type ty return _addSession(session, type == Type::In ? POLLIN : POLLOUT); } -Future<void> TransportLayerASIO::BatonASIO::waitUntil(const ReactorTimer& timer, +Future<void> TransportLayerASIO::BatonASIO::waitUntil(const ReactorTimer& reactorTimer, Date_t expiration) noexcept try { auto pf = makePromiseFuture<void>(); - auto id = timer.id(); - - stdx::unique_lock lk(_mutex); - - _safeExecute(std::move(lk), [ expiration, timer = Timer{id, std::move(pf.promise)}, - this ](stdx::unique_lock<Mutex>) mutable noexcept { - auto iter = _timers.emplace(expiration, std::move(timer)); - _timersById[iter->second.id] = iter; - }); - + _safeExecute(stdx::unique_lock(_mutex), + [this, expiration, timer = Timer{reactorTimer.id(), std::move(pf.promise)}]( + stdx::unique_lock<Mutex>) mutable { + auto iter = _timers.emplace(expiration, std::move(timer)); + _timersById[iter->second.id] = iter; + }); return std::move(pf.future); } catch (const DBException& ex) { return ex.toStatus(); @@ -287,17 +243,15 @@ bool TransportLayerASIO::BatonASIO::cancelSession(Session& session) noexcept { const auto id = session.id(); stdx::unique_lock lk(_mutex); - - if (_sessions.find(id) == _sessions.end()) { + if (_sessions.find(id) == _sessions.end()) return false; - } - _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept { + _safeExecute(std::move(lk), [this, id](stdx::unique_lock<Mutex> lk) { auto iter = _sessions.find(id); - if (iter == _sessions.end()) { + if (iter == _sessions.end()) return; - } - auto session = std::exchange(iter->second, {}); + + TransportSession session = std::exchange(iter->second, {}); _sessions.erase(iter); lk.unlock(); @@ -311,19 +265,15 @@ bool TransportLayerASIO::BatonASIO::cancelTimer(const ReactorTimer& timer) noexc const auto id = timer.id(); stdx::unique_lock lk(_mutex); - - if (_timersById.find(id) == _timersById.end()) { + if (_timersById.find(id) == _timersById.end()) return false; - } - _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept { + _safeExecute(std::move(lk), [this, id](stdx::unique_lock<Mutex> lk) { auto iter = _timersById.find(id); - - if (iter == _timersById.end()) { + if (iter == _timersById.end()) return; - } - auto timer = std::exchange(iter->second->second, {}); + Timer timer = std::exchange(iter->second->second, {}); _timers.erase(iter->second); _timersById.erase(iter); lk.unlock(); @@ -339,22 +289,106 @@ bool TransportLayerASIO::BatonASIO::canWait() noexcept { return _opCtx; } -TransportLayerASIO::BatonASIO::EventFDHolder& TransportLayerASIO::BatonASIO::_efd() { - return EventFDHolder::getForClient(_opCtx->getClient()); +void TransportLayerASIO::BatonASIO::_safeExecute(stdx::unique_lock<Mutex> lk, + TransportLayerASIO::BatonASIO::Job job) { + if (!_opCtx) { + // If we're detached, no job can safely execute. + iasserted(getDetachedError()); + } + + if (_inPoll) { + _scheduled.push_back(std::move(job)); + notify(); + } else { + job(std::move(lk)); + } } -Future<void> TransportLayerASIO::BatonASIO::_addSession(Session& session, short type) noexcept try { - auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); - auto id = session.id(); - auto pf = makePromiseFuture<void>(); +std::list<Promise<void>> TransportLayerASIO::BatonASIO::_poll(stdx::unique_lock<Mutex>& lk, + ClockSource* clkSource) { + const auto now = clkSource->now(); - stdx::unique_lock lk(_mutex); + // If we have a timer, then use it to enforce a timeout for polling. + boost::optional<Date_t> deadline; + if (!_timers.empty()) { + deadline = _timers.begin()->first; + + // Don't poll if we have already passed the deadline. + if (*deadline <= now) + return {}; + } + + if (deadline && !clkSource->tracksSystemClock()) { + // The clock source and `::poll` may track time differently, so use the clock source to + // enforce the timeout. + invariant(clkSource->setAlarm(*deadline, [self = shared_from_this()] { self->notify(); })); + deadline.reset(); + } + + _pollSet.clear(); + _pollSet.reserve(_sessions.size() + 1); + _pollSet.push_back({efd(_opCtx).fd, POLLIN, 0}); + + _pollSessions.clear(); + _pollSessions.reserve(_sessions.size()); + + for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { + _pollSet.push_back({iter->second.fd, iter->second.events, 0}); + _pollSessions.push_back(iter); + } - _safeExecute(std::move(lk), - [ id, session = TransportSession{fd, type, std::move(pf.promise)}, - this ](stdx::unique_lock<Mutex>) mutable noexcept { - auto ret = _sessions.emplace(id, std::move(session)); - invariant(ret.second); + int events = [&] { + _inPoll = true; + lk.unlock(); + + const ScopeGuard guard([&] { + lk.lock(); + _inPoll = false; + }); + + blockBatonASIOBeforePoll.pauseWhileSet(); + int timeout = deadline ? Milliseconds(*deadline - now).count() : -1; + int events = ::poll(_pollSet.data(), _pollSet.size(), timeout); + const auto savedErrno = errno; + if (events < 0 && savedErrno != EINTR) { + LOGV2_FATAL(50834, "error in poll", "error"_attr = errnoWithDescription(savedErrno)); + } + return events; + }(); + + if (events <= 0) + return {}; // Polling was timed out or interrupted. + + auto psit = _pollSet.begin(); + // Consume the notification on the `eventfd` object if there is any. + if (psit->revents) { + efd(_opCtx).wait(); + --events; + } + ++psit; + + std::list<Promise<void>> promises; + for (auto sit = _pollSessions.begin(); events && sit != _pollSessions.end(); ++sit, ++psit) { + if (psit->revents) { + promises.push_back(std::move((*sit)->second.promise)); + _sessions.erase(*sit); + --events; + } + } + + invariant(events == 0, "Failed to process all events after going through registered sessions"); + return promises; +} + +Future<void> TransportLayerASIO::BatonASIO::_addSession(Session& session, short events) try { + auto pf = makePromiseFuture<void>(); + TransportSession ts{checked_cast<ASIOSession&>(session).getSocket().native_handle(), + events, + std::move(pf.promise)}; + _safeExecute(stdx::unique_lock(_mutex), + [this, id = session.id(), ts = std::move(ts)](stdx::unique_lock<Mutex>) mutable { + invariant(_sessions.emplace(id, std::move(ts)).second, + "Adding session to baton failed"); }); return std::move(pf.future); } catch (const DBException& ex) { diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index f6401231f72..f554c43668e 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -29,15 +29,13 @@ #pragma once +#include <list> #include <map> #include <memory> #include <vector> #include <poll.h> -#include <sys/eventfd.h> -#include "mongo/base/error_codes.h" -#include "mongo/base/status.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/baton.h" @@ -56,60 +54,6 @@ namespace transport { * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public NetworkingBaton { - /** - * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the - * counter portion, just the notify/wakeup - */ - struct EventFDHolder { - EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) { - if (fd < 0) { - auto savedErrno = errno; - std::string reason = str::stream() - << "error in creating eventfd: " << errnoWithDescription(savedErrno); - - auto code = (savedErrno == EMFILE || savedErrno == ENFILE) - ? ErrorCodes::TooManyFilesOpen - : ErrorCodes::UnknownError; - - uasserted(code, reason); - } - } - - ~EventFDHolder() { - ::close(fd); - } - - EventFDHolder(const EventFDHolder&) = delete; - EventFDHolder& operator=(const EventFDHolder&) = delete; - - // Writes to the underlying eventfd - void notify() { - while (true) { - if (::eventfd_write(fd, 1) == 0) { - break; - } - - invariant(errno == EINTR); - } - } - - void wait() { - while (true) { - // If we have activity on the eventfd, pull the count out - uint64_t u; - if (::eventfd_read(fd, &u) == 0) { - break; - } - - invariant(errno == EINTR); - } - } - - const int fd; - - static const Client::Decoration<EventFDHolder> getForClient; - }; - public: BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {} @@ -146,50 +90,50 @@ public: private: struct Timer { - size_t id; - Promise<void> promise; // Needs to be mutable to move from it while in std::set. + size_t id; // Stores the unique identifier for the timer, provided by `ReactorTimer`. + Promise<void> promise; }; struct TransportSession { int fd; - short type; + short events; // Events to consider while polling for this session (e.g., `POLLIN`). Promise<void> promise; }; - // Internally, the BatonASIO thinks in terms of synchronized units of work. This is because - // a Baton effectively represents a green thread with the potential to add or remove work (i.e. - // Jobs) at any time. Jobs with external notifications (OutOfLineExecutor::Tasks, - // TransportSession:promise, ReactorTimer::promise) are expected to release their lock before - // generating those notifications. + /* + * Internally, `BatonASIO` thinks in terms of synchronized units of work. This is because a + * baton effectively represents a green thread with the potential to add or remove work (i.e., + * jobs) at any time. Thus, scheduled jobs must release their lock before executing any task + * external to the baton (e.g., `OutOfLineExecutor::Task`, `TransportSession:promise`, and + * `ReactorTimer::promise`). + */ using Job = unique_function<void(stdx::unique_lock<Mutex>)>; /** - * Invoke a job with exclusive access to the Baton internals. + * Invokes a job with exclusive access to the baton's internals. + * + * If the baton is currently polling (i.e., `_inPoll` is `true`), the polling thread owns the + * baton, so we schedule the job and notify the polling thread to wake up and run the job. + * + * Otherwise, take exclusive access and run the job on the current thread. + * + * Note that `_safeExecute()` will throw if the baton has been detached. * - * If we are currently _inPoll, the polling thread owns the Baton and thus we tell it to wake up - * and run our job. If we are not _inPoll, take exclusive access and run our job on the local - * thread. Note that _safeExecute() will throw if the Baton has been detached. + * Also note that the job may not run inline, and may get scheduled to run by the baton, so it + * should never throw. */ - TEMPLATE(typename Callback) - REQUIRES(std::is_nothrow_invocable_v<Callback, stdx::unique_lock<Mutex>>) - void _safeExecute(stdx::unique_lock<Mutex> lk, Callback&& job) { - if (!_opCtx) { - // If we're detached, no job can safely execute. - uassertStatusOK({ErrorCodes::ShutdownInProgress, "Baton detached"}); - } - - if (_inPoll) { - _scheduled.push_back(std::forward<Callback>(job)); - - _efd().notify(); - } else { - job(std::move(lk)); - } - } + void _safeExecute(stdx::unique_lock<Mutex> lk, Job job); - EventFDHolder& _efd(); + /** + * Blocks polling on the registered sessions until one of the following happens: + * - `notify()` is called, either directly or through other methods (e.g., `schedule()`). + * - One of the timers scheduled on this baton times out. + * - There is an event for at least one of the registered sessions (e.g., data is available). + * Returns the list of promises that must be fulfilled as the result of polling. + */ + std::list<Promise<void>> _poll(stdx::unique_lock<Mutex>&, ClockSource*); - Future<void> _addSession(Session& session, short type) noexcept; + Future<void> _addSession(Session& session, short events); void detachImpl() noexcept override; @@ -199,25 +143,27 @@ private: bool _inPoll = false; - // This map stores the sessions we need to poll on. We unwind it into a pollset for every - // blocking call to run + // Stores the sessions we need to poll on. stdx::unordered_map<SessionId, TransportSession> _sessions; - // The set is used to find the next timer which will fire. The unordered_map looks up the - // timers so we can remove them in O(1) + /** + * We use two structures to maintain timers: + * - `_timers` keeps a sorted list of timers according to their expiration date. + * - `_timersById` allows using the unique timer id to find and cancel a timer in constant time. + */ std::multimap<Date_t, Timer> _timers; - stdx::unordered_map<size_t, decltype(_timers)::iterator> _timersById; + stdx::unordered_map<size_t, std::multimap<Date_t, Timer>::iterator> _timersById; - // For tasks that come in via schedule. Or that were deferred because we were in poll + // Tasks scheduled for deferred execution. std::vector<Job> _scheduled; - // We hold the two following values at the object level to save on allocations when a baton is - // waited on many times over the course of its lifetime. - - // Holds the pollset for ::poll - std::vector<pollfd> _pollSet; - - // Mirrors the above pollset with mappings back to _sessions + /* + * We hold the following values at the object level to save on allocations when a baton is + * waited on many times over the course of its lifetime: + * `_pollSet`: the poll set for `::poll`. + * `_pollSessions`: maps members of `_pollSet` to their corresponding session in `_sessions`. + */ + std::vector<::pollfd> _pollSet; std::vector<decltype(_sessions)::iterator> _pollSessions; }; diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index a3e1c850e16..daf1cd1a256 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -62,6 +62,7 @@ #include "mongo/util/synchronized_value.h" #include "mongo/util/thread_context.h" #include "mongo/util/time_support.h" +#include "mongo/util/waitable.h" namespace mongo { namespace { @@ -788,6 +789,19 @@ class BatonASIOLinuxTest : public LockerNoopServiceContextTest { Promise<transport::SessionHandle> _promise; }; + // Used for setting and canceling timers on the networking baton. Does not offer any timer + // functionality, and is only used for its unique id. + class DummyTimer final : public transport::ReactorTimer { + public: + void cancel(const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + + Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + }; + public: void setUp() override { auto pf = makePromiseFuture<transport::SessionHandle>(); @@ -815,6 +829,10 @@ public: return *_connThread; } + std::unique_ptr<transport::ReactorTimer> makeDummyTimer() const { + return std::make_unique<DummyTimer>(); + } + private: ServiceContext::UniqueClient _client; std::unique_ptr<ConnectionThread> _connThread; @@ -897,7 +915,7 @@ TEST_F(BatonASIOLinuxTest, AddAndRemoveSessionWhileInPoll) { auto opCtx = client().makeOperationContext(); Notification<bool> cancelSessionResult; - stdx::thread thread([&] { + JoinThread thread([&] { auto baton = opCtx->getBaton()->networking(); auto session = client().session(); @@ -910,14 +928,103 @@ TEST_F(BatonASIOLinuxTest, AddAndRemoveSessionWhileInPoll) { cancelSessionResult.set(baton->cancelSession(*session)); }); - ScopeGuard joinGuard = [&] { thread.join(); }; - // TODO SERVER-61192 Change the following to `ASSERT_TRUE` once the underlying issue is fixed. ASSERT_FALSE(cancelSessionResult.get(opCtx.get())); } -// TODO SERVER-61192 Test setting and canceling timers on the baton. -// TODO SERVER-61192 Test `run`, `notify`, and `run_until` on `BatonASIO`. +TEST_F(BatonASIOLinuxTest, WaitAndNotify) { + // Exercises the underlying `wait` and `notify` functionality through `BatonASIO::run` and + // `BatonASIO::schedule`, respectively. Here is how this is done: + // 1) The main thread starts polling (from inside `run`) when waiting on the notification. + // 2) Once the main thread is ready to poll, `thread` notifies it through `baton->schedule`. + // 3) `schedule` calls into `notify` internally, which should interrupt the polling. + // 4) Once polling is interrupted, `baton` runs the scheduled job and sets the notification. + auto opCtx = client().makeOperationContext(); + + Notification<void> notification; + JoinThread thread([&] { + auto baton = opCtx->getBaton()->networking(); + FailPointEnableBlock fp("blockBatonASIOBeforePoll"); + fp->waitForTimesEntered(1); + baton->schedule([&](Status) { notification.set(); }); + }); + + notification.get(opCtx.get()); +} + +void blockIfBatonPolls(Client& client, + std::function<void(const BatonHandle&, Notification<void>&)> modifyBaton) { + Notification<void> notification; + auto opCtx = client.makeOperationContext(); + + FailPointEnableBlock fp("blockBatonASIOBeforePoll"); + + modifyBaton(opCtx->getBaton(), notification); + + // This will internally call into `BatonASIO::run()`, which will block forever (since the + // failpoint is enabled) if the baton starts polling. + notification.get(opCtx.get()); +} + +TEST_F(BatonASIOLinuxTest, BatonWithPendingTasksNeverPolls) { + blockIfBatonPolls(client(), [](const BatonHandle& baton, Notification<void>& notification) { + baton->schedule([&](Status) { notification.set(); }); + }); +} + +TEST_F(BatonASIOLinuxTest, BatonWithAnExpiredTimerNeverPolls) { + auto timer = makeDummyTimer(); + blockIfBatonPolls(client(), [&](const BatonHandle& baton, Notification<void>& notification) { + // Batons use the precise clock source internally. We use the current time (i.e., `now()`) + // as the deadline to schedule an expired timer on the baton. + auto clkSource = getServiceContext()->getPreciseClockSource(); + baton->networking()->waitUntil(*timer, clkSource->now()).getAsync([&](Status) { + notification.set(); + }); + }); +} + +TEST_F(BatonASIOLinuxTest, NotifyInterruptsRunUntilBeforeTimeout) { + auto opCtx = client().makeOperationContext(); + JoinThread thread([&] { + auto baton = opCtx->getBaton(); + FailPointEnableBlock fp("blockBatonASIOBeforePoll"); + fp->waitForTimesEntered(1); + baton->notify(); + }); + + auto clkSource = getServiceContext()->getPreciseClockSource(); + const auto state = opCtx->getBaton()->run_until(clkSource, Date_t::max()); + ASSERT(state == Waitable::TimeoutState::NoTimeout); +} + +TEST_F(BatonASIOLinuxTest, RunUntilProperlyTimesout) { + auto opCtx = client().makeOperationContext(); + auto clkSource = getServiceContext()->getPreciseClockSource(); + const auto state = opCtx->getBaton()->run_until(clkSource, clkSource->now() + Milliseconds(1)); + ASSERT(state == Waitable::TimeoutState::Timeout); +} + +TEST_F(BatonASIOLinuxTest, AddAndRemoveTimerWhileInPoll) { + auto opCtx = client().makeOperationContext(); + Notification<bool> cancelTimerResult; + + JoinThread thread([&] { + auto baton = opCtx->getBaton()->networking(); + + FailPointEnableBlock fp("blockBatonASIOBeforePoll"); + fp->waitForTimesEntered(1); + + // This thread is an external observer to the baton, so the expected behavior is for + // `cancelTimer` to happen after `waitUntil`, thus canceling the timer must return `true`. + auto timer = makeDummyTimer(); + baton->waitUntil(*timer, Date_t::max()).getAsync([](Status) {}); + cancelTimerResult.set(baton->cancelTimer(*timer)); + }); + + // TODO SERVER-61192 Change the following to `ASSERT_TRUE` once the underlying issue is fixed. + ASSERT_FALSE(cancelTimerResult.get(opCtx.get())); +} #endif // __linux__ |