summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-01-23 13:18:49 -0500
committerJason Carey <jcarey@argv.me>2019-02-05 22:41:49 -0500
commita23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81 (patch)
tree1adc2fdb36e6c8babaab134d53f84de3020c2404 /src/mongo
parent5fd66f15797c45c9bab7b59f9e55e0a2f7ad5cd0 (diff)
downloadmongo-a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81.tar.gz
SERVER-39146 Refactor Baton
Refactor the baton into regular and networking batons while also cleaning up the basic baton implementation.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/client/async_client.cpp9
-rw-r--r--src/mongo/client/async_client.h9
-rw-r--r--src/mongo/client/remote_command_retry_scheduler_test.cpp2
-rw-r--r--src/mongo/db/SConscript16
-rw-r--r--src/mongo/db/baton.h107
-rw-r--r--src/mongo/db/default_baton.cpp154
-rw-r--r--src/mongo/db/default_baton.h78
-rw-r--r--src/mongo/db/operation_context.cpp20
-rw-r--r--src/mongo/db/operation_context.h10
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test.cpp7
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp7
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp7
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp7
-rw-r--r--src/mongo/db/repl/reporter_test.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp7
-rw-r--r--src/mongo/db/repl/task_executor_mock.cpp2
-rw-r--r--src/mongo/db/repl/task_executor_mock.h7
-rw-r--r--src/mongo/db/service_context.cpp14
-rw-r--r--src/mongo/db/service_context.h5
-rw-r--r--src/mongo/executor/network_interface.h15
-rw-r--r--src/mongo/executor/network_interface_mock.cpp9
-rw-r--r--src/mongo/executor/network_interface_mock.h8
-rw-r--r--src/mongo/executor/network_interface_tl.cpp90
-rw-r--r--src/mongo/executor/network_interface_tl.h10
-rw-r--r--src/mongo/executor/task_executor.h2
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp54
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h11
-rw-r--r--src/mongo/s/async_requests_sender.h8
-rw-r--r--src/mongo/s/sharding_task_executor.cpp2
-rw-r--r--src/mongo/s/sharding_task_executor.h7
-rw-r--r--src/mongo/transport/SConscript3
-rw-r--r--src/mongo/transport/baton.h61
-rw-r--r--src/mongo/transport/baton_asio_linux.h219
-rw-r--r--src/mongo/transport/mock_session.h7
-rw-r--r--src/mongo/transport/session.h18
-rw-r--r--src/mongo/transport/session_asio.h35
-rw-r--r--src/mongo/transport/transport_layer.cpp15
-rw-r--r--src/mongo/transport/transport_layer.h17
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp17
-rw-r--r--src/mongo/transport/transport_layer_asio.h4
-rw-r--r--src/mongo/transport/transport_layer_manager.h2
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp2
-rw-r--r--src/mongo/unittest/task_executor_proxy.h45
44 files changed, 785 insertions, 347 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index b0be7a0303d..8d01f077791 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -276,6 +276,7 @@ error_code("ExchangePassthrough", 275) # For exchange execution in aggregation.
error_code("IndexBuildAborted", 276)
error_code("AlarmAlreadyFulfilled", 277)
error_code("UnsatisfiableCommitQuorum", 278)
+error_code("ClientDisconnect", 279)
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 520e69606bc..1a2c01ed335 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -193,7 +193,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
});
}
-Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) {
+Future<Message> AsyncDBClient::_call(Message request, const BatonHandle& baton) {
auto swm = _compressorManager.compressMessage(request);
if (!swm.isOK()) {
return swm.getStatus();
@@ -219,8 +219,7 @@ Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHand
});
}
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
- const transport::BatonHandle& baton) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, const BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
return _call(std::move(requestMsg), baton)
@@ -230,7 +229,7 @@ Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
}
Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
- executor::RemoteCommandRequest request, const transport::BatonHandle& baton) {
+ executor::RemoteCommandRequest request, const BatonHandle& baton) {
auto clkSource = _svcCtx->getPreciseClockSource();
auto start = clkSource->now();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
@@ -246,7 +245,7 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-void AsyncDBClient::cancel(const transport::BatonHandle& baton) {
+void AsyncDBClient::cancel(const BatonHandle& baton) {
_session->cancelAsyncOperations(baton);
}
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index 356f4e0f0ca..e7a40dcc73f 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -62,9 +62,8 @@ public:
Milliseconds timeout);
Future<executor::RemoteCommandResponse> runCommandRequest(
- executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
- Future<rpc::UniqueReply> runCommand(OpMsgRequest request,
- const transport::BatonHandle& baton = nullptr);
+ executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
+ Future<rpc::UniqueReply> runCommand(OpMsgRequest request, const BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
@@ -73,7 +72,7 @@ public:
Future<void> initWireVersion(const std::string& appName,
executor::NetworkConnectionHook* const hook);
- void cancel(const transport::BatonHandle& baton = nullptr);
+ void cancel(const BatonHandle& baton = nullptr);
bool isStillConnected();
@@ -83,7 +82,7 @@ public:
const HostAndPort& local() const;
private:
- Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr);
+ Future<Message> _call(Message request, const BatonHandle& baton = nullptr);
BSONObj _buildIsMasterRequest(const std::string& appName,
executor::NetworkConnectionHook* hook);
void _parseIsMasterResponse(BSONObj request,
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index 503a7c4a42e..81ecd179dcc 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -94,7 +94,7 @@ public:
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ const BatonHandle& baton = nullptr) override {
if (scheduleRemoteCommandFailPoint) {
return Status(ErrorCodes::ShutdownInProgress,
"failed to send remote command - shutdown in progress");
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index b135cb559a2..a8fe9910b39 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -527,13 +527,27 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/multi_key_path_tracker',
'$BUILD_DIR/mongo/db/storage/write_unit_of_work',
- '$BUILD_DIR/mongo/transport/transport_layer_common',
'$BUILD_DIR/mongo/util/clock_sources',
'$BUILD_DIR/mongo/util/concurrency/spin_lock',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/net/network',
'$BUILD_DIR/mongo/util/periodic_runner',
],
+ LIBDEPS_PRIVATE=[
+ 'default_baton',
+ ],
+)
+
+env.Library(
+ target='default_baton',
+ source=[
+ 'default_baton.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/util/clock_sources',
+ '$BUILD_DIR/mongo/util/concurrency/spin_lock',
+ ],
)
env.CppUnitTest(
diff --git a/src/mongo/db/baton.h b/src/mongo/db/baton.h
new file mode 100644
index 00000000000..6e1d1319d2b
--- /dev/null
+++ b/src/mongo/db/baton.h
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/util/functional.h"
+#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace transport {
+
+class NetworkingBaton;
+
+} // namespace transport
+
+/**
+ * A Baton is lightweight executor, with parallel forward progress guarantees. Rather than
+ * asynchronously running tasks through one, the baton records the intent of those tasks and defers
+ * waiting and execution to a later call to run().
+ *
+ * Note: This occurs automatically when opCtx waiting on a condition variable.
+ */
+class Baton : public Waitable, public std::enable_shared_from_this<Baton> {
+public:
+ virtual ~Baton() = default;
+
+ /**
+ * Detaches a baton from an associated opCtx.
+ *
+ * This invokes all callbacks currently inside the baton (from the detaching thread) with a void
+ * opCtx. Also, any calls to schedule after this point will immediately invoke their callback
+ * with a null opCtx.
+ */
+ void detach() noexcept {
+ // We make this anchor so that deleting the shared_ptr inside opCtx doesn't remove the last
+ // reference to this type until we return from detach.
+ const auto anchor = shared_from_this();
+
+ detachImpl();
+ }
+
+ /**
+ * Schedules a callback to run on the baton.
+ *
+ * The function will be invoked in one of 3 ways:
+ * 1. With an opCtx inside run() (I.e. opCtx->waitForConditionOrInterrupt)
+ * 2. Without an opCtx inside detach()
+ * 3. Without an opCtx inside schedule() after detach()
+ *
+ * Note that the latter two cases are indistinguishable, so the callback should be safe to run
+ * inline if passed a nullptr. Examples of such work are logging, simple cleanup and
+ * rescheduling the task on another executor.
+ */
+ virtual void schedule(unique_function<void(OperationContext*)> func) noexcept = 0;
+
+ /**
+ * Returns a networking view of the baton, if this baton supports networking functionality
+ */
+ virtual transport::NetworkingBaton* networking() noexcept {
+ return nullptr;
+ }
+
+ /**
+ * Marks the baton to wake up on client socket disconnect
+ */
+ virtual void markKillOnClientDisconnect() noexcept = 0;
+
+private:
+ virtual void detachImpl() noexcept = 0;
+};
+
+using BatonHandle = std::shared_ptr<Baton>;
+
+} // namespace mongo
diff --git a/src/mongo/db/default_baton.cpp b/src/mongo/db/default_baton.cpp
new file mode 100644
index 00000000000..9855957135a
--- /dev/null
+++ b/src/mongo/db/default_baton.cpp
@@ -0,0 +1,154 @@
+
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/default_baton.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+DefaultBaton::DefaultBaton(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+DefaultBaton::~DefaultBaton() {
+ invariant(!_opCtx);
+ invariant(_scheduled.empty());
+}
+
+void DefaultBaton::markKillOnClientDisconnect() noexcept {
+ if (_opCtx->getClient() && _opCtx->getClient()->session()) {
+ _hasIngressSocket = true;
+ }
+}
+
+void DefaultBaton::detachImpl() noexcept {
+ decltype(_scheduled) scheduled;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ {
+ stdx::lock_guard<Client> lk(*_opCtx->getClient());
+ invariant(_opCtx->getBaton().get() == this);
+ _opCtx->setBaton(nullptr);
+ }
+
+ _opCtx = nullptr;
+ _hasIngressSocket = false;
+
+ using std::swap;
+ swap(_scheduled, scheduled);
+ }
+
+ for (auto& job : scheduled) {
+ job(nullptr);
+ }
+}
+
+void DefaultBaton::schedule(unique_function<void(OperationContext*)> func) noexcept {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (!_opCtx) {
+ lk.unlock();
+ func(nullptr);
+
+ return;
+ }
+
+ _scheduled.push_back(std::move(func));
+
+ if (_sleeping && !_notified) {
+ _notified = true;
+ _cv.notify_one();
+ }
+}
+
+void DefaultBaton::notify() noexcept {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _notified = true;
+ _cv.notify_one();
+}
+
+Waitable::TimeoutState DefaultBaton::run_until(ClockSource* clkSource,
+ Date_t oldDeadline) noexcept {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
+ const auto guard = makeGuard([&] {
+ // While we have scheduled work, keep running jobs
+ while (_scheduled.size()) {
+ auto toRun = std::exchange(_scheduled, {});
+
+ lk.unlock();
+ for (auto& job : toRun) {
+ job(_opCtx);
+ }
+ lk.lock();
+ }
+ });
+
+ // If anything was scheduled, run it now.
+ if (_scheduled.size()) {
+ return Waitable::TimeoutState::NoTimeout;
+ }
+
+ auto newDeadline = oldDeadline;
+
+ // If we have an ingress socket, sleep no more than 1 second (so we poll for closure in the
+ // outside opCtx waitForConditionOrInterruptUntil implementation)
+ if (_hasIngressSocket) {
+ newDeadline = std::min(oldDeadline, clkSource->now() + Seconds(1));
+ }
+
+ // we mark sleeping, so that we receive notifications
+ _sleeping = true;
+
+ // while we're not notified
+ auto notified =
+ clkSource->waitForConditionUntil(_cv, lk, newDeadline, [&] { return _notified; });
+
+ _sleeping = false;
+ _notified = false;
+
+ // If we've been notified, or we haven't timed out yet, return as if by spurious wakeup.
+ // Otherwise call it a timeout.
+ return (notified || (clkSource->now() < oldDeadline)) ? Waitable::TimeoutState::NoTimeout
+ : Waitable::TimeoutState::Timeout;
+}
+
+void DefaultBaton::run(ClockSource* clkSource) noexcept {
+ run_until(clkSource, Date_t::max());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/default_baton.h b/src/mongo/db/default_baton.h
new file mode 100644
index 00000000000..6311f397e95
--- /dev/null
+++ b/src/mongo/db/default_baton.h
@@ -0,0 +1,78 @@
+
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/baton.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/functional.h"
+
+namespace mongo {
+
+class OperationContext;
+
+/**
+ * The most basic Baton implementation.
+ */
+class DefaultBaton : public Baton {
+public:
+ explicit DefaultBaton(OperationContext* opCtx);
+
+ ~DefaultBaton();
+
+ void markKillOnClientDisconnect() noexcept override;
+
+ void schedule(unique_function<void(OperationContext*)> func) noexcept override;
+
+ void notify() noexcept override;
+
+ Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t oldDeadline) noexcept override;
+
+ void run(ClockSource* clkSource) noexcept override;
+
+private:
+ void detachImpl() noexcept override;
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _cv;
+ bool _notified = false;
+ bool _sleeping = false;
+
+ OperationContext* _opCtx;
+
+ bool _hasIngressSocket = false;
+
+ std::vector<unique_function<void(OperationContext*)>> _scheduled;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 11ca8b8f641..319b803288c 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -77,6 +77,8 @@ MONGO_FAIL_POINT_DEFINE(maxTimeNeverTimeOut);
// inclusive.
MONGO_FAIL_POINT_DEFINE(checkForInterruptFail);
+const auto kNoWaiterThread = stdx::thread::id();
+
} // namespace
OperationContext::OperationContext(Client* client, unsigned int opId)
@@ -261,6 +263,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
stdx::lock_guard<Client> clientLock(*getClient());
invariant(!_waitMutex);
invariant(!_waitCV);
+ invariant(_waitThread == kNoWaiterThread);
invariant(0 == _numKillers);
// This interrupt check must be done while holding the client lock, so as not to race with a
@@ -271,6 +274,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
}
_waitMutex = m.mutex();
_waitCV = &cv;
+ _waitThread = stdx::this_thread::get_id();
}
// If the maxTimeNeverTimeOut failpoint is set, behave as though the operation's deadline does
@@ -302,6 +306,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
if (0 == _numKillers) {
_waitMutex = nullptr;
_waitCV = nullptr;
+ _waitThread = kNoWaiterThread;
return true;
}
return false;
@@ -328,7 +333,20 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
void OperationContext::markKilled(ErrorCodes::Error killCode) {
invariant(killCode != ErrorCodes::OK);
stdx::unique_lock<stdx::mutex> lkWaitMutex;
- if (_waitMutex) {
+
+ // If we have a _waitMutex, it means this opCtx is currently blocked in
+ // waitForConditionOrInterrupt.
+ //
+ // From there, we also know which thread is actually doing that waiting (it's recorded in
+ // _waitThread). If that thread isn't our thread, it's necessary to do the regular numKillers
+ // song and dance mentioned in waitForConditionOrInterrupt.
+ //
+ // If it is our thread, we know that we're currently inside that call to
+ // waitForConditionOrInterrupt and are being invoked by a callback run from Baton->run. And
+ // that means we don't need to deal with the waitMutex and waitCV (because we're running
+ // callbacks, which means run is returning, which means we'll be checking _killCode in the near
+ // future).
+ if (_waitMutex && stdx::this_thread::get_id() != _waitThread) {
invariant(++_numKillers > 0);
getClient()->unlock();
ON_BLOCK_EXIT([this] {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 389797c2e1c..f376a98e048 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -184,14 +184,14 @@ public:
/**
* Sets a transport Baton on the operation. This will trigger the Baton on markKilled.
*/
- void setBaton(const transport::BatonHandle& baton) {
+ void setBaton(const BatonHandle& baton) {
_baton = baton;
}
/**
* Retrieves the baton associated with the operation.
*/
- const transport::BatonHandle& getBaton() const {
+ const BatonHandle& getBaton() const {
return _baton;
}
@@ -436,13 +436,17 @@ private:
// A transport Baton associated with the operation. The presence of this object implies that a
// client thread is doing it's own async networking by blocking on it's own thread.
- transport::BatonHandle _baton;
+ BatonHandle _baton;
// If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the
// operation is currently waiting on inside a call to waitForConditionOrInterrupt...().
+ //
+ // _waitThread is the calling thread's thread id.
+ //
// All access guarded by the Client's lock.
stdx::mutex* _waitMutex = nullptr;
stdx::condition_variable* _waitCV = nullptr;
+ stdx::thread::id _waitThread;
// If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled
// actively attempting to kill the operation. If this value is non-zero, the operation is inside
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
index 330f7c1e199..463c990052a 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
@@ -372,10 +372,9 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 466bd80a4ce..a8ca6efb7c7 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -507,10 +507,9 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index af688a3b26f..07f7bc7bfff 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -233,10 +233,9 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 7463f93ab7c..02cdcd1118d 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -772,10 +772,9 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index 7ce5768cf62..97e8e8d707a 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -604,7 +604,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMak
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ const BatonHandle& baton = nullptr) override {
// Any error status other than ShutdownInProgress will cause the reporter to fassert.
return Status(ErrorCodes::ShutdownInProgress,
"failed to send remote command - shutdown in progress");
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 7d00daff9e7..6084dc12041 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -59,10 +59,9 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index 727014a5cc2..6c994dc719d 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -61,7 +61,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (shouldFailScheduleRemoteCommandRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index e5e92e0f0e1..ffceae96d23 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -49,10 +49,9 @@ public:
StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
// Override to make scheduleWork() fail during testing.
ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; };
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index f2f72ac6ed0..4b1683edab0 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -38,6 +38,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker_noop.h"
+#include "mongo/db/default_baton.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/storage/recovery_unit_noop.h"
@@ -153,7 +154,6 @@ void onCreate(T* object, const ObserversContainer& observers) {
onCreate(object, observers.cbegin(), observers.cend());
}
-
} // namespace
ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc,
@@ -406,4 +406,16 @@ void ServiceContext::ServiceContextDeleter::operator()(ServiceContext* service)
delete service;
}
+BatonHandle ServiceContext::makeBaton(OperationContext* opCtx) const {
+ auto baton = std::make_shared<DefaultBaton>(opCtx);
+
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ invariant(!opCtx->getBaton());
+ opCtx->setBaton(baton);
+ }
+
+ return baton;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 31ad873085a..548efe1cd88 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -511,6 +511,11 @@ public:
*/
void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
+ /**
+ * Creates a delayed execution baton with basic functionality
+ */
+ BatonHandle makeBaton(OperationContext* opCtx) const;
+
private:
class ClientObserverHolder {
public:
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 62012162bf6..34685392f1e 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -144,12 +144,11 @@ public:
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
- Future<TaskExecutor::ResponseStatus> startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- const transport::BatonHandle& baton = nullptr) {
+ Future<TaskExecutor::ResponseStatus> startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const BatonHandle& baton = nullptr) {
auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>();
auto status = startCommand(
@@ -171,7 +170,7 @@ public:
* completed.
*/
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
/**
* Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -187,9 +186,7 @@ public:
* Any callbacks invoked from setAlarm must observe onNetworkThread to
* return true. See that method for why.
*/
- virtual Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton = nullptr) = 0;
+ virtual Status setAlarm(Date_t when, unique_function<void()> action) = 0;
/**
* Returns true if called from a thread dedicated to networking. I.e. not a
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index f700f6c211e..641f1095530 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -108,7 +108,7 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
@@ -140,8 +140,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
}
}
-void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -171,9 +170,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
}
-Status NetworkInterfaceMock::setAlarm(const Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when, unique_function<void()> action) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 937bed41525..c0f1693a660 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -114,7 +114,7 @@ public:
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton = nullptr) override;
+ const BatonHandle& baton = nullptr) override;
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@@ -123,14 +123,12 @@ public:
* called after the task has already completed, but its callback has not yet been run.
*/
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr) override;
+ const BatonHandle& baton = nullptr) override;
/**
* Not implemented.
*/
- Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton = nullptr) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 512b3fee58b..5ca24991e8c 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -173,7 +173,7 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
@@ -280,14 +280,19 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (baton) {
// If we have a baton, we want to get back to the baton thread immediately after we get a
// connection
- std::move(connFuture).getAsync([
- baton,
- rw = std::move(remainingWork)
- ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
- baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
- std::move(rw)(std::move(swConn));
+ std::move(connFuture)
+ .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ baton->schedule([ rw = std::move(rw),
+ swConn = std::move(swConn) ](OperationContext * opCtx) mutable {
+ if (opCtx) {
+ std::move(rw)(std::move(swConn));
+ } else {
+ std::move(rw)(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, failing operation"));
+ }
+ });
});
- });
} else {
// otherwise we're happy to run inline
std::move(connFuture)
@@ -306,7 +311,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
conn->indicateSuccess();
return future;
@@ -416,7 +421,7 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
}
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -445,19 +450,13 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
}
}
-Status NetworkInterfaceTL::setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) {
+Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
if (when <= now()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- }
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
return Status::OK();
}
@@ -469,39 +468,34 @@ Status NetworkInterfaceTL::setAlarm(Date_t when,
_inProgressAlarms.insert(alarmTimer);
}
- alarmTimer->waitUntil(when, baton)
- .getAsync(
- [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when
- << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action), baton);
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
- }
+ alarmTimer->waitUntil(when, nullptr)
+ .getAsync([ this, weakTimer, action = std::move(action), when ](Status status) mutable {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
+ }
- return;
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action));
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
}
- if (status.isOK()) {
- if (baton) {
- baton->schedule(std::move(action));
- } else {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- }
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+ return;
+ }
+
+ if (status.isOK()) {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 0b0c99cd03e..5cd5dd6115b 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -68,13 +68,11 @@ public:
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
- const transport::BatonHandle& baton) override;
+ const BatonHandle& baton) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) override;
- Status setAlarm(Date_t when,
- unique_function<void()> action,
- const transport::BatonHandle& baton) override;
+ const BatonHandle& baton) override;
+ Status setAlarm(Date_t when, unique_function<void()> action) override;
bool onNetworkThread() override;
@@ -117,7 +115,7 @@ private:
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
- const transport::BatonHandle& baton);
+ const BatonHandle& baton);
std::string _instanceName;
ServiceContext* _svcCtx;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 1e3051e17b4..37c2669f306 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -239,7 +239,7 @@ public:
virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) = 0;
+ const BatonHandle& baton = nullptr) = 0;
/**
* If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 1af20aa8692..4800de47db1 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -67,14 +67,14 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
public:
static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
Date_t readyDate,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate, const BatonHandle& baton)
: callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
@@ -102,7 +102,7 @@ public:
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
- transport::BatonHandle baton;
+ BatonHandle baton;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -350,21 +350,23 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return cbHandle;
}
lk.unlock();
- _net->setAlarm(when,
- [this, cbHandle] {
- auto cbState =
- checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
- if (cbState->canceled.load()) {
- return;
- }
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (cbState->canceled.load()) {
- return;
- }
- scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- },
- nullptr)
- .transitional_ignore();
+
+ auto status = _net->setAlarm(when, [this, cbHandle] {
+ auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
+ if (cbState->canceled.load()) {
+ return;
+ }
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (cbState->canceled.load()) {
+ return;
+ }
+ scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
+ });
+
+ if (!status.isOK()) {
+ cancel(cbHandle.getValue());
+ return status;
+ }
return cbHandle;
}
@@ -406,7 +408,7 @@ const auto initialSyncPauseCmds =
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
if (MONGO_FAIL_POINT(initialSyncFuzzerSynchronizationPoint1)) {
// We are only going to pause on these failpoints if the command issued is for the
@@ -537,7 +539,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
}
ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
- CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
+ CallbackFn work, const BatonHandle& baton, Date_t when) {
WorkQueue result;
result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
@@ -592,7 +594,17 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
for (const auto& cbState : todo) {
if (cbState->baton) {
- cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ cbState->baton->schedule([this, cbState](OperationContext* opCtx) {
+ if (opCtx) {
+ runCallback(std::move(cbState));
+ return;
+ }
+
+ cbState->canceled.store(1);
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
+ });
} else {
const auto status =
_pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 1558eaf7f01..1832a021d2c 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -74,7 +74,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void appendDiagnosticBSON(BSONObjBuilder* b) const;
+ void appendDiagnosticBSON(BSONObjBuilder* b) const override;
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;
void signalEvent(const EventHandle& event) override;
@@ -85,10 +85,9 @@ public:
void waitForEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;
@@ -138,7 +137,7 @@ private:
* called outside of _mutex.
*/
static WorkQueue makeSingletonWorkQueue(CallbackFn work,
- const transport::BatonHandle& baton,
+ const BatonHandle& baton,
Date_t when = {});
/**
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 4646d371db8..a9bdcc43cb0 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -227,15 +227,15 @@ private:
explicit BatonDetacher(OperationContext* opCtx);
~BatonDetacher();
- transport::Baton& operator*() const {
+ Baton& operator*() const {
return *_baton;
}
- transport::Baton* operator->() const noexcept {
+ Baton* operator->() const noexcept {
return _baton.get();
}
- operator transport::BatonHandle() const {
+ operator BatonHandle() const {
return _baton;
}
@@ -244,7 +244,7 @@ private:
}
private:
- transport::BatonHandle _baton;
+ BatonHandle _baton;
};
/**
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index d11de192011..8189b308c33 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -115,7 +115,7 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Da
StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
// schedule the user's callback if there is not opCtx
if (!request.opCtx) {
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 3a0df5fb57e..4f43a70b5a0 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -69,10 +69,9 @@ public:
Date_t deadline) override;
StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(
- const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 43032ccb1b7..12b2cf44899 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -14,6 +14,9 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
)
env.Library(
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index 9dbf570ede2..de981ef6284 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -1,6 +1,6 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -32,9 +32,11 @@
#include <memory>
+#include "mongo/db/baton.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
+#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/time_support.h"
#include "mongo/util/waitable.h"
@@ -49,44 +51,17 @@ class Session;
class ReactorTimer;
/**
- * A Baton is basically a networking reactor, with limited functionality and no forward progress
- * guarantees. Rather than asynchronously running tasks through one, the baton records the intent
- * of those tasks and defers waiting and execution to a later call to run();
+ * A NetworkingBaton is basically a networking reactor, with limited functionality and parallel
+ * forward progress guarantees. Rather than asynchronously running tasks through one, the baton
+ * records the intent of those tasks and defers waiting and execution to a later call to run();
*
- * Baton's provide a mechanism to allow consumers of a transport layer to execute IO themselves,
- * rather than having this occur on another thread. This can improve performance by minimizing
- * context switches, as well as improving the readability of stack traces by grounding async
- * execution on top of a regular client call stack.
+ * NetworkingBaton's provide a mechanism to allow consumers of a transport layer to execute IO
+ * themselves, rather than having this occur on another thread. This can improve performance by
+ * minimizing context switches, as well as improving the readability of stack traces by grounding
+ * async execution on top of a regular client call stack.
*/
-class Baton : public Waitable {
+class NetworkingBaton : public Baton {
public:
- virtual ~Baton() = default;
-
- /**
- * Detaches a baton from an associated opCtx.
- */
- virtual void detach() = 0;
-
- /**
- * Executes a callback on the baton via schedule. Returns a future which will execute on the
- * baton runner.
- */
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
- auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
-
- schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
- p.setWith(std::move(cb));
- });
-
- return std::move(pf.future);
- }
-
- /**
- * Executes a callback on the baton.
- */
- virtual void schedule(unique_function<void()> func) = 0;
-
/**
* Adds a session, returning a future which activates on read/write-ability of the session.
*/
@@ -94,29 +69,31 @@ public:
In,
Out,
};
- virtual Future<void> addSession(Session& session, Type type) = 0;
+ virtual Future<void> addSession(Session& session, Type type) noexcept = 0;
/**
* Adds a timer, returning a future which activates after a deadline.
*/
- virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0;
+ virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept = 0;
/**
* Cancels waiting on a session.
*
* Returns true if the session was in the baton to be cancelled.
*/
- virtual bool cancelSession(Session& session) = 0;
+ virtual bool cancelSession(Session& session) noexcept = 0;
/**
* Cancels waiting on a timer
*
* Returns true if the timer was in the baton to be cancelled.
*/
- virtual bool cancelTimer(const ReactorTimer& timer) = 0;
-};
+ virtual bool cancelTimer(const ReactorTimer& timer) noexcept = 0;
-using BatonHandle = std::shared_ptr<Baton>;
+ NetworkingBaton* networking() noexcept final {
+ return this;
+ }
+};
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 63bfe67da3f..55570768ba7 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -30,8 +30,8 @@
#pragma once
+#include <map>
#include <memory>
-#include <set>
#include <vector>
#include <poll.h>
@@ -55,7 +55,7 @@ namespace transport {
*
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
-class TransportLayerASIO::BatonASIO : public Baton {
+class TransportLayerASIO::BatonASIO : public NetworkingBaton {
/**
* We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
* ::poll).
@@ -117,6 +117,8 @@ class TransportLayerASIO::BatonASIO : public Baton {
}
const int fd;
+
+ static const Client::Decoration<EventFDHolder> getForClient;
};
public:
@@ -129,75 +131,69 @@ public:
invariant(_timers.empty());
}
- void detach() override {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_sessions.empty());
- invariant(_scheduled.empty());
- invariant(_timers.empty());
- }
+ void markKillOnClientDisconnect() noexcept override {
+ if (_opCtx->getClient() && _opCtx->getClient()->session()) {
+ addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) {
+ if (!s.isOK()) {
+ return;
+ }
- {
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
- invariant(_opCtx->getBaton().get() == this);
- _opCtx->setBaton(nullptr);
+ _opCtx->markKilled(ErrorCodes::ClientDisconnect);
+ });
}
+ }
- _opCtx = nullptr;
+ Future<void> addSession(Session& session, Type type) noexcept override {
+ return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT);
}
- Future<void> addSession(Session& session, Type type) override {
- auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override {
auto pf = makePromiseFuture<void>();
+ auto id = timer.id();
- _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable {
- _sessions[fd] = TransportSession{type, std::move(promise)};
- });
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
- return std::move(pf.future);
- }
+ if (!_opCtx) {
+ return Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, cannot waitUntil on timer");
+ }
- Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
- auto pf = makePromiseFuture<void>();
- _safeExecute(
- [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable {
- auto pair = _timers.insert({
- timerPtr, expiration, std::move(promise),
- });
- invariant(pair.second);
- _timersById[pair.first->id] = pair.first;
- });
+ _safeExecute(std::move(lk),
+ [ id, expiration, promise = std::move(pf.promise), this ]() mutable {
+ auto iter = _timers.emplace(std::piecewise_construct,
+ std::forward_as_tuple(expiration),
+ std::forward_as_tuple(id, std::move(promise)));
+ _timersById[id] = iter;
+ });
return std::move(pf.future);
}
- bool cancelSession(Session& session) override {
- const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ bool cancelSession(Session& session) noexcept override {
+ const auto id = session.id();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_sessions.find(fd) == _sessions.end()) {
+ if (_sessions.find(id) == _sessions.end()) {
return false;
}
- // TODO: There's an ABA issue here with fds where between previously and before we could
- // have removed the fd, then opened and added a new socket with the same fd. We need to
- // solve it via using session id's for handles.
- _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); });
+ _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); });
return true;
}
- bool cancelTimer(const ReactorTimer& timer) override {
+ bool cancelTimer(const ReactorTimer& timer) noexcept override {
+ const auto id = timer.id();
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_timersById.find(&timer) == _timersById.end()) {
+ if (_timersById.find(id) == _timersById.end()) {
return false;
}
- // TODO: Same ABA issue as above, but for pointers.
- _safeExecute(std::move(lk), [ timerPtr = &timer, this ] {
- auto iter = _timersById.find(timerPtr);
+ _safeExecute(std::move(lk), [id, this] {
+ auto iter = _timersById.find(id);
if (iter != _timersById.end()) {
_timers.erase(iter->second);
@@ -208,18 +204,24 @@ public:
return true;
}
- void schedule(unique_function<void()> func) override {
+ void schedule(unique_function<void(OperationContext*)> func) noexcept override {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_opCtx) {
+ func(nullptr);
+
+ return;
+ }
+
_scheduled.push_back(std::move(func));
if (_inPoll) {
- _efd.notify();
+ efd().notify();
}
}
void notify() noexcept override {
- _efd.notify();
+ efd().notify();
}
/**
@@ -263,7 +265,7 @@ public:
lk.unlock();
for (auto& job : toRun) {
- job();
+ job(_opCtx);
}
lk.lock();
}
@@ -280,21 +282,19 @@ public:
// If we have a timer, poll no longer than that
if (_timers.size()) {
- deadline = _timers.begin()->expiration;
+ deadline = _timers.begin()->first;
}
std::vector<decltype(_sessions)::iterator> sessions;
sessions.reserve(_sessions.size());
std::vector<pollfd> pollSet;
+
pollSet.reserve(_sessions.size() + 1);
- pollSet.push_back(pollfd{_efd.fd, POLLIN, 0});
+ pollSet.push_back(pollfd{efd().fd, POLLIN, 0});
for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
- pollSet.push_back(
- pollfd{iter->first,
- static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT),
- 0});
+ pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0});
sessions.push_back(iter);
}
@@ -330,9 +330,9 @@ public:
now = clkSource->now();
// Fire expired timers
- for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
- toFulfill.push_back(std::move(iter->promise));
- _timersById.erase(iter->id);
+ for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) {
+ toFulfill.push_back(std::move(iter->second.promise));
+ _timersById.erase(iter->second.id);
iter = _timers.erase(iter);
}
@@ -343,12 +343,13 @@ public:
auto pollIter = pollSet.begin();
if (pollIter->revents) {
- _efd.wait();
+ efd().wait();
remaining--;
}
++pollIter;
+
for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining;
++sessionIter, ++pollIter) {
if (pollIter->revents) {
@@ -366,28 +367,75 @@ public:
}
private:
- struct Timer {
- const ReactorTimer* id;
- Date_t expiration;
- mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set.
+ Future<void> addSessionImpl(Session& session, short type) noexcept {
+ auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+ auto id = session.id();
+ auto pf = makePromiseFuture<void>();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (!_opCtx) {
+ return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession");
+ }
- struct LessThan {
- bool operator()(const Timer& lhs, const Timer& rhs) const {
- return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id);
+ _safeExecute(std::move(lk),
+ [ id, fd, type, promise = std::move(pf.promise), this ]() mutable {
+ _sessions[id] = TransportSession{fd, type, std::move(promise)};
+ });
+
+ return std::move(pf.future);
+ }
+
+ void detachImpl() noexcept override {
+ decltype(_sessions) sessions;
+ decltype(_scheduled) scheduled;
+ decltype(_timers) timers;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ {
+ stdx::lock_guard<Client> lk(*_opCtx->getClient());
+ invariant(_opCtx->getBaton().get() == this);
+ _opCtx->setBaton(nullptr);
}
- };
+
+ _opCtx = nullptr;
+
+ using std::swap;
+ swap(_sessions, sessions);
+ swap(_scheduled, scheduled);
+ swap(_timers, timers);
+ }
+
+ for (auto& job : scheduled) {
+ job(nullptr);
+ }
+
+ for (auto& session : sessions) {
+ session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, cannot wait for socket"));
+ }
+
+ for (auto& pair : timers) {
+ pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress,
+ "baton is detached, completing timer early"));
+ }
+ }
+
+ struct Timer {
+ Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {}
+
+ size_t id;
+ Promise<void> promise; // Needs to be mutable to move from it while in std::set.
};
struct TransportSession {
- Type type;
+ int fd;
+ short type;
Promise<void> promise;
};
- template <typename Callback>
- void _safeExecute(Callback&& cb) {
- return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb));
- }
-
/**
* Safely executes method on the reactor. If we're in poll, we schedule a task, then write to
* the eventfd. If not, we run inline.
@@ -395,37 +443,44 @@ private:
template <typename Callback>
void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
if (_inPoll) {
- _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- cb();
- });
+ _scheduled.push_back(
+ [ cb = std::forward<Callback>(cb), this ](OperationContext*) mutable {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ cb();
+ });
- _efd.notify();
+ efd().notify();
} else {
cb();
}
}
+ EventFDHolder& efd() {
+ return EventFDHolder::getForClient(_opCtx->getClient());
+ }
+
stdx::mutex _mutex;
OperationContext* _opCtx;
bool _inPoll = false;
- EventFDHolder _efd;
-
// This map stores the sessions we need to poll on. We unwind it into a pollset for every
// blocking call to run
- stdx::unordered_map<int, TransportSession> _sessions;
+ stdx::unordered_map<SessionId, TransportSession> _sessions;
// The set is used to find the next timer which will fire. The unordered_map looks up the
// timers so we can remove them in O(1)
- std::set<Timer, Timer::LessThan> _timers;
- stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById;
+ std::multimap<Date_t, Timer> _timers;
+ stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById;
// For tasks that come in via schedule. Or that were deferred because we were in poll
- std::vector<unique_function<void()>> _scheduled;
+ std::vector<unique_function<void(OperationContext*)>> _scheduled;
};
+const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder>
+ TransportLayerASIO::BatonASIO::EventFDHolder::getForClient =
+ Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>();
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index a5c4b78de82..4e350887446 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -85,7 +85,7 @@ public:
return Message(); // Subclasses can do something different.
}
- Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override {
return Future<Message>::makeReady(sourceMessage());
}
@@ -101,12 +101,11 @@ public:
return Status::OK();
}
- Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& handle = nullptr) override {
+ Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override {
return Future<void>::makeReady(sinkMessage(message));
}
- void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {}
+ void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {}
void setTimeout(boost::optional<Milliseconds>) override {}
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 4fa709ed849..28349273362 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -33,6 +33,7 @@
#include <memory>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/baton.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/rpc/message.h"
#include "mongo/transport/session_id.h"
@@ -46,8 +47,6 @@ namespace transport {
class TransportLayer;
class Session;
-class Baton;
-using BatonHandle = std::shared_ptr<Baton>;
using SessionHandle = std::shared_ptr<Session>;
using ConstSessionHandle = std::shared_ptr<const Session>;
@@ -107,7 +106,7 @@ public:
* Source (receive) a new Message from the remote host for this Session.
*/
virtual StatusWith<Message> sourceMessage() = 0;
- virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0;
+ virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0;
/**
* Sink (send) a Message to the remote host for this Session.
@@ -115,15 +114,14 @@ public:
* Async version will keep the buffer alive until the operation completes.
*/
virtual Status sinkMessage(Message message) = 0;
- virtual Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& handle = nullptr) = 0;
+ virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0;
/**
* Cancel any outstanding async operations. There is no way to cancel synchronous calls.
* Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already
* completed.
*/
- virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0;
+ virtual void cancelAsyncOperations(const BatonHandle& handle = nullptr) = 0;
/**
* This should only be used to detect when the remote host has disappeared without
@@ -155,14 +153,14 @@ public:
*
* The 'kPending' tag is only for new sessions; callers should not set it directly.
*/
- virtual void setTags(TagMask tagsToSet);
+ void setTags(TagMask tagsToSet);
/**
* Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the
* 'kPending' tag is set, indicating that no tags have yet been specified for the session, this
* function also clears that tag as part of the same atomic operation.
*/
- virtual void unsetTags(TagMask tagsToUnset);
+ void unsetTags(TagMask tagsToUnset);
/**
* Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call
@@ -175,9 +173,9 @@ public:
* of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never
* try to set it.
*/
- virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
+ void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
- virtual TagMask getTags() const;
+ TagMask getTags() const;
protected:
Session();
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index b74b29af1d9..49f56357686 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -133,7 +133,7 @@ public:
return sourceMessageImpl().getNoThrow();
}
- Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override {
ensureAsync();
return sourceMessageImpl(baton);
}
@@ -150,8 +150,7 @@ public:
.getNoThrow();
}
- Future<void> asyncSinkMessage(Message message,
- const transport::BatonHandle& baton = nullptr) override {
+ Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override {
ensureAsync();
return write(asio::buffer(message.buf(), message.size()), baton)
.then([this, message /*keep the buffer alive*/]() {
@@ -161,10 +160,10 @@ public:
});
}
- void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override {
+ void cancelAsyncOperations(const BatonHandle& baton = nullptr) override {
LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote;
- if (baton) {
- baton->cancelSession(*this);
+ if (baton && baton->networking()) {
+ baton->networking()->cancelSession(*this);
} else {
getSocket().cancel();
}
@@ -342,7 +341,7 @@ private:
return _socket;
}
- Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) {
+ Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) {
static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
auto headerBuffer = SharedBuffer::allocate(kHeaderSize);
@@ -387,8 +386,7 @@ private:
}
template <typename MutableBufferSequence>
- Future<void> read(const MutableBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
return opportunisticRead(*_sslSocket, buffers, baton);
@@ -413,8 +411,7 @@ private:
}
template <typename ConstBufferSequence>
- Future<void> write(const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
@@ -439,7 +436,7 @@ private:
template <typename Stream, typename MutableBufferSequence>
Future<void> opportunisticRead(Stream& stream,
const MutableBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ const BatonHandle& baton = nullptr) {
std::error_code ec;
size_t size;
@@ -469,8 +466,9 @@ private:
asyncBuffers += size;
}
- if (baton) {
- return baton->addSession(*this, Baton::Type::In)
+ if (baton && baton->networking()) {
+ return baton->networking()
+ ->addSession(*this, NetworkingBaton::Type::In)
.then([&stream, asyncBuffers, baton, this] {
return opportunisticRead(stream, asyncBuffers, baton);
});
@@ -494,7 +492,7 @@ private:
template <typename ConstBufferSequence>
boost::optional<Future<void>> moreToSend(GenericSocket& socket,
const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
return boost::none;
}
@@ -517,7 +515,7 @@ private:
template <typename Stream, typename ConstBufferSequence>
Future<void> opportunisticWrite(Stream& stream,
const ConstBufferSequence& buffers,
- const transport::BatonHandle& baton = nullptr) {
+ const BatonHandle& baton = nullptr) {
std::error_code ec;
std::size_t size;
@@ -552,8 +550,9 @@ private:
return std::move(*more);
}
- if (baton) {
- return baton->addSession(*this, Baton::Type::Out)
+ if (baton && baton->networking()) {
+ return baton->networking()
+ ->addSession(*this, NetworkingBaton::Type::Out)
.then([&stream, asyncBuffers, baton, this] {
return opportunisticWrite(stream, asyncBuffers, baton);
});
diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp
index 442cf78b15c..e2b997add0c 100644
--- a/src/mongo/transport/transport_layer.cpp
+++ b/src/mongo/transport/transport_layer.cpp
@@ -31,11 +31,20 @@
#include "mongo/platform/basic.h"
#include "mongo/base/status.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
namespace mongo {
namespace transport {
+namespace {
+
+AtomicWord<uint64_t> reactorTimerIdCounter(0);
+
+} // namespace
+
const Status TransportLayer::SessionUnknownStatus =
Status(ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Session.");
@@ -48,5 +57,11 @@ const Status TransportLayer::TicketSessionUnknownStatus = Status(
const Status TransportLayer::TicketSessionClosedStatus = Status(
ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session.");
+ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {}
+
+BatonHandle TransportLayer::makeDefaultBaton(OperationContext* opCtx) {
+ return opCtx->getServiceContext()->makeBaton(opCtx);
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index ca1453cb3a4..0b736ba600f 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -109,17 +109,21 @@ public:
enum WhichReactor { kIngress, kEgress, kNewReactor };
virtual ReactorHandle getReactor(WhichReactor which) = 0;
- virtual BatonHandle makeBaton(OperationContext* opCtx) {
- return nullptr;
+ virtual BatonHandle makeBaton(OperationContext* opCtx) const {
+ return makeDefaultBaton(opCtx);
}
protected:
TransportLayer() = default;
+
+private:
+ static BatonHandle makeDefaultBaton(OperationContext* opCtx);
};
class ReactorTimer {
public:
- ReactorTimer() = default;
+ ReactorTimer();
+
ReactorTimer(const ReactorTimer&) = delete;
ReactorTimer& operator=(const ReactorTimer&) = delete;
@@ -128,6 +132,10 @@ public:
*/
virtual ~ReactorTimer() = default;
+ size_t id() const {
+ return _id;
+ }
+
/*
* Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an
* ErrorCodes::CallbackCancelled status.
@@ -142,6 +150,9 @@ public:
* Calling this implicitly calls cancel().
*/
virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0;
+
+private:
+ const size_t _id;
};
class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 06604606d0c..6f0c02d17fb 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -56,9 +56,11 @@
#endif
// session_asio.h has some header dependencies that require it to be the last header.
+
#ifdef __linux__
#include "mongo/transport/baton_asio_linux.h"
#endif
+
#include "mongo/transport/session_asio.h"
namespace mongo {
@@ -79,7 +81,7 @@ public:
void cancel(const BatonHandle& baton = nullptr) override {
// If we have a baton try to cancel that.
- if (baton && baton->cancelTimer(*this)) {
+ if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) {
LOG(2) << "Canceled via baton, skipping asio cancel.";
return;
}
@@ -90,8 +92,9 @@ public:
Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
- if (baton) {
- return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton);
+ if (baton && baton->networking()) {
+ return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); },
+ baton);
} else {
return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); });
}
@@ -875,8 +878,8 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const {
}
#endif
-BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) {
#ifdef __linux__
+BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) const {
auto baton = std::make_shared<BatonASIO>(opCtx);
{
@@ -885,11 +888,9 @@ BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) {
opCtx->setBaton(baton);
}
- return std::move(baton);
-#else
- return nullptr;
-#endif
+ return baton;
}
+#endif
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 77815024374..b5619bcf361 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -136,7 +136,9 @@ public:
return _listenerPort;
}
- BatonHandle makeBaton(OperationContext* opCtx) override;
+#ifdef __linux__
+ BatonHandle makeBaton(OperationContext* opCtx) const override;
+#endif
private:
class BatonASIO;
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 60cfb58a41e..29ad6457be6 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -90,7 +90,7 @@ public:
static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
- BatonHandle makeBaton(OperationContext* opCtx) override {
+ BatonHandle makeBaton(OperationContext* opCtx) const override {
stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
// TODO: figure out what to do about managers with more than one transport layer.
invariant(_tls.size() == 1);
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index 2cf2886afa0..1e66347da96 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -103,7 +103,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWo
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton) {
+ const BatonHandle& baton) {
return _executor->scheduleRemoteCommand(request, cb, baton);
}
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index e03e2a6955e..26d83b2bece 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -47,33 +47,32 @@ public:
* Does not own target executor.
*/
TaskExecutorProxy(executor::TaskExecutor* executor);
- virtual ~TaskExecutorProxy();
+ ~TaskExecutorProxy();
executor::TaskExecutor* getExecutor() const;
void setExecutor(executor::TaskExecutor* executor);
- virtual void startup() override;
- virtual void shutdown() override;
- virtual void join() override;
- virtual void appendDiagnosticBSON(BSONObjBuilder* builder) const override;
- virtual Date_t now() override;
- virtual StatusWith<EventHandle> makeEvent() override;
- virtual void signalEvent(const EventHandle& event) override;
- virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
- virtual void waitForEvent(const EventHandle& event) override;
- virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
- const EventHandle& event,
- Date_t deadline) override;
- virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
- virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
- virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const transport::BatonHandle& baton = nullptr) override;
- virtual void cancel(const CallbackHandle& cbHandle) override;
- virtual void wait(const CallbackHandle& cbHandle,
- Interruptible* interruptible = Interruptible::notInterruptible()) override;
- virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
+ void startup() override;
+ void shutdown() override;
+ void join() override;
+ void appendDiagnosticBSON(BSONObjBuilder* builder) const override;
+ Date_t now() override;
+ StatusWith<EventHandle> makeEvent() override;
+ void signalEvent(const EventHandle& event) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override;
+ void waitForEvent(const EventHandle& event) override;
+ StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline) override;
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
+ void cancel(const CallbackHandle& cbHandle) override;
+ void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) override;
+ void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
private:
// Not owned by us.