/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include #include #include "mongo/base/checked_cast.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/baton.h" #include "mongo/transport/session_asio.h" #include "mongo/util/errno_util.h" #include "mongo/util/future.h" #include "mongo/util/time_support.h" namespace mongo { namespace transport { /** * TransportLayerASIO Baton implementation for linux. * * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public NetworkingBaton { /** * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for * ::poll). * * Its methods are all unreachable because we never actually use its timer-ness (we just need * its address for baton book keeping). */ class InternalReactorTimer : public ReactorTimer { public: void cancel(const BatonHandle& baton = nullptr) override { MONGO_UNREACHABLE; } Future waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override { MONGO_UNREACHABLE; } }; /** * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the * counter portion, just the notify/wakeup */ struct EventFDHolder { EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) { if (fd < 0) { severe() << "error in eventfd: " << errnoWithDescription(errno); fassertFailed(50833); } } ~EventFDHolder() { ::close(fd); } EventFDHolder(const EventFDHolder&) = delete; EventFDHolder& operator=(const EventFDHolder&) = delete; // Writes to the underlying eventfd void notify() { while (true) { if (eventfd_write(fd, 1) == 0) { break; } invariant(errno == EINTR); } } void wait() { while (true) { // If we have activity on the eventfd, pull the count out uint64_t u; if (::eventfd_read(fd, &u) == 0) { break; } invariant(errno == EINTR); } } const int fd; static const Client::Decoration getForClient; }; public: BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {} ~BatonASIO() { invariant(!_opCtx); invariant(_sessions.empty()); invariant(_scheduled.empty()); invariant(_timers.empty()); } void markKillOnClientDisconnect() noexcept override { if (_opCtx->getClient() && _opCtx->getClient()->session()) { addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) { if (!s.isOK()) { return; } _opCtx->markKilled(ErrorCodes::ClientDisconnect); }); } } Future addSession(Session& session, Type type) noexcept override { return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT); } Future waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override { auto pf = makePromiseFuture(); auto id = timer.id(); stdx::unique_lock lk(_mutex); if (!_opCtx) { return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot waitUntil on timer"); } _safeExecute(std::move(lk), [ id, expiration, promise = std::move(pf.promise), this ]() mutable { auto iter = _timers.emplace(std::piecewise_construct, std::forward_as_tuple(expiration), std::forward_as_tuple(id, std::move(promise))); _timersById[id] = iter; }); return std::move(pf.future); } bool cancelSession(Session& session) noexcept override { const auto id = session.id(); stdx::unique_lock lk(_mutex); if (_sessions.find(id) == _sessions.end()) { return false; } _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); }); return true; } bool cancelTimer(const ReactorTimer& timer) noexcept override { const auto id = timer.id(); stdx::unique_lock lk(_mutex); if (_timersById.find(id) == _timersById.end()) { return false; } _safeExecute(std::move(lk), [id, this] { auto iter = _timersById.find(id); if (iter != _timersById.end()) { _timers.erase(iter->second); _timersById.erase(iter); } }); return true; } void schedule(unique_function func) noexcept override { stdx::lock_guard lk(_mutex); if (!_opCtx) { func(nullptr); return; } _scheduled.push_back(std::move(func)); if (_inPoll) { efd().notify(); } } void notify() noexcept override { efd().notify(); } /** * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we * create a regular waitUntil baton event off the timer, with the passed deadline). */ Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override { InternalReactorTimer irt; auto future = waitUntil(irt, deadline); run(clkSource); // If the future is ready our timer has fired, in which case we timed out if (future.isReady()) { future.get(); return Waitable::TimeoutState::Timeout; } else { cancelTimer(irt); return Waitable::TimeoutState::NoTimeout; } } void run(ClockSource* clkSource) noexcept override { std::vector> toFulfill; // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks const auto guard = makeGuard([&] { for (auto& promise : toFulfill) { promise.emplaceValue(); } stdx::unique_lock lk(_mutex); while (_scheduled.size()) { decltype(_scheduled) toRun; { using std::swap; swap(_scheduled, toRun); } lk.unlock(); for (auto& job : toRun) { job(_opCtx); } lk.lock(); } }); stdx::unique_lock lk(_mutex); // If anything was scheduled, run it now. No need to poll if (_scheduled.size()) { return; } boost::optional deadline; // If we have a timer, poll no longer than that if (_timers.size()) { deadline = _timers.begin()->first; } std::vector sessions; sessions.reserve(_sessions.size()); std::vector pollSet; pollSet.reserve(_sessions.size() + 1); pollSet.push_back(pollfd{efd().fd, POLLIN, 0}); for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0}); sessions.push_back(iter); } auto now = clkSource->now(); int rval = 0; // If we don't have a timeout, or we have a timeout that's unexpired, run poll. if (!deadline || (*deadline > now)) { if (deadline && !clkSource->tracksSystemClock()) { invariant(clkSource->setAlarm(*deadline, [this] { notify(); })); deadline.reset(); } _inPoll = true; lk.unlock(); rval = ::poll(pollSet.data(), pollSet.size(), deadline ? Milliseconds(*deadline - now).count() : -1); const auto pollGuard = makeGuard([&] { lk.lock(); _inPoll = false; }); // If poll failed, it better be in EINTR if (rval < 0 && errno != EINTR) { severe() << "error in poll: " << errnoWithDescription(errno); fassertFailed(50834); } } now = clkSource->now(); // Fire expired timers for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) { toFulfill.push_back(std::move(iter->second.promise)); _timersById.erase(iter->second.id); iter = _timers.erase(iter); } // If poll found some activity if (rval > 0) { size_t remaining = rval; auto pollIter = pollSet.begin(); if (pollIter->revents) { efd().wait(); remaining--; } ++pollIter; for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; ++sessionIter, ++pollIter) { if (pollIter->revents) { toFulfill.push_back(std::move((*sessionIter)->second.promise)); _sessions.erase(*sessionIter); remaining--; } } invariant(remaining == 0); } return; } private: Future addSessionImpl(Session& session, short type) noexcept { auto fd = checked_cast(session).getSocket().native_handle(); auto id = session.id(); auto pf = makePromiseFuture(); stdx::unique_lock lk(_mutex); if (!_opCtx) { return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession"); } _safeExecute(std::move(lk), [ id, fd, type, promise = std::move(pf.promise), this ]() mutable { _sessions[id] = TransportSession{fd, type, std::move(promise)}; }); return std::move(pf.future); } void detachImpl() noexcept override { decltype(_sessions) sessions; decltype(_scheduled) scheduled; decltype(_timers) timers; { stdx::lock_guard lk(_mutex); { stdx::lock_guard lk(*_opCtx->getClient()); invariant(_opCtx->getBaton().get() == this); _opCtx->setBaton(nullptr); } _opCtx = nullptr; using std::swap; swap(_sessions, sessions); swap(_scheduled, scheduled); swap(_timers, timers); } for (auto& job : scheduled) { job(nullptr); } for (auto& session : sessions) { session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot wait for socket")); } for (auto& pair : timers) { pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, "baton is detached, completing timer early")); } } struct Timer { Timer(size_t id, Promise promise) : id(id), promise(std::move(promise)) {} size_t id; Promise promise; // Needs to be mutable to move from it while in std::set. }; struct TransportSession { int fd; short type; Promise promise; }; /** * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to * the eventfd. If not, we run inline. */ template void _safeExecute(stdx::unique_lock lk, Callback&& cb) { if (_inPoll) { _scheduled.push_back( [ cb = std::forward(cb), this ](OperationContext*) mutable { stdx::lock_guard lk(_mutex); cb(); }); efd().notify(); } else { cb(); } } EventFDHolder& efd() { return EventFDHolder::getForClient(_opCtx->getClient()); } stdx::mutex _mutex; OperationContext* _opCtx; bool _inPoll = false; // This map stores the sessions we need to poll on. We unwind it into a pollset for every // blocking call to run stdx::unordered_map _sessions; // The set is used to find the next timer which will fire. The unordered_map looks up the // timers so we can remove them in O(1) std::multimap _timers; stdx::unordered_map _timersById; // For tasks that come in via schedule. Or that were deferred because we were in poll std::vector> _scheduled; }; const Client::Decoration TransportLayerASIO::BatonASIO::EventFDHolder::getForClient = Client::declareDecoration(); } // namespace transport } // namespace mongo