-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 159
-rw-r--r-- | src/mongo/transport/session_asio.h     |   2
2 files changed, 96 insertions, 65 deletions
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 1d89194ee98..57762c9af4b 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -42,7 +42,9 @@
 #include "mongo/stdx/unordered_map.h"
 #include "mongo/transport/baton.h"
 #include "mongo/transport/session_asio.h"
+#include "mongo/util/concepts.h"
 #include "mongo/util/errno_util.h"
+#include "mongo/util/functional.h"
 #include "mongo/util/future.h"
 #include "mongo/util/hierarchical_acquisition.h"
 #include "mongo/util/time_support.h"
@@ -57,6 +59,8 @@ namespace transport {
  */
 class TransportLayerASIO::BatonASIO : public NetworkingBaton {
     static const inline auto kDetached = Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+    static const inline auto kCanceled =
+        Status(ErrorCodes::CallbackCanceled, "Baton wait canceled");
 
     /**
      * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
@@ -155,42 +159,48 @@ public:
         return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT);
     }
 
-    Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override {
+    Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override try {
         auto pf = makePromiseFuture<void>();
         auto id = timer.id();
-        stdx::unique_lock<Latch> lk(_mutex);
+        stdx::unique_lock lk(_mutex);
 
-        if (!_opCtx) {
-            return kDetached;
-        }
-
-        _safeExecute(std::move(lk),
-                     [id, expiration, promise = std::move(pf.promise), this]() mutable {
-                         auto iter = _timers.emplace(std::piecewise_construct,
-                                                     std::forward_as_tuple(expiration),
-                                                     std::forward_as_tuple(id, std::move(promise)));
-                         _timersById[id] = iter;
-                     });
+        _safeExecute(std::move(lk), [ expiration, timer = Timer{id, std::move(pf.promise)},
+                                      this ](stdx::unique_lock<Mutex>) mutable noexcept {
+            auto iter = _timers.emplace(expiration, std::move(timer));
+            _timersById[iter->second.id] = iter;
+        });
 
         return std::move(pf.future);
+    } catch (const DBException& ex) {
+        return ex.toStatus();
     }
 
     bool canWait() noexcept override {
-        stdx::lock_guard<Latch> lk(_mutex);
+        stdx::lock_guard lk(_mutex);
         return _opCtx;
     }
 
     bool cancelSession(Session& session) noexcept override {
         const auto id = session.id();
 
-        stdx::unique_lock<Latch> lk(_mutex);
+        stdx::unique_lock lk(_mutex);
 
         if (_sessions.find(id) == _sessions.end()) {
             return false;
         }
 
-        _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); });
+        _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
+            auto iter = _sessions.find(id);
+            if (iter == _sessions.end()) {
+                return;
+            }
+            auto session = std::exchange(iter->second, {});
+            _sessions.erase(iter);
+            lk.unlock();
+
+            session.promise.setError(kCanceled);
+        });
 
         return true;
     }
@@ -198,34 +208,50 @@ public:
     bool cancelTimer(const ReactorTimer& timer) noexcept override {
         const auto id = timer.id();
 
-        stdx::unique_lock<Latch> lk(_mutex);
+        stdx::unique_lock lk(_mutex);
 
         if (_timersById.find(id) == _timersById.end()) {
             return false;
         }
 
-        _safeExecute(std::move(lk), [id, this] {
+        _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
             auto iter = _timersById.find(id);
-            if (iter != _timersById.end()) {
-                _timers.erase(iter->second);
-                _timersById.erase(iter);
+            if (iter == _timersById.end()) {
+                return;
             }
+
+            auto timer = std::exchange(iter->second->second, {});
+            _timers.erase(iter->second);
+            _timersById.erase(iter);
+            lk.unlock();
+
+            timer.promise.setError(kCanceled);
         });
 
         return true;
     }
 
     void schedule(Task func) noexcept override {
-        stdx::lock_guard<Latch> lk(_mutex);
+        stdx::unique_lock lk(_mutex);
 
         if (!_opCtx) {
+            lk.unlock();
             func(kDetached);
             return;
         }
 
-        _scheduled.push_back(std::move(func));
+        _scheduled.push_back(
+            [ this, func = std::move(func) ](stdx::unique_lock<Mutex> lk) mutable noexcept {
+                auto status = Status::OK();
+                if (!_opCtx) {
+                    status = kDetached;
+                }
+                lk.unlock();
+
+                func(status);
+            });
 
         if (_inPoll) {
             efd().notify();
         }
     }
@@ -267,22 +293,19 @@ public:
                 promise.emplaceValue();
             }
 
-            stdx::unique_lock<Latch> lk(_mutex);
+            auto lk = stdx::unique_lock(_mutex);
             while (_scheduled.size()) {
-                auto toRun = std::exchange(_scheduled, {});
+                auto scheduled = std::exchange(_scheduled, {});
+                for (auto& job : scheduled) {
+                    job(std::move(lk));
+                    job = nullptr;
 
-                lk.unlock();
-                while (toRun.size()) {
-                    // While there are jobs to run, run and dtor in sequence
-                    auto& job = toRun.back();
-                    job(Status::OK());
-                    toRun.pop_back();
+                    lk = stdx::unique_lock(_mutex);
                 }
-                lk.lock();
             }
         });
 
-        stdx::unique_lock<Latch> lk(_mutex);
+        stdx::unique_lock lk(_mutex);
 
         // If anything was scheduled, run it now. No need to poll
         if (_scheduled.size()) {
@@ -325,11 +348,8 @@ public:
             rval = ::poll(_pollSet.data(),
                           _pollSet.size(),
                           deadline ? Milliseconds(*deadline - now).count() : -1);
-
-            const auto pollGuard = makeGuard([&] {
-                lk.lock();
-                _inPoll = false;
-            });
+            lk.lock();
+            _inPoll = false;
 
             // If poll failed, it better be in EINTR
             if (rval < 0 && errno != EINTR) {
@@ -380,32 +400,31 @@ public:
     }
 
 private:
-    Future<void> addSessionImpl(Session& session, short type) noexcept {
+    Future<void> addSessionImpl(Session& session, short type) noexcept try {
         auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
         auto id = session.id();
         auto pf = makePromiseFuture<void>();
 
-        stdx::unique_lock<Latch> lk(_mutex);
-
-        if (!_opCtx) {
-            return kDetached;
-        }
+        stdx::unique_lock lk(_mutex);
 
         _safeExecute(std::move(lk),
-                     [id, fd, type, promise = std::move(pf.promise), this]() mutable {
-                         _sessions[id] = TransportSession{fd, type, std::move(promise)};
+                     [ id, session = TransportSession{fd, type, std::move(pf.promise)},
+                       this ](stdx::unique_lock<Mutex>) mutable noexcept {
+                         auto ret = _sessions.emplace(id, std::move(session));
+                         invariant(ret.second);
                      });
 
         return std::move(pf.future);
+    } catch (const DBException& ex) {
+        return ex.toStatus();
     }
 
     void detachImpl() noexcept override {
-        decltype(_sessions) sessions;
         decltype(_scheduled) scheduled;
+        decltype(_sessions) sessions;
         decltype(_timers) timers;
 
         {
-            stdx::lock_guard<Latch> lk(_mutex);
+            stdx::lock_guard lk(_mutex);
 
             invariant(_opCtx->getBaton().get() == this);
             _opCtx->setBaton(nullptr);
@@ -413,13 +432,14 @@ private:
             _opCtx = nullptr;
 
             using std::swap;
-            swap(_sessions, sessions);
             swap(_scheduled, scheduled);
+            swap(_sessions, sessions);
             swap(_timers, timers);
         }
 
         for (auto& job : scheduled) {
-            job(kDetached);
+            job(stdx::unique_lock(_mutex));
+            job = nullptr;
         }
 
         for (auto& session : sessions) {
@@ -432,8 +452,6 @@ private:
     }
 
     struct Timer {
-        Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {}
-
         size_t id;
         Promise<void> promise;  // Needs to be mutable to move from it while in std::set.
     };
@@ -444,21 +462,34 @@ private:
         Promise<void> promise;
     };
 
+    // Internally, the BatonASIO thinks in terms of synchronized units of work. This is because
+    // a Baton effectively represents a green thread with the potential to add or remove work (i.e.
+    // Jobs) at any time. Jobs with external notifications (OutOfLineExecutor::Tasks,
+    // TransportSession:promise, ReactorTimer::promise) are expected to release their lock before
+    // generating those notifications.
+    using Job = unique_function<void(stdx::unique_lock<Mutex>)>;
+
     /**
-     * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to
-     * the eventfd. If not, we run inline.
+     * Invoke a job with exclusive access to the Baton internals.
+     *
+     * If we are currently _inPoll, the polling thread owns the Baton and thus we tell it to wake up
+     * and run our job. If we are not _inPoll, take exclusive access and run our job on the local
+     * thread. Note that _safeExecute() will throw if the Baton has been detached.
      */
-    template <typename Callback>
-    void _safeExecute(stdx::unique_lock<Latch> lk, Callback&& cb) {
+    TEMPLATE(typename Callback)
+    REQUIRES(std::is_nothrow_invocable_v<Callback, stdx::unique_lock<Mutex>>)
+    void _safeExecute(stdx::unique_lock<Mutex> lk, Callback&& job) {
+        if (!_opCtx) {
+            // If we're detached, no job can safely execute.
+            uassertStatusOK(kDetached);
+        }
+
         if (_inPoll) {
-            _scheduled.push_back([cb = std::forward<Callback>(cb), this](Status) mutable {
-                stdx::lock_guard<Latch> lk(_mutex);
-                cb();
-            });
+            _scheduled.push_back(std::forward<Callback>(job));
             efd().notify();
        } else {
-            cb();
+            job(std::move(lk));
        }
    }
@@ -479,10 +510,10 @@ private:
     // The set is used to find the next timer which will fire. The unordered_map looks up the
     // timers so we can remove them in O(1)
     std::multimap<Date_t, Timer> _timers;
-    stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById;
+    stdx::unordered_map<size_t, decltype(_timers)::iterator> _timersById;
 
     // For tasks that come in via schedule. Or that were deferred because we were in poll
-    std::vector<Task> _scheduled;
+    std::vector<Job> _scheduled;
 
     // We hold the two following values at the object level to save on allocations when a baton is
     // waited on many times over the course of its lifetime.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index e4a40043c0d..08042c94fe5 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -510,7 +510,7 @@ private:
             networkingBaton && networkingBaton->canWait()) {
             return networkingBaton->addSession(*this, NetworkingBaton::Type::In)
                 .onError([](Status error) {
-                    if (ErrorCodes::isCancelationError(error)) {
+                    if (ErrorCodes::isShutdownError(error)) {
                         // If the baton has detached, it will cancel its polling. We catch that
                         // error here and return Status::OK so that we invoke
                         // opportunisticRead() again and switch to asio::async_read() below.
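For readers skimming the diff, the two patterns it introduces are easier to see in isolation: synchronized units of work ("Jobs") that receive the held lock and must release it before producing any external notification, and noexcept function-try-blocks that convert exceptions into error values (as waitUntil() and addSessionImpl() now do). The sketch below is illustrative only, not MongoDB code: MiniBaton, Job, safeExecute, and schedule are hypothetical stand-ins, with std::function replacing mongo::unique_function, std::mutex replacing Mutex, and plain flags replacing the operation context and eventfd machinery.

    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <stdexcept>
    #include <string>
    #include <vector>

    class MiniBaton {
    public:
        // A Job runs with exclusive access to the baton internals: it receives the
        // held lock and must unlock before firing any external notification, so no
        // user callback ever runs under the baton's mutex.
        using Job = std::function<void(std::unique_lock<std::mutex>)>;

        // Analogous to _safeExecute(): if a poller owns the internals, queue the
        // job for it (the real code then writes to an eventfd to wake ::poll());
        // otherwise run the job inline on this thread. Throws when detached.
        void safeExecute(std::unique_lock<std::mutex> lk, Job job) {
            if (_detached) {
                throw std::runtime_error("baton detached");
            }
            if (_inPoll) {
                _scheduled.push_back(std::move(job));
            } else {
                job(std::move(lk));
            }
        }

        // Mirrors the `noexcept ... try` shape of waitUntil()/addSessionImpl():
        // an exception from safeExecute() becomes an error string, not a crash.
        std::string schedule(std::function<void()> callback) noexcept try {
            std::unique_lock lk(_mutex);
            safeExecute(std::move(lk),
                        [cb = std::move(callback)](std::unique_lock<std::mutex> lk) {
                            lk.unlock();  // Release before notifying, per the Job contract.
                            cb();
                        });
            return "ok";
        } catch (const std::exception& ex) {
            return ex.what();
        }

        std::mutex _mutex;
        bool _inPoll = false;
        bool _detached = false;
        std::vector<Job> _scheduled;
    };

    int main() {
        MiniBaton baton;
        std::cout << baton.schedule([] { std::cout << "ran outside the lock\n"; }) << "\n";
        baton._detached = true;
        std::cout << baton.schedule([] {}) << "\n";  // prints "baton detached"
    }

As in the real _safeExecute(), a job either runs inline while the lock is held or is queued for the polling thread, and a detached baton surfaces as an error value rather than an escaping exception.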