/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #pragma once #include #include #include #include #include #include "mongo/base/checked_cast.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/transport/baton.h" #include "mongo/transport/session_asio.h" #include "mongo/util/errno_util.h" #include "mongo/util/future.h" #include "mongo/util/time_support.h" namespace mongo { namespace transport { /** * TransportLayerASIO Baton implementation for linux. * * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public Baton { /** * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the * counter portion, just the notify/wakeup */ struct EventFDHolder { EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) { if (fd < 0) { severe() << "error in eventfd: " << errnoWithDescription(errno); fassertFailed(50833); } } ~EventFDHolder() { ::close(fd); } // Writes to the underlying eventfd void notify() { while (true) { if (eventfd_write(fd, 1) == 0) { break; } invariant(errno == EINTR); } } void wait() { while (true) { // If we have activity on the eventfd, pull the count out uint64_t u; if (::eventfd_read(fd, &u) == 0) { break; } invariant(errno == EINTR); } } const int fd; }; public: BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {} ~BatonASIO() { invariant(!_opCtx); invariant(_sessions.empty()); invariant(_scheduled.empty()); invariant(_timers.empty()); } void detach() override { { stdx::lock_guard lk(_mutex); invariant(_sessions.empty()); invariant(_scheduled.empty()); invariant(_timers.empty()); } { stdx::lock_guard lk(*_opCtx->getClient()); invariant(_opCtx->getBaton().get() == this); _opCtx->setBaton(nullptr); } _opCtx = nullptr; } Future addSession(Session& session, Type type) override { auto fd = checked_cast(session).getSocket().native_handle(); auto pf = makePromiseFuture(); _safeExecute([ fd, type, sp = pf.promise.share(), this ] { _sessions[fd] = TransportSession{type, sp}; }); return std::move(pf.future); } Future waitFor(const ReactorTimer& timer, Milliseconds timeout) override { return waitUntil(timer, Date_t::now() + timeout); } Future waitUntil(const ReactorTimer& timer, Date_t expiration) override { auto pf = makePromiseFuture(); _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] { auto pair = _timers.insert({ timerPtr, expiration, sp, }); invariant(pair.second); _timersById[pair.first->id] = pair.first; }); return std::move(pf.future); } bool cancelSession(Session& session) override { const auto fd = checked_cast(session).getSocket().native_handle(); stdx::unique_lock lk(_mutex); if (_sessions.find(fd) == _sessions.end()) { return false; } // TODO: There's an ABA issue here with fds where between previously and before we could // have removed the fd, then opened and added a new socket with the same fd. We need to // solve it via using session id's for handles. _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); }); return true; } bool cancelTimer(const ReactorTimer& timer) override { stdx::unique_lock lk(_mutex); if (_timersById.find(&timer) == _timersById.end()) { return false; } // TODO: Same ABA issue as above, but for pointers. _safeExecute(std::move(lk), [ timerPtr = &timer, this ] { auto iter = _timersById.find(timerPtr); if (iter != _timersById.end()) { _timers.erase(iter->second); _timersById.erase(iter); } }); return true; } void schedule(stdx::function func) override { stdx::lock_guard lk(_mutex); _scheduled.push_back(std::move(func)); if (_inPoll) { _efd.notify(); } } bool run(OperationContext* opCtx, boost::optional deadline) override { std::vector> toFulfill; // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks const auto guard = MakeGuard([&] { for (auto& promise : toFulfill) { promise.emplaceValue(); } stdx::unique_lock lk(_mutex); while (_scheduled.size()) { decltype(_scheduled) toRun; { using std::swap; swap(_scheduled, toRun); } lk.unlock(); for (auto& job : toRun) { job(); } lk.lock(); } }); // Note that it's important to check for interrupt without the lock, because markKilled // calls schedule, which will deadlock if we're holding the lock when calling this. if (opCtx) { opCtx->checkForInterrupt(); } stdx::unique_lock lk(_mutex); if (opCtx) { invariant(opCtx == _opCtx); } auto now = Date_t::now(); // If our deadline has passed, return that we've already failed if (deadline && *deadline <= now) { return false; } // If anything was scheduled, run it now. No need to poll if (_scheduled.size()) { return true; } boost::optional timeout; // If we have a timer, poll no longer than that if (_timers.size()) { timeout = _timers.begin()->expiration - now; } if (deadline) { auto deadlineTimeout = *deadline - now; // If we didn't have a timer with a deadline, or our deadline is sooner than that // timer if (!timeout || (deadlineTimeout < *timeout)) { timeout = deadlineTimeout; } } std::vector sessions; sessions.reserve(_sessions.size()); std::vector pollSet; pollSet.reserve(_sessions.size() + 1); pollSet.push_back(pollfd{_efd.fd, POLLIN, 0}); for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { pollSet.push_back( pollfd{iter->first, static_cast(iter->second.type == Type::In ? POLLIN : POLLOUT), 0}); sessions.push_back(iter); } int rval = 0; // If we don't have a timeout, or we have a timeout that's unexpired, run poll. if (!timeout || (*timeout > Milliseconds(0))) { _inPoll = true; lk.unlock(); rval = ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count()); const auto pollGuard = MakeGuard([&] { lk.lock(); _inPoll = false; }); // If poll failed, it better be in EINTR if (rval < 0 && errno != EINTR) { severe() << "error in poll: " << errnoWithDescription(errno); fassertFailed(50834); } // Note that it's important to check for interrupt without the lock, because markKilled // calls schedule, which will deadlock if we're holding the lock when calling this. if (opCtx) { opCtx->checkForInterrupt(); } } now = Date_t::now(); // If our deadline passed while in poll, we've failed if (deadline && now > *deadline) { return false; } // Fire expired timers for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { toFulfill.push_back(std::move(iter->promise)); _timersById.erase(iter->id); iter = _timers.erase(iter); } // If poll found some activity if (rval > 0) { size_t remaining = rval; auto pollIter = pollSet.begin(); if (pollIter->revents) { _efd.wait(); remaining--; } ++pollIter; for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; ++sessionIter, ++pollIter) { if (pollIter->revents) { toFulfill.push_back(std::move((*sessionIter)->second.promise)); _sessions.erase(*sessionIter); remaining--; } } invariant(remaining == 0); } return true; } private: struct Timer { const ReactorTimer* id; Date_t expiration; SharedPromise promise; struct LessThan { bool operator()(const Timer& lhs, const Timer& rhs) const { return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id); } }; }; struct TransportSession { Type type; SharedPromise promise; }; template void _safeExecute(Callback&& cb) { return _safeExecute(stdx::unique_lock(_mutex), std::forward(cb)); } /** * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to * the eventfd. If not, we run inline. */ template void _safeExecute(stdx::unique_lock lk, Callback&& cb) { if (_inPoll) { _scheduled.push_back([cb, this] { stdx::lock_guard lk(_mutex); cb(); }); _efd.notify(); } else { cb(); } } stdx::mutex _mutex; OperationContext* _opCtx; bool _inPoll = false; EventFDHolder _efd; // This map stores the sessions we need to poll on. We unwind it into a pollset for every // blocking call to run stdx::unordered_map _sessions; // The set is used to find the next timer which will fire. The unordered_map looks up the // timers so we can remove them in O(1) std::set _timers; stdx::unordered_map _timersById; // For tasks that come in via schedule. Or that were deferred because we were in poll std::vector> _scheduled; }; } // namespace transport } // namespace mongo