summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/SConscript3
-rw-r--r--src/mongo/transport/baton.h61
-rw-r--r--src/mongo/transport/baton_asio_linux.h219
-rw-r--r--src/mongo/transport/mock_session.h7
-rw-r--r--src/mongo/transport/session.h18
-rw-r--r--src/mongo/transport/session_asio.h35
-rw-r--r--src/mongo/transport/transport_layer.cpp15
-rw-r--r--src/mongo/transport/transport_layer.h17
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp17
-rw-r--r--src/mongo/transport/transport_layer_asio.h4
-rw-r--r--src/mongo/transport/transport_layer_manager.h2
11 files changed, 229 insertions, 169 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 43032ccb1b7..12b2cf44899 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -14,6 +14,9 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
)
env.Library(
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index 9dbf570ede2..de981ef6284 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -1,6 +1,6 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -32,9 +32,11 @@
#include <memory>
+#include "mongo/db/baton.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
+#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/time_support.h"
#include "mongo/util/waitable.h"
@@ -49,44 +51,17 @@ class Session;
class ReactorTimer;
/**
- * A Baton is basically a networking reactor, with limited functionality and no forward progress
- * guarantees. Rather than asynchronously running tasks through one, the baton records the intent
- * of those tasks and defers waiting and execution to a later call to run();
+ * A NetworkingBaton is basically a networking reactor, with limited functionality and parallel
+ * forward progress guarantees. Rather than asynchronously running tasks through one, the baton
+ * records the intent of those tasks and defers waiting and execution to a later call to run();
*
- * Baton's provide a mechanism to allow consumers of a transport layer to execute IO themselves,
- * rather than having this occur on another thread. This can improve performance by minimizing
- * context switches, as well as improving the readability of stack traces by grounding async
- * execution on top of a regular client call stack.
+ * NetworkingBaton's provide a mechanism to allow consumers of a transport layer to execute IO
+ * themselves, rather than having this occur on another thread. This can improve performance by
+ * minimizing context switches, as well as improving the readability of stack traces by grounding
+ * async execution on top of a regular client call stack.
*/
-class Baton : public Waitable {
+class NetworkingBaton : public Baton {
public:
- virtual ~Baton() = default;
-
- /**
- * Detaches a baton from an associated opCtx.
- */
- virtual void detach() = 0;
-
- /**
- * Executes a callback on the baton via schedule. Returns a future which will execute on the
- * baton runner.
- */
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
- auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
-
- schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
- p.setWith(std::move(cb));
- });
-
- return std::move(pf.future);
- }
-
- /**
- * Executes a callback on the baton.
- */
- virtual void schedule(unique_function<void()> func) = 0;
-
/**
* Adds a session, returning a future which activates on read/write-ability of the session.
*/
@@ -94,29 +69,31 @@ public:
In,
Out,
};
- virtual Future<void> addSession(Session& session, Type type) = 0;
+ virtual Future<void> addSession(Session& session, Type type) noexcept = 0;
/**
* Adds a timer, returning a future which activates after a deadline.
*/
- virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0;
+ virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept = 0;
/**
* Cancels waiting on a session.
*
* Returns true if the session was in the baton to be cancelled.
*/
- virtual bool cancelSession(Session& session) = 0;
+ virtual bool cancelSession(Session& session) noexcept = 0;
/**
* Cancels waiting on a timer
*
* Returns true if the timer was in the baton to be cancelled.
*/
- virtual bool cancelTimer(const ReactorTimer& timer) = 0;
-};
+ virtual bool cancelTimer(const ReactorTimer& timer) noexcept = 0;
-using BatonHandle = std::shared_ptr<Baton>;
+ NetworkingBaton* networking() noexcept final {
+ return this;
+ }
+};
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 63bfe67da3f..55570768ba7 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -30,8 +30,8 @@
#pragma once
+#include <map>
#include <memory>
-#include <set>
#include <vector>
#include <poll.h>
@@ -55,7 +55,7 @@ namespace transport {
*
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
-class TransportLayerASIO::BatonASIO : public Baton {
+class TransportLayerASIO::BatonASIO : public NetworkingBaton {
/**
* We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
* ::poll).
@@ -117,6 +117,8 @@ class TransportLayerASIO::BatonASIO : public Baton {
}
const int fd;
+
+ static const Client::Decoration<EventFDHolder> getForClient;
};
public:
@@ -129,75 +131,69 @@ public:
invariant(_timers.empty());
}
- void detach() override {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_sessions.empty());
- invariant(_scheduled.empty());
- invariant(_timers.empty());
- }
+ void markKillOnClientDisconnect() noexcept override {
+ if (_opCtx->getClient() && _opCtx->getClient()->session()) {
+ addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) {
+ if (!s.isOK()) {
+ return;
+ }
- {
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
- invariant(_opCtx->getBaton().get() == this);
- _opCtx->setBaton(nullptr);
+ _opCtx->markKilled(ErrorCodes::ClientDisconnect);
+ });
}
+ }
- _opCtx = nullptr;
+ Future<void> addSession(Session& session, Type type) noexcept override {
+ return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT);
}
- Future<void> addSession(Session& session, Type type) override {
- auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override {
auto pf = makePromiseFuture<void>();
+ auto id = timer.id();
- _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable {
- _sessions[fd] = TransportSession{type, std::move(promise)};
- });
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
- return std::move(pf.future);
- }
+ if (!_opCtx) {
+ return Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, cannot waitUntil on timer");
+ }
- Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
- auto pf = makePromiseFuture<void>();
- _safeExecute(
- [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable {
- auto pair = _timers.insert({
- timerPtr, expiration, std::move(promise),
- });
- invariant(pair.second);
- _timersById[pair.first->id] = pair.first;
- });
+ _safeExecute(std::move(lk),
+ [ id, expiration, promise = std::move(pf.promise), this ]() mutable {
+ auto iter = _timers.emplace(std::piecewise_construct,
+ std::forward_as_tuple(expiration),
+ std::forward_as_tuple(id, std::move(promise)));
+ _timersById[id] = iter;
+ });
return std::move(pf.future);
}
- bool cancelSession(Session& session) override {
- const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ bool cancelSession(Session& session) noexcept override {
+ const auto id = session.id();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_sessions.find(fd) == _sessions.end()) {
+ if (_sessions.find(id) == _sessions.end()) {
return false;
}
- // TODO: There's an ABA issue here with fds where between previously and before we could
- // have removed the fd, then opened and added a new socket with the same fd. We need to
- // solve it via using session id's for handles.
- _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); });
+ _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); });
return true;
}
- bool cancelTimer(const ReactorTimer& timer) override {
+ bool cancelTimer(const ReactorTimer& timer) noexcept override {
+ const auto id = timer.id();
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_timersById.find(&timer) == _timersById.end()) {
+ if (_timersById.find(id) == _timersById.end()) {
return false;
}
- // TODO: Same ABA issue as above, but for pointers.
- _safeExecute(std::move(lk), [ timerPtr = &timer, this ] {
- auto iter = _timersById.find(timerPtr);
+ _safeExecute(std::move(lk), [id, this] {
+ auto iter = _timersById.find(id);
if (iter != _timersById.end()) {
_timers.erase(iter->second);
@@ -208,18 +204,24 @@ public:
return true;
}
- void schedule(unique_function<void()> func) override {
+ void schedule(unique_function<void(OperationContext*)> func) noexcept override {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_opCtx) {
+ func(nullptr);
+
+ return;
+ }
+
_scheduled.push_back(std::move(func));
if (_inPoll) {
- _efd.notify();
+ efd().notify();
}
}
void notify() noexcept override {
- _efd.notify();
+ efd().notify();
}
/**
@@ -263,7 +265,7 @@ public:
lk.unlock();
for (auto& job : toRun) {
- job();
+ job(_opCtx);
}
lk.lock();
}
@@ -280,21 +282,19 @@ public:
// If we have a timer, poll no longer than that
if (_timers.size()) {
- deadline = _timers.begin()->expiration;
+ deadline = _timers.begin()->first;
}
std::vector<decltype(_sessions)::iterator> sessions;
sessions.reserve(_sessions.size());
std::vector<pollfd> pollSet;
+
pollSet.reserve(_sessions.size() + 1);
- pollSet.push_back(pollfd{_efd.fd, POLLIN, 0});
+ pollSet.push_back(pollfd{efd().fd, POLLIN, 0});
for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
- pollSet.push_back(
- pollfd{iter->first,
- static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT),
- 0});
+ pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0});
sessions.push_back(iter);
}
@@ -330,9 +330,9 @@ public:
now = clkSource->now();
// Fire expired timers
- for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
- toFulfill.push_back(std::move(iter->promise));
- _timersById.erase(iter->id);
+ for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) {
+ toFulfill.push_back(std::move(iter->second.promise));
+ _timersById.erase(iter->second.id);
iter = _timers.erase(iter);
}
@@ -343,12 +343,13 @@ public:
auto pollIter = pollSet.begin();
if (pollIter->revents) {
- _efd.wait();
+ efd().wait();
remaining--;
}
++pollIter;
+
for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining;
++sessionIter, ++pollIter) {
if (pollIter->revents) {
@@ -366,28 +367,75 @@ public:
}
private:
- struct Timer {
- const ReactorTimer* id;
- Date_t expiration;
- mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set.
+ Future<void> addSessionImpl(Session& session, short type) noexcept {
+ auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ auto id = session.id();
+ auto pf = makePromiseFuture<void>();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (!_opCtx) {
+ return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession");
+ }
- struct LessThan {
- bool operator()(const Timer& lhs, const Timer& rhs) const {
- return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id);
+ _safeExecute(std::move(lk),
+ [ id, fd, type, promise = std::move(pf.promise), this ]() mutable {
+ _sessions[id] = TransportSession{fd, type, std::move(promise)};
+ });
+
+ return std::move(pf.future);
+ }
+
+ void detachImpl() noexcept override {
+ decltype(_sessions) sessions;
+ decltype(_scheduled) scheduled;
+ decltype(_timers) timers;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ {
+ stdx::lock_guard<Client> lk(*_opCtx->getClient());
+ invariant(_opCtx->getBaton().get() == this);
+ _opCtx->setBaton(nullptr);
}
- };
+
+ _opCtx = nullptr;
+
+ using std::swap;
+ swap(_sessions, sessions);
+ swap(_scheduled, scheduled);
+ swap(_timers, timers);
+ }
+
+ for (auto& job : scheduled) {
+ job(nullptr);
+ }
+
+ for (auto& session : sessions) {
+ session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, cannot wait for socket"));
+ }
+
+ for (auto& pair : timers) {
+ pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, completing timer early"));
+ }
+ }
+
+ struct Timer {
+ Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {}
+
+ size_t id;
+ Promise<void> promise; // Needs to be mutable to move from it while in std::set.
};
struct TransportSession {
- Type type;
+ int fd;
+ short type;
Promise<void> promise;
};
- template <typename Callback>
- void _safeExecute(Callback&& cb) {
- return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb));
- }
-
/**
* Safely executes method on the reactor. If we're in poll, we schedule a task, then write to
* the eventfd. If not, we run inline.
@@ -395,37 +443,44 @@ private:
template <typename Callback>
void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
if (_inPoll) {
- _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- cb();
- });
+ _scheduled.push_back(
+ [ cb = std::forward<Callback>(cb), this ](OperationContext*) mutable {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ cb();
+ });
- _efd.notify();
+ efd().notify();
} else {
cb();
}
}
+ EventFDHolder& efd() {
+ return EventFDHolder::getForClient(_opCtx->getClient());
+ }
+
stdx::mutex _mutex;
OperationContext* _opCtx;
bool _inPoll = false;
- EventFDHolder _efd;
-
// This map stores the sessions we need to poll on. We unwind it into a pollset for every
// blocking call to run
- stdx::unordered_map<int, TransportSession> _sessions;
+ stdx::unordered_map<SessionId, TransportSession> _sessions;
// The set is used to find the next timer which will fire. The unordered_map looks up the
// timers so we can remove them in O(1)
- std::set<Timer, Timer::LessThan> _timers;
- stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById;
+ std::multimap<Date_t, Timer> _timers;
+ stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById;
// For tasks that come in via schedule. Or that were deferred because we were in poll
- std::vector<unique_function<void()>> _scheduled;
+ std::vector<unique_function<void(OperationContext*)>> _scheduled;
};
+const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder>
+ TransportLayerASIO::BatonASIO::EventFDHolder::getForClient =
+ Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>();
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index a5c4b78de82..4e350887446 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -85,7 +85,7 @@ public:
return Message(); // Subclasses can do something different.
}
- Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override {
return Future<Message>::makeReady(sourceMessage());
}
@@ -101,12 +101,11 @@ public:
return Status::OK();
}
- Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& handle = nullptr) override {
+ Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override {
return Future<void>::makeReady(sinkMessage(message));
}
- void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {}
+ void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {}
void setTimeout(boost::optional<Milliseconds>) override {}
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 4fa709ed849..28349273362 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -33,6 +33,7 @@
#include <memory>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/baton.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/rpc/message.h"
#include "mongo/transport/session_id.h"
@@ -46,8 +47,6 @@ namespace transport {
class TransportLayer;
class Session;
-class Baton;
-using BatonHandle = std::shared_ptr<Baton>;
using SessionHandle = std::shared_ptr<Session>;
using ConstSessionHandle = std::shared_ptr<const Session>;
@@ -107,7 +106,7 @@ public:
* Source (receive) a new Message from the remote host for this Session.
*/
virtual StatusWith<Message> sourceMessage() = 0;
- virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0;
+ virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0;
/**
* Sink (send) a Message to the remote host for this Session.
@@ -115,15 +114,14 @@ public:
* Async version will keep the buffer alive until the operation completes.
*/
virtual Status sinkMessage(Message message) = 0;
- virtual Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& handle = nullptr) = 0;
+ virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0;
/**
* Cancel any outstanding async operations. There is no way to cancel synchronous calls.
* Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already
* completed.
*/
- virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0;
+ virtual void cancelAsyncOperations(const BatonHandle& handle = nullptr) = 0;
/**
* This should only be used to detect when the remote host has disappeared without
@@ -155,14 +153,14 @@ public:
*
* The 'kPending' tag is only for new sessions; callers should not set it directly.
*/
- virtual void setTags(TagMask tagsToSet);
+ void setTags(TagMask tagsToSet);
/**
* Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the
* 'kPending' tag is set, indicating that no tags have yet been specified for the session, this
* function also clears that tag as part of the same atomic operation.
*/
- virtual void unsetTags(TagMask tagsToUnset);
+ void unsetTags(TagMask tagsToUnset);
/**
* Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call
@@ -175,9 +173,9 @@ public:
* of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never
* try to set it.
*/
- virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
+ void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
- virtual TagMask getTags() const;
+ TagMask getTags() const;
protected:
Session();
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index b74b29af1d9..49f56357686 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -133,7 +133,7 @@ public:
return sourceMessageImpl().getNoThrow();
}
- Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override {
ensureAsync();
return sourceMessageImpl(baton);
}
@@ -150,8 +150,7 @@ public:
.getNoThrow();
}
- Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& baton = nullptr) override {
+ Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override {
ensureAsync();
return write(asio::buffer(message.buf(), message.size()), baton)
.then([this, message /*keep the buffer alive*/]() {
@@ -161,10 +160,10 @@ public:
});
}
- void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override {
+ void cancelAsyncOperations(const BatonHandle& baton = nullptr) override {
LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote;
- if (baton) {
- baton->cancelSession(*this);
+ if (baton && baton->networking()) {
+ baton->networking()->cancelSession(*this);
} else {
getSocket().cancel();
}
@@ -342,7 +341,7 @@ private:
return _socket;
}
- Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) {
+ Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) {
static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
auto headerBuffer = SharedBuffer::allocate(kHeaderSize);
@@ -387,8 +386,7 @@ private:
}
template <typename MutableBufferSequence>
- Future<void> read(const MutableBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
return opportunisticRead(*_sslSocket, buffers, baton);
@@ -413,8 +411,7 @@ private:
}
template <typename ConstBufferSequence>
- Future<void> write(const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
@@ -439,7 +436,7 @@ private:
template <typename Stream, typename MutableBufferSequence>
Future<void> opportunisticRead(Stream& stream,
const MutableBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ const BatonHandle& baton = nullptr) {
std::error_code ec;
size_t size;
@@ -469,8 +466,9 @@ private:
asyncBuffers += size;
}
- if (baton) {
- return baton->addSession(*this, Baton::Type::In)
+ if (baton && baton->networking()) {
+ return baton->networking()
+ ->addSession(*this, NetworkingBaton::Type::In)
.then([&stream, asyncBuffers, baton, this] {
return opportunisticRead(stream, asyncBuffers, baton);
});
@@ -494,7 +492,7 @@ private:
template <typename ConstBufferSequence>
boost::optional<Future<void>> moreToSend(GenericSocket& socket,
const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
return boost::none;
}
@@ -517,7 +515,7 @@ private:
template <typename Stream, typename ConstBufferSequence>
Future<void> opportunisticWrite(Stream& stream,
const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ const BatonHandle& baton = nullptr) {
std::error_code ec;
std::size_t size;
@@ -552,8 +550,9 @@ private:
return std::move(*more);
}
- if (baton) {
- return baton->addSession(*this, Baton::Type::Out)
+ if (baton && baton->networking()) {
+ return baton->networking()
+ ->addSession(*this, NetworkingBaton::Type::Out)
.then([&stream, asyncBuffers, baton, this] {
return opportunisticWrite(stream, asyncBuffers, baton);
});
diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp
index 442cf78b15c..e2b997add0c 100644
--- a/src/mongo/transport/transport_layer.cpp
+++ b/src/mongo/transport/transport_layer.cpp
@@ -31,11 +31,20 @@
#include "mongo/platform/basic.h"
#include "mongo/base/status.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
namespace mongo {
namespace transport {
+namespace {
+
+AtomicWord<uint64_t> reactorTimerIdCounter(0);
+
+} // namespace
+
const Status TransportLayer::SessionUnknownStatus =
Status(ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Session.");
@@ -48,5 +57,11 @@ const Status TransportLayer::TicketSessionUnknownStatus = Status(
const Status TransportLayer::TicketSessionClosedStatus = Status(
ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session.");
+ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {}
+
+BatonHandle TransportLayer::makeDefaultBaton(OperationContext* opCtx) {
+ return opCtx->getServiceContext()->makeBaton(opCtx);
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index ca1453cb3a4..0b736ba600f 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -109,17 +109,21 @@ public:
enum WhichReactor { kIngress, kEgress, kNewReactor };
virtual ReactorHandle getReactor(WhichReactor which) = 0;
- virtual BatonHandle makeBaton(OperationContext* opCtx) {
- return nullptr;
+ virtual BatonHandle makeBaton(OperationContext* opCtx) const {
+ return makeDefaultBaton(opCtx);
}
protected:
TransportLayer() = default;
+
+private:
+ static BatonHandle makeDefaultBaton(OperationContext* opCtx);
};
class ReactorTimer {
public:
- ReactorTimer() = default;
+ ReactorTimer();
+
ReactorTimer(const ReactorTimer&) = delete;
ReactorTimer& operator=(const ReactorTimer&) = delete;
@@ -128,6 +132,10 @@ public:
*/
virtual ~ReactorTimer() = default;
+ size_t id() const {
+ return _id;
+ }
+
/*
* Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an
* ErrorCodes::CallbackCancelled status.
@@ -142,6 +150,9 @@ public:
* Calling this implicitly calls cancel().
*/
virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0;
+
+private:
+ const size_t _id;
};
class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 06604606d0c..6f0c02d17fb 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -56,9 +56,11 @@
#endif
// session_asio.h has some header dependencies that require it to be the last header.
+
#ifdef __linux__
#include "mongo/transport/baton_asio_linux.h"
#endif
+
#include "mongo/transport/session_asio.h"
namespace mongo {
@@ -79,7 +81,7 @@ public:
void cancel(const BatonHandle& baton = nullptr) override {
// If we have a baton try to cancel that.
- if (baton && baton->cancelTimer(*this)) {
+ if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) {
LOG(2) << "Canceled via baton, skipping asio cancel.";
return;
}
@@ -90,8 +92,9 @@ public:
Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
- if (baton) {
- return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton);
+ if (baton && baton->networking()) {
+ return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); },
+ baton);
} else {
return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); });
}
@@ -875,8 +878,8 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const {
}
#endif
-BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) {
#ifdef __linux__
+BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) const {
auto baton = std::make_shared<BatonASIO>(opCtx);
{
@@ -885,11 +888,9 @@ BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) {
opCtx->setBaton(baton);
}
- return std::move(baton);
-#else
- return nullptr;
-#endif
+ return baton;
}
+#endif
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 77815024374..b5619bcf361 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -136,7 +136,9 @@ public:
return _listenerPort;
}
- BatonHandle makeBaton(OperationContext* opCtx) override;
+#ifdef __linux__
+ BatonHandle makeBaton(OperationContext* opCtx) const override;
+#endif
private:
class BatonASIO;
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 60cfb58a41e..29ad6457be6 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -90,7 +90,7 @@ public:
static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
- BatonHandle makeBaton(OperationContext* opCtx) override {
+ BatonHandle makeBaton(OperationContext* opCtx) const override {
stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
// TODO: figure out what to do about managers with more than one transport layer.
invariant(_tls.size() == 1);