diff options
author | Jason Carey <jcarey@argv.me> | 2019-01-23 13:18:49 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-02-05 22:41:49 -0500 |
commit | a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81 (patch) | |
tree | 1adc2fdb36e6c8babaab134d53f84de3020c2404 /src/mongo/transport | |
parent | 5fd66f15797c45c9bab7b59f9e55e0a2f7ad5cd0 (diff) | |
download | mongo-a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81.tar.gz |
SERVER-39146 Refactor Baton
Refactor the baton into regular and networking batons while also
cleaning up the basic baton implementation.
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/transport/baton.h | 61 | ||||
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 219 | ||||
-rw-r--r-- | src/mongo/transport/mock_session.h | 7 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 18 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 35 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.cpp | 15 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer.h | 17 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 17 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.h | 2 |
11 files changed, 229 insertions, 169 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 43032ccb1b7..12b2cf44899 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -14,6 +14,9 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/service_context', + ], ) env.Library( diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index 9dbf570ede2..de981ef6284 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -1,6 +1,6 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -32,9 +32,11 @@ #include <memory> +#include "mongo/db/baton.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" +#include "mongo/util/out_of_line_executor.h" #include "mongo/util/time_support.h" #include "mongo/util/waitable.h" @@ -49,44 +51,17 @@ class Session; class ReactorTimer; /** - * A Baton is basically a networking reactor, with limited functionality and no forward progress - * guarantees. Rather than asynchronously running tasks through one, the baton records the intent - * of those tasks and defers waiting and execution to a later call to run(); + * A NetworkingBaton is basically a networking reactor, with limited functionality and parallel + * forward progress guarantees. Rather than asynchronously running tasks through one, the baton + * records the intent of those tasks and defers waiting and execution to a later call to run(); * - * Baton's provide a mechanism to allow consumers of a transport layer to execute IO themselves, - * rather than having this occur on another thread. This can improve performance by minimizing - * context switches, as well as improving the readability of stack traces by grounding async - * execution on top of a regular client call stack. + * NetworkingBaton's provide a mechanism to allow consumers of a transport layer to execute IO + * themselves, rather than having this occur on another thread. This can improve performance by + * minimizing context switches, as well as improving the readability of stack traces by grounding + * async execution on top of a regular client call stack. */ -class Baton : public Waitable { +class NetworkingBaton : public Baton { public: - virtual ~Baton() = default; - - /** - * Detaches a baton from an associated opCtx. - */ - virtual void detach() = 0; - - /** - * Executes a callback on the baton via schedule. Returns a future which will execute on the - * baton runner. - */ - template <typename Callback> - Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { - auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - - schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { - p.setWith(std::move(cb)); - }); - - return std::move(pf.future); - } - - /** - * Executes a callback on the baton. - */ - virtual void schedule(unique_function<void()> func) = 0; - /** * Adds a session, returning a future which activates on read/write-ability of the session. */ @@ -94,29 +69,31 @@ public: In, Out, }; - virtual Future<void> addSession(Session& session, Type type) = 0; + virtual Future<void> addSession(Session& session, Type type) noexcept = 0; /** * Adds a timer, returning a future which activates after a deadline. */ - virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0; + virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept = 0; /** * Cancels waiting on a session. * * Returns true if the session was in the baton to be cancelled. */ - virtual bool cancelSession(Session& session) = 0; + virtual bool cancelSession(Session& session) noexcept = 0; /** * Cancels waiting on a timer * * Returns true if the timer was in the baton to be cancelled. */ - virtual bool cancelTimer(const ReactorTimer& timer) = 0; -}; + virtual bool cancelTimer(const ReactorTimer& timer) noexcept = 0; -using BatonHandle = std::shared_ptr<Baton>; + NetworkingBaton* networking() noexcept final { + return this; + } +}; } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 63bfe67da3f..55570768ba7 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -30,8 +30,8 @@ #pragma once +#include <map> #include <memory> -#include <set> #include <vector> #include <poll.h> @@ -55,7 +55,7 @@ namespace transport { * * We implement our networking reactor on top of poll + eventfd for wakeups */ -class TransportLayerASIO::BatonASIO : public Baton { +class TransportLayerASIO::BatonASIO : public NetworkingBaton { /** * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for * ::poll). @@ -117,6 +117,8 @@ class TransportLayerASIO::BatonASIO : public Baton { } const int fd; + + static const Client::Decoration<EventFDHolder> getForClient; }; public: @@ -129,75 +131,69 @@ public: invariant(_timers.empty()); } - void detach() override { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_sessions.empty()); - invariant(_scheduled.empty()); - invariant(_timers.empty()); - } + void markKillOnClientDisconnect() noexcept override { + if (_opCtx->getClient() && _opCtx->getClient()->session()) { + addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) { + if (!s.isOK()) { + return; + } - { - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - invariant(_opCtx->getBaton().get() == this); - _opCtx->setBaton(nullptr); + _opCtx->markKilled(ErrorCodes::ClientDisconnect); + }); } + } - _opCtx = nullptr; + Future<void> addSession(Session& session, Type type) noexcept override { + return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT); } - Future<void> addSession(Session& session, Type type) override { - auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override { auto pf = makePromiseFuture<void>(); + auto id = timer.id(); - _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable { - _sessions[fd] = TransportSession{type, std::move(promise)}; - }); + stdx::unique_lock<stdx::mutex> lk(_mutex); - return std::move(pf.future); - } + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot waitUntil on timer"); + } - Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { - auto pf = makePromiseFuture<void>(); - _safeExecute( - [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable { - auto pair = _timers.insert({ - timerPtr, expiration, std::move(promise), - }); - invariant(pair.second); - _timersById[pair.first->id] = pair.first; - }); + _safeExecute(std::move(lk), + [ id, expiration, promise = std::move(pf.promise), this ]() mutable { + auto iter = _timers.emplace(std::piecewise_construct, + std::forward_as_tuple(expiration), + std::forward_as_tuple(id, std::move(promise))); + _timersById[id] = iter; + }); return std::move(pf.future); } - bool cancelSession(Session& session) override { - const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + bool cancelSession(Session& session) noexcept override { + const auto id = session.id(); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_sessions.find(fd) == _sessions.end()) { + if (_sessions.find(id) == _sessions.end()) { return false; } - // TODO: There's an ABA issue here with fds where between previously and before we could - // have removed the fd, then opened and added a new socket with the same fd. We need to - // solve it via using session id's for handles. - _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); }); + _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); }); return true; } - bool cancelTimer(const ReactorTimer& timer) override { + bool cancelTimer(const ReactorTimer& timer) noexcept override { + const auto id = timer.id(); + stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_timersById.find(&timer) == _timersById.end()) { + if (_timersById.find(id) == _timersById.end()) { return false; } - // TODO: Same ABA issue as above, but for pointers. - _safeExecute(std::move(lk), [ timerPtr = &timer, this ] { - auto iter = _timersById.find(timerPtr); + _safeExecute(std::move(lk), [id, this] { + auto iter = _timersById.find(id); if (iter != _timersById.end()) { _timers.erase(iter->second); @@ -208,18 +204,24 @@ public: return true; } - void schedule(unique_function<void()> func) override { + void schedule(unique_function<void(OperationContext*)> func) noexcept override { stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_opCtx) { + func(nullptr); + + return; + } + _scheduled.push_back(std::move(func)); if (_inPoll) { - _efd.notify(); + efd().notify(); } } void notify() noexcept override { - _efd.notify(); + efd().notify(); } /** @@ -263,7 +265,7 @@ public: lk.unlock(); for (auto& job : toRun) { - job(); + job(_opCtx); } lk.lock(); } @@ -280,21 +282,19 @@ public: // If we have a timer, poll no longer than that if (_timers.size()) { - deadline = _timers.begin()->expiration; + deadline = _timers.begin()->first; } std::vector<decltype(_sessions)::iterator> sessions; sessions.reserve(_sessions.size()); std::vector<pollfd> pollSet; + pollSet.reserve(_sessions.size() + 1); - pollSet.push_back(pollfd{_efd.fd, POLLIN, 0}); + pollSet.push_back(pollfd{efd().fd, POLLIN, 0}); for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { - pollSet.push_back( - pollfd{iter->first, - static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT), - 0}); + pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0}); sessions.push_back(iter); } @@ -330,9 +330,9 @@ public: now = clkSource->now(); // Fire expired timers - for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { - toFulfill.push_back(std::move(iter->promise)); - _timersById.erase(iter->id); + for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) { + toFulfill.push_back(std::move(iter->second.promise)); + _timersById.erase(iter->second.id); iter = _timers.erase(iter); } @@ -343,12 +343,13 @@ public: auto pollIter = pollSet.begin(); if (pollIter->revents) { - _efd.wait(); + efd().wait(); remaining--; } ++pollIter; + for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; ++sessionIter, ++pollIter) { if (pollIter->revents) { @@ -366,28 +367,75 @@ public: } private: - struct Timer { - const ReactorTimer* id; - Date_t expiration; - mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set. + Future<void> addSessionImpl(Session& session, short type) noexcept { + auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + auto id = session.id(); + auto pf = makePromiseFuture<void>(); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession"); + } - struct LessThan { - bool operator()(const Timer& lhs, const Timer& rhs) const { - return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id); + _safeExecute(std::move(lk), + [ id, fd, type, promise = std::move(pf.promise), this ]() mutable { + _sessions[id] = TransportSession{fd, type, std::move(promise)}; + }); + + return std::move(pf.future); + } + + void detachImpl() noexcept override { + decltype(_sessions) sessions; + decltype(_scheduled) scheduled; + decltype(_timers) timers; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); } - }; + + _opCtx = nullptr; + + using std::swap; + swap(_sessions, sessions); + swap(_scheduled, scheduled); + swap(_timers, timers); + } + + for (auto& job : scheduled) { + job(nullptr); + } + + for (auto& session : sessions) { + session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot wait for socket")); + } + + for (auto& pair : timers) { + pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, completing timer early")); + } + } + + struct Timer { + Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {} + + size_t id; + Promise<void> promise; // Needs to be mutable to move from it while in std::set. }; struct TransportSession { - Type type; + int fd; + short type; Promise<void> promise; }; - template <typename Callback> - void _safeExecute(Callback&& cb) { - return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb)); - } - /** * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to * the eventfd. If not, we run inline. @@ -395,37 +443,44 @@ private: template <typename Callback> void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { if (_inPoll) { - _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable { - stdx::lock_guard<stdx::mutex> lk(_mutex); - cb(); - }); + _scheduled.push_back( + [ cb = std::forward<Callback>(cb), this ](OperationContext*) mutable { + stdx::lock_guard<stdx::mutex> lk(_mutex); + cb(); + }); - _efd.notify(); + efd().notify(); } else { cb(); } } + EventFDHolder& efd() { + return EventFDHolder::getForClient(_opCtx->getClient()); + } + stdx::mutex _mutex; OperationContext* _opCtx; bool _inPoll = false; - EventFDHolder _efd; - // This map stores the sessions we need to poll on. We unwind it into a pollset for every // blocking call to run - stdx::unordered_map<int, TransportSession> _sessions; + stdx::unordered_map<SessionId, TransportSession> _sessions; // The set is used to find the next timer which will fire. The unordered_map looks up the // timers so we can remove them in O(1) - std::set<Timer, Timer::LessThan> _timers; - stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById; + std::multimap<Date_t, Timer> _timers; + stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById; // For tasks that come in via schedule. Or that were deferred because we were in poll - std::vector<unique_function<void()>> _scheduled; + std::vector<unique_function<void(OperationContext*)>> _scheduled; }; +const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder> + TransportLayerASIO::BatonASIO::EventFDHolder::getForClient = + Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>(); + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index a5c4b78de82..4e350887446 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -85,7 +85,7 @@ public: return Message(); // Subclasses can do something different. } - Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override { return Future<Message>::makeReady(sourceMessage()); } @@ -101,12 +101,11 @@ public: return Status::OK(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override { return Future<void>::makeReady(sinkMessage(message)); } - void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {} + void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {} void setTimeout(boost::optional<Milliseconds>) override {} diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 4fa709ed849..28349273362 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -33,6 +33,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/db/baton.h" #include "mongo/platform/atomic_word.h" #include "mongo/rpc/message.h" #include "mongo/transport/session_id.h" @@ -46,8 +47,6 @@ namespace transport { class TransportLayer; class Session; -class Baton; -using BatonHandle = std::shared_ptr<Baton>; using SessionHandle = std::shared_ptr<Session>; using ConstSessionHandle = std::shared_ptr<const Session>; @@ -107,7 +106,7 @@ public: * Source (receive) a new Message from the remote host for this Session. */ virtual StatusWith<Message> sourceMessage() = 0; - virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; /** * Sink (send) a Message to the remote host for this Session. @@ -115,15 +114,14 @@ public: * Async version will keep the buffer alive until the operation completes. */ virtual Status sinkMessage(Message message) = 0; - virtual Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0; /** * Cancel any outstanding async operations. There is no way to cancel synchronous calls. * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already * completed. */ - virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0; + virtual void cancelAsyncOperations(const BatonHandle& handle = nullptr) = 0; /** * This should only be used to detect when the remote host has disappeared without @@ -155,14 +153,14 @@ public: * * The 'kPending' tag is only for new sessions; callers should not set it directly. */ - virtual void setTags(TagMask tagsToSet); + void setTags(TagMask tagsToSet); /** * Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the * 'kPending' tag is set, indicating that no tags have yet been specified for the session, this * function also clears that tag as part of the same atomic operation. */ - virtual void unsetTags(TagMask tagsToUnset); + void unsetTags(TagMask tagsToUnset); /** * Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call @@ -175,9 +173,9 @@ public: * of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never * try to set it. */ - virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); + void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); - virtual TagMask getTags() const; + TagMask getTags() const; protected: Session(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index b74b29af1d9..49f56357686 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -133,7 +133,7 @@ public: return sourceMessageImpl().getNoThrow(); } - Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override { ensureAsync(); return sourceMessageImpl(baton); } @@ -150,8 +150,7 @@ public: .getNoThrow(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& baton = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override { ensureAsync(); return write(asio::buffer(message.buf(), message.size()), baton) .then([this, message /*keep the buffer alive*/]() { @@ -161,10 +160,10 @@ public: }); } - void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override { + void cancelAsyncOperations(const BatonHandle& baton = nullptr) override { LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote; - if (baton) { - baton->cancelSession(*this); + if (baton && baton->networking()) { + baton->networking()->cancelSession(*this); } else { getSocket().cancel(); } @@ -342,7 +341,7 @@ private: return _socket; } - Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) { + Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) { static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); auto headerBuffer = SharedBuffer::allocate(kHeaderSize); @@ -387,8 +386,7 @@ private: } template <typename MutableBufferSequence> - Future<void> read(const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL if (_sslSocket) { return opportunisticRead(*_sslSocket, buffers, baton); @@ -413,8 +411,7 @@ private: } template <typename ConstBufferSequence> - Future<void> write(const ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { @@ -439,7 +436,7 @@ private: template <typename Stream, typename MutableBufferSequence> Future<void> opportunisticRead(Stream& stream, const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; size_t size; @@ -469,8 +466,9 @@ private: asyncBuffers += size; } - if (baton) { - return baton->addSession(*this, Baton::Type::In) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::In) .then([&stream, asyncBuffers, baton, this] { return opportunisticRead(stream, asyncBuffers, baton); }); @@ -494,7 +492,7 @@ private: template <typename ConstBufferSequence> boost::optional<Future<void>> moreToSend(GenericSocket& socket, const ConstBufferSequence& buffers, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return boost::none; } @@ -517,7 +515,7 @@ private: template <typename Stream, typename ConstBufferSequence> Future<void> opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; std::size_t size; @@ -552,8 +550,9 @@ private: return std::move(*more); } - if (baton) { - return baton->addSession(*this, Baton::Type::Out) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::Out) .then([&stream, asyncBuffers, baton, this] { return opportunisticWrite(stream, asyncBuffers, baton); }); diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp index 442cf78b15c..e2b997add0c 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/transport_layer.cpp @@ -31,11 +31,20 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" +#include "mongo/db/operation_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/baton.h" #include "mongo/transport/transport_layer.h" namespace mongo { namespace transport { +namespace { + +AtomicWord<uint64_t> reactorTimerIdCounter(0); + +} // namespace + const Status TransportLayer::SessionUnknownStatus = Status(ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Session."); @@ -48,5 +57,11 @@ const Status TransportLayer::TicketSessionUnknownStatus = Status( const Status TransportLayer::TicketSessionClosedStatus = Status( ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session."); +ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {} + +BatonHandle TransportLayer::makeDefaultBaton(OperationContext* opCtx) { + return opCtx->getServiceContext()->makeBaton(opCtx); +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index ca1453cb3a4..0b736ba600f 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -109,17 +109,21 @@ public: enum WhichReactor { kIngress, kEgress, kNewReactor }; virtual ReactorHandle getReactor(WhichReactor which) = 0; - virtual BatonHandle makeBaton(OperationContext* opCtx) { - return nullptr; + virtual BatonHandle makeBaton(OperationContext* opCtx) const { + return makeDefaultBaton(opCtx); } protected: TransportLayer() = default; + +private: + static BatonHandle makeDefaultBaton(OperationContext* opCtx); }; class ReactorTimer { public: - ReactorTimer() = default; + ReactorTimer(); + ReactorTimer(const ReactorTimer&) = delete; ReactorTimer& operator=(const ReactorTimer&) = delete; @@ -128,6 +132,10 @@ public: */ virtual ~ReactorTimer() = default; + size_t id() const { + return _id; + } + /* * Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an * ErrorCodes::CallbackCancelled status. @@ -142,6 +150,9 @@ public: * Calling this implicitly calls cancel(). */ virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0; + +private: + const size_t _id; }; class Reactor { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 06604606d0c..6f0c02d17fb 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -56,9 +56,11 @@ #endif // session_asio.h has some header dependencies that require it to be the last header. + #ifdef __linux__ #include "mongo/transport/baton_asio_linux.h" #endif + #include "mongo/transport/session_asio.h" namespace mongo { @@ -79,7 +81,7 @@ public: void cancel(const BatonHandle& baton = nullptr) override { // If we have a baton try to cancel that. - if (baton && baton->cancelTimer(*this)) { + if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) { LOG(2) << "Canceled via baton, skipping asio cancel."; return; } @@ -90,8 +92,9 @@ public: Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override { - if (baton) { - return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton); + if (baton && baton->networking()) { + return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); }, + baton); } else { return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); }); } @@ -875,8 +878,8 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const { } #endif -BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { #ifdef __linux__ +BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) const { auto baton = std::make_shared<BatonASIO>(opCtx); { @@ -885,11 +888,9 @@ BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { opCtx->setBaton(baton); } - return std::move(baton); -#else - return nullptr; -#endif + return baton; } +#endif } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 77815024374..b5619bcf361 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -136,7 +136,9 @@ public: return _listenerPort; } - BatonHandle makeBaton(OperationContext* opCtx) override; +#ifdef __linux__ + BatonHandle makeBaton(OperationContext* opCtx) const override; +#endif private: class BatonASIO; diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 60cfb58a41e..29ad6457be6 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -90,7 +90,7 @@ public: static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); - BatonHandle makeBaton(OperationContext* opCtx) override { + BatonHandle makeBaton(OperationContext* opCtx) const override { stdx::lock_guard<stdx::mutex> lk(_tlsMutex); // TODO: figure out what to do about managers with more than one transport layer. invariant(_tls.size() == 1); |