summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/transport/baton_asio_linux.h159
-rw-r--r--src/mongo/transport/session_asio.h2
2 files changed, 96 insertions, 65 deletions
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 1d89194ee98..57762c9af4b 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -42,7 +42,9 @@
#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/session_asio.h"
+#include "mongo/util/concepts.h"
#include "mongo/util/errno_util.h"
+#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/time_support.h"
@@ -57,6 +59,8 @@ namespace transport {
*/
class TransportLayerASIO::BatonASIO : public NetworkingBaton {
static const inline auto kDetached = Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+ static const inline auto kCanceled =
+ Status(ErrorCodes::CallbackCanceled, "Baton wait canceled");
/**
* We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
@@ -155,42 +159,48 @@ public:
return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT);
}
- Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override {
+ Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override try {
auto pf = makePromiseFuture<void>();
auto id = timer.id();
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
- if (!_opCtx) {
- return kDetached;
- }
-
- _safeExecute(std::move(lk),
- [id, expiration, promise = std::move(pf.promise), this]() mutable {
- auto iter = _timers.emplace(std::piecewise_construct,
- std::forward_as_tuple(expiration),
- std::forward_as_tuple(id, std::move(promise)));
- _timersById[id] = iter;
- });
+ _safeExecute(std::move(lk), [ expiration, timer = Timer{id, std::move(pf.promise)},
+ this ](stdx::unique_lock<Mutex>) mutable noexcept {
+ auto iter = _timers.emplace(expiration, std::move(timer));
+ _timersById[iter->second.id] = iter;
+ });
return std::move(pf.future);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
bool canWait() noexcept override {
- stdx::lock_guard<Latch> lk(_mutex);
+ stdx::lock_guard lk(_mutex);
return _opCtx;
}
bool cancelSession(Session& session) noexcept override {
const auto id = session.id();
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
if (_sessions.find(id) == _sessions.end()) {
return false;
}
- _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); });
+ _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
+ auto iter = _sessions.find(id);
+ if (iter == _sessions.end()) {
+ return;
+ }
+ auto session = std::exchange(iter->second, {});
+ _sessions.erase(iter);
+ lk.unlock();
+
+ session.promise.setError(kCanceled);
+ });
return true;
}
@@ -198,34 +208,50 @@ public:
bool cancelTimer(const ReactorTimer& timer) noexcept override {
const auto id = timer.id();
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
if (_timersById.find(id) == _timersById.end()) {
return false;
}
- _safeExecute(std::move(lk), [id, this] {
+ _safeExecute(std::move(lk), [ id, this ](stdx::unique_lock<Mutex> lk) noexcept {
auto iter = _timersById.find(id);
- if (iter != _timersById.end()) {
- _timers.erase(iter->second);
- _timersById.erase(iter);
+ if (iter == _timersById.end()) {
+ return;
}
+
+ auto timer = std::exchange(iter->second->second, {});
+ _timers.erase(iter->second);
+ _timersById.erase(iter);
+ lk.unlock();
+
+ timer.promise.setError(kCanceled);
});
return true;
}
void schedule(Task func) noexcept override {
- stdx::lock_guard<Latch> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
if (!_opCtx) {
+ lk.unlock();
func(kDetached);
return;
}
- _scheduled.push_back(std::move(func));
+ _scheduled.push_back(
+ [ this, func = std::move(func) ](stdx::unique_lock<Mutex> lk) mutable noexcept {
+ auto status = Status::OK();
+ if (!_opCtx) {
+ status = kDetached;
+ }
+ lk.unlock();
+
+ func(status);
+ });
if (_inPoll) {
efd().notify();
@@ -267,22 +293,19 @@ public:
promise.emplaceValue();
}
- stdx::unique_lock<Latch> lk(_mutex);
+ auto lk = stdx::unique_lock(_mutex);
while (_scheduled.size()) {
- auto toRun = std::exchange(_scheduled, {});
+ auto scheduled = std::exchange(_scheduled, {});
+ for (auto& job : scheduled) {
+ job(std::move(lk));
+ job = nullptr;
- lk.unlock();
- while (toRun.size()) {
- // While there are jobs to run, run and dtor in sequence
- auto& job = toRun.back();
- job(Status::OK());
- toRun.pop_back();
+ lk = stdx::unique_lock(_mutex);
}
- lk.lock();
}
});
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
// If anything was scheduled, run it now. No need to poll
if (_scheduled.size()) {
@@ -325,11 +348,8 @@ public:
rval = ::poll(_pollSet.data(),
_pollSet.size(),
deadline ? Milliseconds(*deadline - now).count() : -1);
-
- const auto pollGuard = makeGuard([&] {
- lk.lock();
- _inPoll = false;
- });
+ lk.lock();
+ _inPoll = false;
// If poll failed, it better be in EINTR
if (rval < 0 && errno != EINTR) {
@@ -380,32 +400,31 @@ public:
}
private:
- Future<void> addSessionImpl(Session& session, short type) noexcept {
+ Future<void> addSessionImpl(Session& session, short type) noexcept try {
auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
auto id = session.id();
auto pf = makePromiseFuture<void>();
- stdx::unique_lock<Latch> lk(_mutex);
-
- if (!_opCtx) {
- return kDetached;
- }
+ stdx::unique_lock lk(_mutex);
_safeExecute(std::move(lk),
- [id, fd, type, promise = std::move(pf.promise), this]() mutable {
- _sessions[id] = TransportSession{fd, type, std::move(promise)};
+ [ id, session = TransportSession{fd, type, std::move(pf.promise)},
+ this ](stdx::unique_lock<Mutex>) mutable noexcept {
+ auto ret = _sessions.emplace(id, std::move(session));
+ invariant(ret.second);
});
-
return std::move(pf.future);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
void detachImpl() noexcept override {
- decltype(_sessions) sessions;
decltype(_scheduled) scheduled;
+ decltype(_sessions) sessions;
decltype(_timers) timers;
{
- stdx::lock_guard<Latch> lk(_mutex);
+ stdx::lock_guard lk(_mutex);
invariant(_opCtx->getBaton().get() == this);
_opCtx->setBaton(nullptr);
@@ -413,13 +432,14 @@ private:
_opCtx = nullptr;
using std::swap;
- swap(_sessions, sessions);
swap(_scheduled, scheduled);
+ swap(_sessions, sessions);
swap(_timers, timers);
}
for (auto& job : scheduled) {
- job(kDetached);
+ job(stdx::unique_lock(_mutex));
+ job = nullptr;
}
for (auto& session : sessions) {
@@ -432,8 +452,6 @@ private:
}
struct Timer {
- Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {}
-
size_t id;
Promise<void> promise; // Needs to be mutable to move from it while in std::set.
};
@@ -444,21 +462,34 @@ private:
Promise<void> promise;
};
+ // Internally, the BatonASIO thinks in terms of synchronized units of work. This is because
+ // a Baton effectively represents a green thread with the potential to add or remove work (i.e.
+ // Jobs) at any time. Jobs with external notifications (OutOfLineExecutor::Tasks,
+ // TransportSession:promise, ReactorTimer::promise) are expected to release their lock before
+ // generating those notifications.
+ using Job = unique_function<void(stdx::unique_lock<Mutex>)>;
+
/**
- * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to
- * the eventfd. If not, we run inline.
+ * Invoke a job with exclusive access to the Baton internals.
+ *
+ * If we are currently _inPoll, the polling thread owns the Baton and thus we tell it to wake up
+ * and run our job. If we are not _inPoll, take exclusive access and run our job on the local
+ * thread. Note that _safeExecute() will throw if the Baton has been detached.
*/
- template <typename Callback>
- void _safeExecute(stdx::unique_lock<Latch> lk, Callback&& cb) {
+ TEMPLATE(typename Callback)
+ REQUIRES(std::is_nothrow_invocable_v<Callback, stdx::unique_lock<Mutex>>)
+ void _safeExecute(stdx::unique_lock<Mutex> lk, Callback&& job) {
+ if (!_opCtx) {
+ // If we're detached, no job can safely execute.
+ uassertStatusOK(kDetached);
+ }
+
if (_inPoll) {
- _scheduled.push_back([cb = std::forward<Callback>(cb), this](Status) mutable {
- stdx::lock_guard<Latch> lk(_mutex);
- cb();
- });
+ _scheduled.push_back(std::forward<Callback>(job));
efd().notify();
} else {
- cb();
+ job(std::move(lk));
}
}
@@ -479,10 +510,10 @@ private:
// The set is used to find the next timer which will fire. The unordered_map looks up the
// timers so we can remove them in O(1)
std::multimap<Date_t, Timer> _timers;
- stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById;
+ stdx::unordered_map<size_t, decltype(_timers)::iterator> _timersById;
// For tasks that come in via schedule. Or that were deferred because we were in poll
- std::vector<Task> _scheduled;
+ std::vector<Job> _scheduled;
// We hold the two following values at the object level to save on allocations when a baton is
// waited on many times over the course of its lifetime.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index e4a40043c0d..08042c94fe5 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -510,7 +510,7 @@ private:
networkingBaton && networkingBaton->canWait()) {
return networkingBaton->addSession(*this, NetworkingBaton::Type::In)
.onError([](Status error) {
- if (ErrorCodes::isCancelationError(error)) {
+ if (ErrorCodes::isShutdownError(error)) {
// If the baton has detached, it will cancel its polling. We catch that
// error here and return Status::OK so that we invoke
// opportunisticRead() again and switch to asio::async_read() below.