summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2017-06-27 14:04:53 -0400
committerJonathan Reams <jbreams@mongodb.com>2017-07-14 16:19:40 -0400
commit1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69 (patch)
tree4cad8d64f1ace9bc13aea786b460872b1ce466c3
parente0b06a9da3c0c6071f4e636f3c3ba3e8851c5db0 (diff)
downloadmongo-1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69.tar.gz
SERVER-29402 Implement ServiceExecutor and fixed-size test executor
-rw-r--r--buildscripts/resmokelib/config.py4
-rw-r--r--buildscripts/resmokelib/core/programs.py2
-rw-r--r--buildscripts/resmokelib/parser.py5
-rw-r--r--etc/evergreen.yml39
-rw-r--r--jstests/noPassthrough/command_line_parsing.js2
-rw-r--r--src/mongo/client/scoped_db_conn_test.cpp6
-rw-r--r--src/mongo/db/commands/SConscript5
-rw-r--r--src/mongo/db/commands/feature_compatibility_version.cpp4
-rw-r--r--src/mongo/db/commands/server_status.cpp4
-rw-r--r--src/mongo/db/db.cpp22
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/server_options.h12
-rw-r--r--src/mongo/db/server_options_helpers.cpp23
-rw-r--r--src/mongo/db/service_context.cpp9
-rw-r--r--src/mongo/db/service_context.h19
-rw-r--r--src/mongo/s/server.cpp8
-rw-r--r--src/mongo/shell/servers.js9
-rw-r--r--src/mongo/shell/utils.js1
-rw-r--r--src/mongo/transport/SConscript22
-rw-r--r--src/mongo/transport/service_entry_point.h5
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp74
-rw-r--r--src/mongo/transport/service_entry_point_impl.h17
-rw-r--r--src/mongo/transport/service_entry_point_mock.cpp20
-rw-r--r--src/mongo/transport/service_entry_point_mock.h2
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp4
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h3
-rw-r--r--src/mongo/transport/service_executor.h77
-rw-r--r--src/mongo/transport/service_executor_base.cpp104
-rw-r--r--src/mongo/transport/service_executor_base.h77
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp102
-rw-r--r--src/mongo/transport/service_executor_fixed.h61
-rw-r--r--src/mongo/transport/service_state_machine.cpp308
-rw-r--r--src/mongo/transport/service_state_machine.h114
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp9
-rw-r--r--src/mongo/transport/session_asio.h41
-rw-r--r--src/mongo/transport/transport_layer.h10
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp85
-rw-r--r--src/mongo/transport/transport_layer_asio.h12
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp66
-rw-r--r--src/mongo/transport/transport_layer_legacy.h21
-rw-r--r--src/mongo/transport/transport_layer_legacy_test.cpp15
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp17
-rw-r--r--src/mongo/transport/transport_layer_manager.h1
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp9
-rw-r--r--src/mongo/transport/transport_layer_mock.h1
-rw-r--r--src/mongo/transport/transport_layer_mock_test.cpp7
46 files changed, 1079 insertions, 383 deletions
diff --git a/buildscripts/resmokelib/config.py b/buildscripts/resmokelib/config.py
index 7e81662ba68..77c88cd9e56 100644
--- a/buildscripts/resmokelib/config.py
+++ b/buildscripts/resmokelib/config.py
@@ -53,6 +53,7 @@ DEFAULTS = {
"reportFailureStatus": "fail",
"reportFile": None,
"seed": long(time.time() * 256), # Taken from random.py code in Python 2.7.
+ "serviceExecutor": None,
"shellReadMode": None,
"shellWriteMode": None,
"shuffle": None,
@@ -141,6 +142,9 @@ REPORT_FAILURE_STATUS = None
# If set, then resmoke.py will write out a report file with the status of each test that ran.
REPORT_FILE = None
+# IF set, then mongod/mongos's started by resmoke.py will use the specified service executor
+SERVICE_EXECUTOR = None
+
# If set, then mongo shells started by resmoke.py will use the specified read mode.
SHELL_READ_MODE = None
diff --git a/buildscripts/resmokelib/core/programs.py b/buildscripts/resmokelib/core/programs.py
index 55508ad3fd0..984e34f036e 100644
--- a/buildscripts/resmokelib/core/programs.py
+++ b/buildscripts/resmokelib/core/programs.py
@@ -48,6 +48,7 @@ def mongod_program(logger, executable=None, process_kwargs=None, **kwargs):
shortcut_opts = {
"nojournal": config.NO_JOURNAL,
"nopreallocj": config.NO_PREALLOC_JOURNAL,
+ "serviceExecutor": config.SERVICE_EXECUTOR,
"storageEngine": config.STORAGE_ENGINE,
"wiredTigerCollectionConfigString": config.WT_COLL_CONFIG,
"wiredTigerEngineConfigString": config.WT_ENGINE_CONFIG,
@@ -141,6 +142,7 @@ def mongo_shell_program(logger, executable=None, filename=None, process_kwargs=N
shortcut_opts = {
"noJournal": (config.NO_JOURNAL, False),
"noJournalPrealloc": (config.NO_PREALLOC_JOURNAL, False),
+ "serviceExecutor": (config.SERVICE_EXECUTOR, ""),
"storageEngine": (config.STORAGE_ENGINE, ""),
"storageEngineCacheSizeGB": (config.STORAGE_ENGINE_CACHE_SIZE, ""),
"testName": (os.path.splitext(os.path.basename(filename))[0], ""),
diff --git a/buildscripts/resmokelib/parser.py b/buildscripts/resmokelib/parser.py
index 3845ed92fe0..5071d2c2be9 100644
--- a/buildscripts/resmokelib/parser.py
+++ b/buildscripts/resmokelib/parser.py
@@ -39,6 +39,7 @@ DEST_TO_CONFIG = {
"report_failure_status": "reportFailureStatus",
"report_file": "reportFile",
"seed": "seed",
+ "service_executor": "serviceExecutor",
"shell_read_mode": "shellReadMode",
"shell_write_mode": "shellWriteMode",
"shuffle": "shuffle",
@@ -176,6 +177,9 @@ def parse_command_line():
help=("Seed for the random number generator. Useful in combination with the"
" --shuffle option for producing a consistent test execution order."))
+ parser.add_option("--serviceExecutor", dest="service_executor", metavar="EXECUTOR",
+ help="The service executor used by jstests")
+
parser.add_option("--shellReadMode", type="choice", action="store", dest="shell_read_mode",
choices=("commands", "compatibility", "legacy"), metavar="READ_MODE",
help="The read mode used by the mongo shell.")
@@ -276,6 +280,7 @@ def update_config_vars(values):
_config.REPEAT = config.pop("repeat")
_config.REPORT_FAILURE_STATUS = config.pop("reportFailureStatus")
_config.REPORT_FILE = config.pop("reportFile")
+ _config.SERVICE_EXECUTOR = config.pop("serviceExecutor")
_config.SHELL_READ_MODE = config.pop("shellReadMode")
_config.SHELL_WRITE_MODE = config.pop("shellWriteMode")
_config.STAGGER_JOBS = config.pop("staggerJobs") == "on"
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 95a2e46b552..0f1725c4a79 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -2344,6 +2344,15 @@ tasks:
run_multiple_jobs: true
- <<: *task_template
+ name: jsCore_async
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=core --storageEngine=wiredTiger --serviceExecutor=fixedForTesting
+ run_multiple_jobs: true
+
+- <<: *task_template
name: jsCore_inMem
commands:
- func: "do setup"
@@ -2820,6 +2829,16 @@ tasks:
resmoke_args: --suites=parallel --storageEngine=wiredTiger
- <<: *task_template
+ name: parallel_async
+ depends_on:
+ - name: jsCore_async
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=parallel --storageEngine=wiredTiger --serviceExecutor=fixedForTesting
+
+- <<: *task_template
name: parallel_compatibility
depends_on:
- name: jsCore_compatibility
@@ -8972,6 +8991,26 @@ buildvariants:
- name: jsCore
- name: jsCore_WT
+- name: ubuntu1604-debug-aubsan-async
+ display_name: "~ {A,UB}SAN Enterprise SSL Ubuntu 16.04 async"
+ modules:
+ - enterprise
+ run_on:
+ - ubuntu1604-build
+ stepback: true
+ expansions:
+ tooltags: "-tags 'ssl'"
+ # We need llvm-symbolizer in the PATH for ASAN for clang-3.7 or later.
+ variant_path_suffix: /opt/mongodbtoolchain/v2/bin
+ san_options: UBSAN_OPTIONS="print_stacktrace=1" LSAN_OPTIONS="suppressions=etc/lsan.suppressions:report_objects=1" ASAN_OPTIONS=detect_leaks=1:check_initialization_order=true:strict_init_order=true:abort_on_error=1:disable_coredump=0
+ compile_flags: --variables-files=etc/scons/mongodbtoolchain_clang.vars --dbg=on --opt=on --allocator=system --sanitize=undefined,address --ssl -j$(grep -c ^processor /proc/cpuinfo) --nostrip
+ num_jobs_available: $(($(grep -c ^processor /proc/cpuinfo) / 3)) # Avoid starting too many mongod's under {A,UB}SAN build.
+ build_mongoreplay: false
+ tasks:
+ - name: compile
+ - name: jsCore_async
+ - name: parallel_async
+
- name: enterprise-ubuntu-dynamic-1604-64-bit
display_name: "* Shared Library Enterprise Ubuntu 16.04"
modules:
diff --git a/jstests/noPassthrough/command_line_parsing.js b/jstests/noPassthrough/command_line_parsing.js
index cda7e8ab36e..19f9d143476 100644
--- a/jstests/noPassthrough/command_line_parsing.js
+++ b/jstests/noPassthrough/command_line_parsing.js
@@ -25,6 +25,7 @@ var m2expected = {
var m2result = m2.getDB("admin").runCommand("getCmdLineOpts");
// remove variables that depend on the way the test is started.
+delete m2result.parsed.net.serviceExecutor;
delete m2result.parsed.storage.mmapv1;
delete m2result.parsed.setParameter;
delete m2result.parsed.storage.engine;
@@ -49,6 +50,7 @@ var m3expected = {
var m3result = m3.getDB("admin").runCommand("getCmdLineOpts");
// remove variables that depend on the way the test is started.
+delete m3result.parsed.net.serviceExecutor;
delete m3result.parsed.storage.mmapv1;
delete m3result.parsed.setParameter;
delete m3result.parsed.storage.engine;
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index 1a9d7581424..c26428c7539 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -90,6 +90,12 @@ public:
_threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session));
}
+ // This is not used in this test, so it is only here to complete the interface of
+ // ServiceEntryPoint
+ void endAllSessions(transport::Session::TagMask tags) override {
+ MONGO_UNREACHABLE;
+ }
+
void setReplyDelay(Milliseconds delay) {
_replyDelay = delay;
}
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index ed571c55ca3..9f16eda1e46 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -31,7 +31,10 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands',
- ]
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/transport/service_executor',
+ ],
)
env.Library(
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp
index 9184c309049..b8a54bead96 100644
--- a/src/mongo/db/commands/feature_compatibility_version.cpp
+++ b/src/mongo/db/commands/feature_compatibility_version.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/service_entry_point.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -185,7 +185,7 @@ void FeatureCompatibilityVersion::set(OperationContext* opCtx, StringData versio
// Close all internal connections to versions lower than 3.6.
if (version == FeatureCompatibilityVersionCommandParser::kVersion36) {
- opCtx->getServiceContext()->getTransportLayer()->endAllSessions(
+ opCtx->getServiceContext()->getServiceEntryPoint()->endAllSessions(
transport::Session::kLatestVersionInternalClientKeepOpen |
transport::Session::kExternalClientKeepOpen);
}
diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp
index 1d559708d73..4464df34a33 100644
--- a/src/mongo/db/commands/server_status.cpp
+++ b/src/mongo/db/commands/server_status.cpp
@@ -295,6 +295,10 @@ public:
BSONObjBuilder b;
networkCounter.append(b);
appendMessageCompressionStats(&b);
+ auto executor = opCtx->getServiceContext()->getServiceExecutor();
+ if (executor)
+ executor->appendStats(&b);
+
return b.obj();
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 569958ccbad..7ead195653e 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -737,6 +737,14 @@ ExitCode _initAndListen(int listenPort) {
return EXIT_NET_ERROR;
}
+ if (globalServiceContext->getServiceExecutor()) {
+ start = globalServiceContext->getServiceExecutor()->start();
+ if (!start.isOK()) {
+ error() << "Failed to start the service executor: " << start;
+ return EXIT_NET_ERROR;
+ }
+ }
+
globalServiceContext->notifyStartupComplete();
#ifndef _WIN32
mongo::signalForkSuccess();
@@ -997,18 +1005,22 @@ static void shutdownTask() {
log(LogComponent::kNetwork)
<< "shutdown: going to close all sockets because ASAN is active...";
+
+ // Shutdown the TransportLayer so that new connections aren't accepted
tl->shutdown();
+ // Request that all sessions end.
+ sep->endAllSessions(transport::Session::kEmptyTagMask);
+
// Close all sockets in a detached thread, and then wait for the number of active
// connections to reach zero. Give the detached background thread a 10 second deadline. If
// we haven't closed drained all active operations within that deadline, just keep going
// with shutdown: the OS will do it for us when the process terminates.
-
stdx::packaged_task<void()> dryOutTask([sep] {
// There isn't currently a way to wait on the TicketHolder to have all its tickets back,
// unfortunately. So, busy wait in this detached thread.
while (true) {
- const auto runningWorkers = sep->getNumberOfActiveWorkerThreads();
+ const auto runningWorkers = sep->getNumberOfConnections();
if (runningWorkers == 0) {
log(LogComponent::kNetwork) << "shutdown: no running workers found...";
@@ -1027,6 +1039,12 @@ static void shutdownTask() {
log(LogComponent::kNetwork) << "shutdown: exhausted grace period for"
<< " active workers to drain; continuing with shutdown... ";
}
+
+ // Shutdown and wait for the service executor to exit
+ auto svcExec = serviceContext->getServiceExecutor();
+ if (svcExec) {
+ fassertStatusOK(40550, svcExec->shutdown());
+ }
}
#endif
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index e14878e88f4..bff3aa40f5e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -90,8 +90,8 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/session.h"
-#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -698,7 +698,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- _service->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
+ _service->getServiceEntryPoint()->endAllSessions(transport::Session::kKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) {
diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h
index 502785e1982..c8775b6a56e 100644
--- a/src/mongo/db/server_options.h
+++ b/src/mongo/db/server_options.h
@@ -73,10 +73,14 @@ struct ServerGlobalParams {
int defaultLocalThresholdMillis = 15; // --localThreshold in ms to consider a node local
bool moveParanoia = false; // for move chunk paranoia
- bool noUnixSocket = false; // --nounixsocket
- bool doFork = false; // --fork
- std::string socket = "/tmp"; // UNIX domain socket directory
- std::string transportLayer; // --transportLayer (must be either "asio" or "legacy")
+ bool noUnixSocket = false; // --nounixsocket
+ bool doFork = false; // --fork
+ std::string socket = "/tmp"; // UNIX domain socket directory
+ std::string transportLayer; // --transportLayer (must be either "asio" or "legacy")
+
+ // --serviceExecutor ("adaptive", "synchronous", or "fixedForTesting")
+ std::string serviceExecutor;
+
int maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.
int unixSocketPermissions = DEFAULT_UNIX_PERMS; // permissions for the UNIX domain socket
diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp
index 3d968ebabd1..52bf5e2022c 100644
--- a/src/mongo/db/server_options_helpers.cpp
+++ b/src/mongo/db/server_options_helpers.cpp
@@ -211,6 +211,14 @@ Status addGeneralServerOptions(moe::OptionSection* options) {
.setDefault(moe::Value("asio"));
options
+ ->addOptionChaining("net.serviceExecutor",
+ "serviceExecutor",
+ moe::String,
+ "sets the service executor implementation")
+ .hidden()
+ .setDefault(moe::Value("synchronous"));
+
+ options
->addOptionChaining(
"logpath",
"logpath",
@@ -808,6 +816,21 @@ Status storeServerOptions(const moe::Environment& params) {
}
}
+ if (params.count("net.serviceExecutor")) {
+ if (serverGlobalParams.transportLayer == "legacy") {
+ return {ErrorCodes::BadValue,
+ "Cannot specify a serviceExecutor with the legacy transportLayer"};
+ }
+ const auto valid = {"synchronous"_sd, "fixedForTesting"_sd};
+ auto value = params["net.serviceExecutor"].as<std::string>();
+ if (std::find(valid.begin(), valid.end(), value) == valid.end()) {
+ return {ErrorCodes::BadValue, "Unsupported value for serviceExecutor"};
+ }
+ serverGlobalParams.serviceExecutor = value;
+ } else {
+ serverGlobalParams.serviceExecutor = "synchronous";
+ }
+
if (params.count("security.transitionToAuth")) {
serverGlobalParams.transitionToAuth = params["security.transitionToAuth"].as<bool>();
}
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index 805a6f0802e..2b4eb81c354 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -184,6 +184,11 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
return _serviceEntryPoint.get();
}
+transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
+ return _serviceExecutor.get();
+}
+
+
TickSource* ServiceContext::getTickSource() const {
return _tickSource.get();
}
@@ -216,6 +221,10 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer
_transportLayer = std::move(tl);
}
+void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
+ _serviceExecutor = std::move(exec);
+}
+
void ServiceContext::ClientDeleter::operator()(Client* client) const {
ServiceContext* const service = client->getServiceContext();
{
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 2386e3065a7..db486a72857 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -40,6 +40,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/transport/service_executor_base.h"
#include "mongo/transport/session.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/decorable.h"
@@ -371,6 +372,14 @@ public:
ServiceEntryPoint* getServiceEntryPoint() const;
/**
+ * Get the service executor for the service context.
+ *
+ * See ServiceStateMachine for how this is used. Some configurations may not have a service
+ * executor registered and this will return a nullptr.
+ */
+ transport::ServiceExecutor* getServiceExecutor() const;
+
+ /**
* Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
* added/started.
*
@@ -444,6 +453,11 @@ public:
*/
void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
+ /**
+ * Binds the service executor to the service context
+ */
+ void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
+
protected:
ServiceContext();
@@ -487,6 +501,11 @@ private:
std::unique_ptr<ServiceEntryPoint> _serviceEntryPoint;
/**
+ * The ServiceExecutor
+ */
+ std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
+
+ /**
* Vector of registered observers.
*/
std::vector<std::unique_ptr<ClientObserver>> _clientObservers;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 4558e93701a..a5c8746e759 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -353,6 +353,14 @@ static ExitCode runMongosServer() {
return EXIT_NET_ERROR;
}
+ if (auto svcExec = getGlobalServiceContext()->getServiceExecutor()) {
+ start = svcExec->start();
+ if (!start.isOK()) {
+ error() << "Failed to start the service executor: " << start;
+ return EXIT_NET_ERROR;
+ }
+ }
+
getGlobalServiceContext()->notifyStartupComplete();
#if !defined(_WIN32)
mongo::signalForkSuccess();
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index cd2ebd5d34f..995dfd993cf 100644
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -1054,6 +1054,15 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro
if (jsTest.options().auth) {
argArray.push(...['--setParameter', "enableLocalhostAuthBypass=false"]);
}
+
+ if (jsTest.options().serviceExecutor &&
+ (!programVersion || (parseInt(programVersion.split(".")[0]) >= 3 &&
+ parseInt(programVersion.split(".")[1]) >= 5))) {
+ if (!argArrayContains("serviceExecutor")) {
+ argArray.push(...["--serviceExecutor", jsTest.options().serviceExecutor]);
+ }
+ }
+
// Since options may not be backward compatible, mongos options are not
// set on older versions, e.g., mongos-3.0.
if (programName.endsWith('mongos')) {
diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js
index 2cb79d743ab..eff1c4aea03 100644
--- a/src/mongo/shell/utils.js
+++ b/src/mongo/shell/utils.js
@@ -198,6 +198,7 @@ var _jsTestOptions = {enableTestCommands: true}; // Test commands should be ena
jsTestOptions = function() {
if (TestData) {
return Object.merge(_jsTestOptions, {
+ serviceExecutor: TestData.serviceExecutor,
setParameters: TestData.setParameters,
setParametersMongos: TestData.setParametersMongos,
storageEngine: TestData.storageEngine,
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index ce0679290c9..5fe57bfa9e5 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -36,6 +36,9 @@ env.Library(
LIBDEPS=[
'transport_layer',
],
+ LIBDEPS_PRIVATE=[
+ 'service_executor',
+ ],
)
env.Library(
@@ -59,12 +62,27 @@ tlEnv.Library(
],
LIBDEPS=[
'transport_layer_common',
- '$BUILD_DIR/mongo/db/auth/authentication_restriction',
- '$BUILD_DIR/third_party/shim_asio',
'$BUILD_DIR/mongo/base/system_error',
+ '$BUILD_DIR/mongo/db/auth/authentication_restriction',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/counters',
+ '$BUILD_DIR/third_party/shim_asio',
+ ],
+)
+
+tlEnv.Library(
+ target='service_executor',
+ source=[
+ 'service_executor_base.cpp',
+ 'service_executor_fixed.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
+ LIBDEPS_PRIVATE=[
+ "$BUILD_DIR/mongo/util/processinfo",
+ '$BUILD_DIR/third_party/shim_asio',
],
)
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index b03bb65825a..834e76ab16b 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -53,6 +53,11 @@ public:
virtual void startSession(transport::SessionHandle session) = 0;
/**
+ * End all sessions that do not match the mask in tags.
+ */
+ virtual void endAllSessions(transport::Session::TagMask tags) = 0;
+
+ /**
* Processes a request and fills out a DbResponse.
*/
virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0;
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index a4f5c6a665d..cd2fa26ea1e 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -38,6 +38,7 @@
#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/session.h"
+#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/scopeguard.h"
@@ -52,14 +53,30 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr);
RestrictionEnvironment::set(session, std::move(restrictionEnvironment));
- // Pass ownership of the transport::SessionHandle into our worker thread. When this
- // thread exits, the session will end.
- //
- launchServiceWorkerThread([ this, session = std::move(session) ]() mutable {
+ SSMListIterator ssmIt;
+
+ const auto sync = (_svcCtx->getServiceExecutor() == nullptr);
+ auto ssm = ServiceStateMachine::create(_svcCtx, std::move(session), sync);
+ {
+ stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ ssmIt = _sessions.emplace(_sessions.begin(), ssm);
+ }
+
+ ssm->setCleanupHook([this, ssmIt] {
+ stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ _sessions.erase(ssmIt);
+ });
+
+ if (!sync) {
+ dassert(_svcCtx->getServiceExecutor());
+ ssm->scheduleNext();
+ return;
+ }
+
+ launchServiceWorkerThread([ this, ssm = std::move(ssm) ]() mutable {
_nWorkers.addAndFetch(1);
- const auto guard = MakeGuard([this] { _nWorkers.subtractAndFetch(1); });
+ const auto guard = MakeGuard([this, &ssm] { _nWorkers.subtractAndFetch(1); });
- ServiceStateMachine ssm(_svcCtx, std::move(session), true);
const auto numCores = [] {
ProcessInfo p;
if (auto availCores = p.getNumAvailableCores()) {
@@ -68,12 +85,53 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
return static_cast<unsigned>(p.getNumCores());
}();
- while (ssm.state() != ServiceStateMachine::State::Ended) {
- ssm.runNext();
+ while (ssm->state() != ServiceStateMachine::State::Ended) {
+ ssm->runNext();
+
+ /*
+ * In perf testing we found that yielding after running a each request produced
+ * at 5% performance boost in microbenchmarks if the number of worker threads
+ * was greater than the number of available cores.
+ */
if (_nWorkers.load() > numCores)
stdx::this_thread::yield();
}
});
}
+void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
+ SSMList connsToEnd;
+
+ // While holding the _sesionsMutex, loop over all the current connections, and if their tags
+ // do not match the requested tags to skip, create a copy of their shared_ptr and place it in
+ // connsToEnd.
+ //
+ // This will ensure that sessions to be ended will live at least long enough for us to call
+ // their terminate() function, even if they've already ended because of an i/o error.
+ {
+ stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ for (auto& ssm : _sessions) {
+ if (ssm->session()->getTags() & tags) {
+ log() << "Skip closing connection for connection # " << ssm->session()->id();
+ } else {
+ connsToEnd.emplace_back(ssm);
+ }
+ }
+ }
+
+ // Loop through all the connections we marked for ending and call terminate on them. They will
+ // then remove themselves from _sessions whenever they transition to the next state.
+ //
+ // If they've already ended, then this is a noop, and the SSM will be destroyed when connsToEnd
+ // goes out of scope.
+ for (auto& ssm : connsToEnd) {
+ ssm->terminate();
+ }
+}
+
+std::size_t ServiceEntryPointImpl::getNumberOfConnections() const {
+ stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ return _sessions.size();
+}
+
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 6dd6047d9d2..fa1f92eb545 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -28,12 +28,13 @@
#pragma once
-#include <vector>
#include "mongo/base/disallow_copying.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_state_machine.h"
namespace mongo {
class ServiceContext;
@@ -57,13 +58,19 @@ public:
void startSession(transport::SessionHandle session) final;
- std::size_t getNumberOfActiveWorkerThreads() const {
- return _nWorkers.load();
- }
+ void endAllSessions(transport::Session::TagMask tags) final;
+
+ std::size_t getNumberOfConnections() const;
private:
- ServiceContext* _svcCtx;
+ using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;
+ using SSMListIterator = SSMList::iterator;
+
+ ServiceContext* const _svcCtx;
AtomicWord<std::size_t> _nWorkers;
+
+ mutable stdx::mutex _sessionsMutex;
+ SSMList _sessions;
};
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp
index 0cc465aff92..0f1fd2f07bd 100644
--- a/src/mongo/transport/service_entry_point_mock.cpp
+++ b/src/mongo/transport/service_entry_point_mock.cpp
@@ -46,14 +46,7 @@ ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl)
: _tl(tl), _inShutdown(false) {}
ServiceEntryPointMock::~ServiceEntryPointMock() {
- {
- stdx::lock_guard<stdx::mutex> lk(_shutdownLock);
- _inShutdown = true;
- }
-
- for (auto& t : _threads) {
- t.join();
- }
+ endAllSessions(transport::Session::kEmptyTagMask);
}
void ServiceEntryPointMock::startSession(transport::SessionHandle session) {
@@ -106,4 +99,15 @@ DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, const M
return {Message(b.release()), ""};
}
+void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_shutdownLock);
+ _inShutdown = true;
+ }
+
+ for (auto& t : _threads) {
+ t.join();
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h
index 58c6149bf7d..719454f16dd 100644
--- a/src/mongo/transport/service_entry_point_mock.h
+++ b/src/mongo/transport/service_entry_point_mock.h
@@ -65,6 +65,8 @@ public:
*/
void startSession(transport::SessionHandle session) override;
+ void endAllSessions(transport::Session::TagMask tags) override;
+
DbResponse handleRequest(OperationContext* opCtx, const Message& request) override;
private:
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index a774285a133..f1efcbf8580 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -139,10 +139,6 @@ void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session
return _end(session);
}
-void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions(Session::TagMask tags) {
- return _endAllSessions(tags);
-}
-
Status ServiceEntryPointTestSuite::MockTLHarness::setup() {
return Status::OK();
}
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
index 45ebc589978..7c31e38f5b0 100644
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ b/src/mongo/transport/service_entry_point_test_suite.h
@@ -112,7 +112,6 @@ public:
Stats sessionStats() override;
void end(const transport::SessionHandle& session) override;
- void endAllSessions(transport::Session::TagMask tags) override;
Status setup() override;
Status start() override;
void shutdown() override;
@@ -128,8 +127,6 @@ public:
stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
stdx::function<void(const transport::SessionHandle&)> _end;
stdx::function<void(SEPTestSession& session)> _destroy_hook;
- stdx::function<void(transport::Session::TagMask tags)> _endAllSessions =
- [](transport::Session::TagMask tags) {};
stdx::function<Status(void)> _start = [] { return Status::OK(); };
stdx::function<void(void)> _shutdown = [] {};
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
new file mode 100644
index 00000000000..fc2d442eed5
--- /dev/null
+++ b/src/mongo/transport/service_executor.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+// This needs to be forward declared here because the service_context.h is a circular dependency.
+class ServiceContext;
+
+namespace transport {
+
+/*
+ * This is the interface for all ServiceExecutors.
+ */
+class ServiceExecutor {
+public:
+ virtual ~ServiceExecutor() = default;
+ using Task = stdx::function<void()>;
+
+ /*
+ * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled.
+ */
+ virtual Status start() = 0;
+
+ /*
+ * Schedules a task with the ServiceExecutor and returns immediately.
+ *
+ * This is guaranteed to unwind the stack before running the task, although the task may be
+ * run later in the same thread.
+ */
+ virtual Status schedule(Task task) = 0;
+
+ /*
+ * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any
+ * associated callbacks waiting on I/O may get called with an error code.
+ *
+ * This should only be called during server shutdown to gracefully destroy the ServiceExecutor
+ */
+ virtual Status shutdown() = 0;
+
+ /*
+ * Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output.
+ */
+ virtual void appendStats(BSONObjBuilder* bob) const = 0;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_base.cpp b/src/mongo/transport/service_executor_base.cpp
new file mode 100644
index 00000000000..b2284c13f23
--- /dev/null
+++ b/src/mongo/transport/service_executor_base.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor_base.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+namespace transport {
+namespace {
+
+constexpr auto kTotalScheduled = "totalScheduled"_sd;
+constexpr auto kTotalExecuted = "totalExecuted"_sd;
+constexpr auto kQueueDepth = "queueDepth"_sd;
+constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd;
+constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd;
+
+int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) {
+ invariant(tickSource->getTicksPerSecond() > 1000000);
+ static const auto ticksPerMicro = tickSource->getTicksPerSecond() / 1000000;
+ return ticks / ticksPerMicro;
+}
+
+} // namespace
+
+ServiceExecutorBase::ServiceExecutorBase(ServiceContext* ctx) : _tickSource{ctx->getTickSource()} {}
+
+Status ServiceExecutorBase::schedule(ServiceExecutorBase::Task task) {
+
+ const auto scheduledTime = _tickSource->getTicks();
+ auto wrapped = [ this, task = std::move(task), scheduledTime ] {
+ auto start = _tickSource->getTicks();
+ task();
+ auto end = _tickSource->getTicks();
+ _outstandingTasks.subtractAndFetch(1);
+ _tasksExecuted.addAndFetch(1);
+ _ticksRunning.addAndFetch(end - start);
+ _ticksQueued.addAndFetch(start - scheduledTime);
+ };
+
+ auto ret = _schedule(std::move(wrapped));
+ if (ret.isOK()) {
+ _tasksScheduled.addAndFetch(1);
+ _outstandingTasks.addAndFetch(1);
+ }
+
+ return ret;
+}
+
+ServiceExecutorBase::Stats ServiceExecutorBase::getStats() const {
+ return {_ticksRunning.load(),
+ _ticksQueued.load(),
+ _tasksExecuted.load(),
+ _tasksScheduled.load(),
+ _outstandingTasks.load()};
+}
+
+void ServiceExecutorBase::appendStats(BSONObjBuilder* bob) const {
+ const auto stats = getStats();
+
+ BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
+ section << kTotalScheduled << static_cast<int64_t>(stats.tasksScheduled) << kTotalExecuted
+ << static_cast<int64_t>(stats.tasksExecuted) << kQueueDepth << stats.outstandingTasks
+ << kTotalTimeRunningUs << ticksToMicros(stats.ticksRunning, _tickSource)
+ << kTotalTimeQueuedUs << ticksToMicros(stats.ticksQueued, _tickSource);
+ section.doneFast();
+}
+
+TickSource* ServiceExecutorBase::tickSource() const {
+ return _tickSource;
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_base.h b/src/mongo/transport/service_executor_base.h
new file mode 100644
index 00000000000..0cc024efaa8
--- /dev/null
+++ b/src/mongo/transport/service_executor_base.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/util/tick_source.h"
+
+namespace mongo {
+namespace transport {
+/*
+ * This is the base class of ServiceExecutors.
+ *
+ * Service executors should derive from this class and implement scheduleImpl(). They may
+ * get timing/counter statistics by calling getStats().
+ */
+class ServiceExecutorBase : public ServiceExecutor {
+public:
+ Status schedule(Task task) final;
+
+ struct Stats {
+ TickSource::Tick ticksRunning; // Total number of ticks spent running tasks
+ TickSource::Tick ticksQueued; // Total number of ticks tasks have spent waiting to run
+ int64_t tasksExecuted; // Total number of tasks executed
+ int64_t tasksScheduled; // Total number of tasks scheduled
+ int64_t outstandingTasks; // Current number of tasks waiting to be run
+ };
+
+ Stats getStats() const;
+ void appendStats(BSONObjBuilder* bob) const final;
+
+protected:
+ explicit ServiceExecutorBase(ServiceContext* ctx);
+
+ TickSource* tickSource() const;
+
+private:
+ // Sub-classes should implement this function to actually schedule the task. It will be called
+ // by schedule() with a wrapped task that does all the necessary stats/timing tracking.
+ virtual Status _schedule(Task task) = 0;
+
+ TickSource* _tickSource;
+ AtomicWord<TickSource::Tick> _ticksRunning{0};
+ AtomicWord<TickSource::Tick> _ticksQueued{0};
+ AtomicWord<int64_t> _tasksExecuted{0};
+ AtomicWord<int64_t> _tasksScheduled{0};
+ AtomicWord<int64_t> _outstandingTasks{0};
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
new file mode 100644
index 00000000000..cd299a75ac4
--- /dev/null
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor_fixed.h"
+
+#include "mongo/db/server_parameters.h"
+#include "mongo/util/log.h"
+#include "mongo/util/processinfo.h"
+
+#include <asio.hpp>
+
+namespace mongo {
+namespace transport {
+namespace {
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(fixedServiceExecutorNumThreads, int, -1);
+
+} // namespace
+
+ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx,
+ std::shared_ptr<asio::io_context> ioCtx)
+ : ServiceExecutorBase(ctx), _ioContext(std::move(ioCtx)) {}
+
+ServiceExecutorFixed::~ServiceExecutorFixed() {
+ invariant(!_isRunning.load());
+}
+
+Status ServiceExecutorFixed::start() {
+ invariant(!_isRunning.load());
+
+ auto threadCount = fixedServiceExecutorNumThreads;
+ if (threadCount == -1) {
+ ProcessInfo pi;
+ threadCount = pi.getNumAvailableCores().value_or(pi.getNumCores());
+ log() << "No thread count configured for fixed executor. Using number of cores: "
+ << threadCount;
+ }
+
+ _isRunning.store(true);
+ for (auto i = 0; i < threadCount; i++) {
+ _threads.push_back(stdx::thread([this, i] {
+ auto threadId = i + 1;
+ LOG(3) << "Starting worker thread " << threadId;
+ asio::io_context::work work(*_ioContext);
+ while (_isRunning.load()) {
+ _ioContext->run();
+ }
+ LOG(3) << "Exiting worker thread " << threadId;
+ }));
+ }
+
+ return Status::OK();
+}
+
+Status ServiceExecutorFixed::shutdown() {
+ invariant(_isRunning.load());
+
+ _isRunning.store(false);
+ _ioContext->stop();
+ for (auto& thread : _threads) {
+ thread.join();
+ }
+ _threads.clear();
+
+ return Status::OK();
+}
+
+Status ServiceExecutorFixed::_schedule(ServiceExecutorFixed::Task task) {
+ _ioContext->post(std::move(task));
+ return Status::OK();
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
new file mode 100644
index 00000000000..14203cd794f
--- /dev/null
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_executor_base.h"
+#include "mongo/transport/transport_layer_asio.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * This is an ASIO-based fixed-size ServiceExecutor. It should only be used for testing because
+ * it won't add any threads if all threads become blocked.
+ */
+class ServiceExecutorFixed : public ServiceExecutorBase {
+public:
+ explicit ServiceExecutorFixed(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx);
+ virtual ~ServiceExecutorFixed();
+
+ Status start() final;
+ Status shutdown() final;
+
+private:
+ Status _schedule(Task task) final;
+
+ std::shared_ptr<asio::io_context> _ioContext;
+ std::vector<stdx::thread> _threads;
+ AtomicWord<bool> _isRunning{false};
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index d755a5f4c61..624b22dd1d8 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -51,7 +51,6 @@
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/thread_idle_callback.h"
#include "mongo/util/quick_exit.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -91,12 +90,98 @@ bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
} // namespace
using transport::TransportLayer;
+
+/*
+ * This class wraps up the logic for swapping/unswapping the Client during runNext().
+ */
+class ServiceStateMachine::ThreadGuard {
+ ThreadGuard(ThreadGuard&) = delete;
+ ThreadGuard& operator=(ThreadGuard&) = delete;
+
+public:
+ explicit ThreadGuard(ServiceStateMachine* ssm)
+ : _ssm{ssm},
+ _haveTakenOwnership{!_ssm->_isOwned.test_and_set()},
+ _oldThreadName{getThreadName().toString()} {
+ const auto currentOwningThread = _ssm->_currentOwningThread.load();
+ const auto currentThreadId = stdx::this_thread::get_id();
+
+ // If this is true, then we are the "owner" of the Client and we should swap the
+ // client/thread name before doing any work.
+ if (_haveTakenOwnership) {
+ _ssm->_currentOwningThread.store(currentThreadId);
+
+ // Set up the thread name
+ setThreadName(_ssm->_threadName);
+
+ // These are sanity checks to make sure that the Client is what we expect it to be
+ invariant(!haveClient());
+ invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr);
+
+ // Swap the current Client so calls to cc() work as expected
+ Client::setCurrent(std::move(_ssm->_dbClient));
+ } else if (currentOwningThread != currentThreadId) {
+ // If the currentOwningThread does not equal the currentThreadId, then another thread
+ // currently "owns" the Client and we should reschedule ourself.
+ _okayToRunNext = false;
+ }
+ }
+
+ ~ThreadGuard() {
+ // If we are not the owner of the SSM, then do nothing. Something higher up the call stack
+ // will have to clean up.
+ if (!_haveTakenOwnership)
+ return;
+
+ // If the session has ended, then assume that it's unsafe to do anything but call the
+ // cleanup hook.
+ if (_ssm->state() == State::Ended) {
+ // The cleanup hook may change as soon as we unlock the mutex, so move it out of the
+ // ssm before unlocking the lock.
+ auto cleanupHook = std::move(_ssm->_cleanupHook);
+ if (cleanupHook)
+ cleanupHook();
+
+ return;
+ }
+
+ // Otherwise swap thread locals and thread names back into the SSM so its ready for the
+ // next run.
+ if (haveClient()) {
+ _ssm->_dbClient = Client::releaseCurrent();
+ }
+ setThreadName(_oldThreadName);
+ _ssm->_isOwned.clear();
+ }
+
+ // This bool operator reflects whether the ThreadGuard was able to take ownership of the thread
+ // either higher up the call chain, or in this call. If this returns false, then it is not safe
+ // to assume the thread has been setup correctly, or that any mutable state of the SSM is safe
+ // to access except for the current _state value.
+ explicit operator bool() const {
+ return _okayToRunNext;
+ }
+
+private:
+ ServiceStateMachine* _ssm;
+ bool _haveTakenOwnership;
+ const std::string _oldThreadName;
+ bool _okayToRunNext = true;
+};
+
+std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ bool sync) {
+ return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), sync);
+}
+
ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
transport::SessionHandle session,
bool sync)
- : _state{State::Source},
+ : _state{State::Created},
_sep{svcContext->getServiceEntryPoint()},
_sync(sync),
+ _serviceContext(svcContext),
_dbClient{svcContext->makeClient("conn", std::move(session))},
_dbClientPtr{_dbClient.get()},
_threadName{str::stream() << "conn" << _dbClient->session()->id()},
@@ -108,55 +193,74 @@ const transport::SessionHandle& ServiceStateMachine::session() const {
}
void ServiceStateMachine::sourceCallback(Status status) {
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleFunc([this, status] { sourceCallback(status); });
+ }
+
// Make sure we just called sourceMessage();
- invariant(_state == State::SourceWait);
+ invariant(state() == State::SourceWait);
auto remote = session()->remote();
if (status.isOK()) {
- _state = State::Process;
+ _state.store(State::Process);
} else if (ErrorCodes::isInterruption(status.code()) ||
ErrorCodes::isNetworkError(status.code())) {
LOG(2) << "Session from " << remote << " encountered a network error during SourceMessage";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else if (status == TransportLayer::TicketSessionClosedStatus) {
// Our session may have been closed internally.
LOG(2) << "Session from " << remote << " was closed internally during SourceMessage";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else {
log() << "Error receiving request from client: " << status << ". Ending connection from "
<< remote << " (connection id: " << session()->id() << ")";
- _state = State::EndSession;
+ _state.store(State::EndSession);
}
- // In asyncronous mode this is the entrypoint back into the database from the network layer
- // after a message has been received, so we want to call runNext() to process the message.
- //
- // In synchronous mode, runNext() will fall through to call processMessage() so we avoid
- // the recursive call.
- if (!_sync)
- return runNext();
+ runNextInGuard(guard);
}
void ServiceStateMachine::sinkCallback(Status status) {
- invariant(_state == State::SinkWait);
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleFunc([this, status] { sinkCallback(status); });
+ }
+
+ invariant(state() == State::SinkWait);
if (!status.isOK()) {
log() << "Error sending response to client: " << status << ". Ending connection from "
<< session()->remote() << " (connection id: " << session()->id() << ")";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else if (inExhaust) {
- _state = State::Process;
+ _state.store(State::Process);
} else {
- _state = State::Source;
+ _state.store(State::Source);
}
- return scheduleNext();
+ // If the session ended, then runNext to clean it up
+ if (state() == State::EndSession) {
+ runNextInGuard(guard);
+ } else { // Otherwise scheduleNext to unwind the stack and run the next step later
+ scheduleNext();
+ }
}
void ServiceStateMachine::processMessage() {
// This may have been called just after a failure to source a message, in which case this
// should return early so the session can be cleaned up.
- if (_state != State::Process) {
+ if (state() != State::Process) {
return;
}
invariant(!_inMessage.empty());
@@ -174,7 +278,7 @@ void ServiceStateMachine::processMessage() {
networkCounter.hitLogicalIn(_inMessage.size());
- // 2. Pass sourced Message to handler to generate response.
+ // Pass sourced Message to handler to generate response.
auto opCtx = cc().makeOperationContext();
// The handleRequest is implemented in a subclass for mongod/mongos and actually all the
@@ -185,7 +289,7 @@ void ServiceStateMachine::processMessage() {
// up in currentOp results after the response reaches the client
opCtx.reset();
- // 3. Format our response, if we have one
+ // Format our response, if we have one
Message& toSink = dbresponse.response;
if (!toSink.empty()) {
toSink.header().setId(nextMessageId());
@@ -207,9 +311,10 @@ void ServiceStateMachine::processMessage() {
toSink = swm.getValue();
}
- // 4. Sink our response to the client
+ // Sink our response to the client
auto ticket = session()->sinkMessage(toSink);
- _state = State::SinkWait;
+
+ _state.store(State::SinkWait);
if (_sync) {
sinkCallback(session()->getTransportLayer()->wait(std::move(ticket)));
} else {
@@ -217,88 +322,44 @@ void ServiceStateMachine::processMessage() {
std::move(ticket), [this](Status status) { sinkCallback(status); });
}
} else {
- _state = State::Source;
+ _state.store(State::Source);
_inMessage.reset();
return scheduleNext();
}
}
-/*
- * This class wraps up the logic for swapping/unswapping the Client during runNext().
- */
-class ServiceStateMachine::ThreadGuard {
- ThreadGuard(ThreadGuard&) = delete;
- ThreadGuard& operator=(ThreadGuard&) = delete;
-
-public:
- explicit ThreadGuard(ServiceStateMachine* ssm)
- : _ssm{ssm},
- _haveTakenOwnership{!_ssm->_isOwned.test_and_set()},
- _oldThreadName{getThreadName().toString()} {
- const auto currentOwningThread = _ssm->_currentOwningThread.load();
- const auto currentThreadId = stdx::this_thread::get_id();
-
- // If this is true, then we are the "owner" of the Client and we should swap the
- // client/thread name before doing any work.
- if (_haveTakenOwnership) {
- _ssm->_currentOwningThread.store(currentThreadId);
-
- // Set up the thread name
- setThreadName(_ssm->_threadName);
-
- // These are sanity checks to make sure that the Client is what we expect it to be
- invariant(!haveClient());
- invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr);
-
- // Swap the current Client so calls to cc() work as expected
- Client::setCurrent(std::move(_ssm->_dbClient));
- } else if (currentOwningThread != currentThreadId) {
- // If the currentOwningThread does not equal the currentThreadId, then another thread
- // currently "owns" the Client and we should reschedule ourself.
- _okayToRunNext = false;
- }
- }
-
- ~ThreadGuard() {
- if (!_haveTakenOwnership)
- return;
-
- if (haveClient()) {
- _ssm->_dbClient = Client::releaseCurrent();
- }
- setThreadName(_oldThreadName);
- _ssm->_isOwned.clear();
+void ServiceStateMachine::runNext() {
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleNext();
}
+ return runNextInGuard(guard);
+}
- void dismiss() {
- _haveTakenOwnership = false;
- }
+void ServiceStateMachine::runNextInGuard(ThreadGuard& guard) {
+ auto curState = state();
+ invariant(curState != State::Ended);
- explicit operator bool() const {
- return _okayToRunNext;
+ // If this is the first run of the SSM, then update its state to Source
+ if (curState == State::Created) {
+ curState = State::Source;
+ _state.store(curState);
}
-private:
- ServiceStateMachine* _ssm;
- bool _haveTakenOwnership;
- const std::string _oldThreadName;
- bool _okayToRunNext = true;
-};
-
-void ServiceStateMachine::runNext() {
- ThreadGuard guard(this);
- if (!guard)
- return scheduleNext();
-
// Make sure the current Client got set correctly
invariant(Client::getCurrent() == _dbClientPtr);
try {
- switch (_state) {
+ switch (curState) {
case State::Source: {
invariant(_inMessage.empty());
auto ticket = session()->sourceMessage(&_inMessage);
- _state = State::SourceWait;
+ _state.store(State::SourceWait);
if (_sync) {
MONGO_IDLE_THREAD_BLOCK;
sourceCallback(session()->getTransportLayer()->wait(std::move(ticket)));
@@ -319,14 +380,14 @@ void ServiceStateMachine::runNext() {
MONGO_UNREACHABLE;
}
- if (_state == State::EndSession) {
- guard.dismiss();
- endSession();
- }
-
if ((_counter++ & 0xf) == 0) {
markThreadIdle();
- };
+ }
+
+ if (state() == State::EndSession) {
+ cleanupSession();
+ }
+
return;
} catch (const AssertionException& e) {
log() << "AssertionException handling request, closing client connection: " << e;
@@ -340,16 +401,33 @@ void ServiceStateMachine::runNext() {
quickExit(EXIT_UNCAUGHT);
}
- _state = State::EndSession;
- guard.dismiss();
- endSession();
+ _state.store(State::EndSession);
+ cleanupSession();
}
-// TODO: Right now this is a noop because we only run in synchronous mode. When an async
-// TransportLayer is written, this will call the serviceexecutor to schedule calls to runNext().
-void ServiceStateMachine::scheduleNext() {}
+void ServiceStateMachine::scheduleNext() {
+ maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); });
+}
+
+void ServiceStateMachine::terminate() {
+ if (state() == State::Ended)
+ return;
+ auto tl = session()->getTransportLayer();
+ tl->end(session());
+}
+
+void ServiceStateMachine::setCleanupHook(stdx::function<void()> hook) {
+ invariant(state() == State::Created);
+ _cleanupHook = std::move(hook);
+}
+
+ServiceStateMachine::State ServiceStateMachine::state() {
+ return _state.load();
+}
+
+void ServiceStateMachine::cleanupSession() {
+ _state.store(State::Ended);
-void ServiceStateMachine::endSession() {
auto tl = session()->getTransportLayer();
_inMessage.reset();
@@ -362,32 +440,6 @@ void ServiceStateMachine::endSession() {
const char* word = (conns == 1 ? " connection" : " connections");
log() << "end connection " << remote << " (" << conns << word << " now open)";
}
-
- _state = State::Ended;
-}
-
-std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state) {
- switch (state) {
- case ServiceStateMachine::State::Source:
- stream << "source";
- break;
- case ServiceStateMachine::State::SourceWait:
- stream << "sourceWait";
- break;
- case ServiceStateMachine::State::Process:
- stream << "process";
- break;
- case ServiceStateMachine::State::SinkWait:
- stream << "sinkWait";
- break;
- case ServiceStateMachine::State::EndSession:
- stream << "endSession";
- break;
- case ServiceStateMachine::State::Ended:
- stream << "ended";
- break;
- }
- return stream;
}
} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index d1750a6eb98..e3212ad21b0 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -33,6 +33,8 @@
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -40,25 +42,29 @@
namespace mongo {
class ServiceEntryPoint;
-namespace transport {
-class ServiceExecutorBase;
-} // namespace transport
-
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
* lifecycle of each user request as a state machine. It is the glue between the stateless
* ServiceEntryPoint and TransportLayer that ties network and database logic together for a
* user.
*/
-class ServiceStateMachine {
+class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {
ServiceStateMachine(ServiceStateMachine&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&) = delete;
public:
- ServiceStateMachine() = default;
ServiceStateMachine(ServiceStateMachine&&) = default;
ServiceStateMachine& operator=(ServiceStateMachine&&) = default;
+ /*
+ * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
+ * then calls into the transport layer will block while they complete, otherwise they will
+ * be handled asynchronously.
+ */
+ static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ bool sync);
+
ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync);
/*
@@ -69,6 +75,7 @@ public:
* Source -> SourceWait -> Process -> Source (fire-and-forget)
*/
enum class State {
+ Created, // The session has been created, but no operations have been performed yet
Source, // Request a new Message from the network to handle
SourceWait, // Wait for the new Message to arrive from the network
Process, // Run the Message through the database
@@ -85,6 +92,9 @@ public:
* Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
* if they have just completed a database operation to make sure that this doesn't infinitely
* recurse.
+ *
+ * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+ * ownership of the SSM, it will call scheduleNext() and return immediately.
*/
void runNext();
@@ -100,17 +110,60 @@ public:
/*
* Gets the current state of connection for testing/diagnostic purposes.
*/
- State state() const {
- return _state;
- }
+ State state();
+
+ /*
+ * Terminates the associated transport Session, and requests that the next call to runNext
+ * should end the session. If the session has already ended, this does nothing.
+ */
+ void terminate();
/*
- * Explicitly ends the session.
+ * Sets a function to be called after the session is ended
*/
- void endSession();
+ void setCleanupHook(stdx::function<void()> hook);
+
+ /*
+ * Gets the transport::Session associated with this connection
+ */
+ const transport::SessionHandle& session() const;
private:
/*
+ * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
+ */
+ class ThreadGuard;
+ friend class ThreadGuard;
+
+ /*
+ * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor
+ * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for
+ * callbacks to run.
+ */
+ template <typename Executor, typename Func>
+ void maybeScheduleFunc(Executor* svcExec, Func&& func) {
+ if (svcExec) {
+ uassertStatusOK(svcExec->schedule(
+ [ func = std::move(func), anchor = shared_from_this() ] { func(); }));
+ }
+ }
+
+ template <typename Func>
+ void scheduleFunc(Func&& func) {
+ auto svcExec = _serviceContext->getServiceExecutor();
+ invariant(svcExec);
+ maybeScheduleFunc(svcExec, func);
+ }
+
+ /*
+ * This is the actual implementation of runNext() that gets called after the ThreadGuard
+ * has been successfully created. If any callbacks (like sourceCallback()) need to call
+ * runNext() and already own a ThreadGuard, they should call this with that guard as the
+ * argument.
+ */
+ void runNextInGuard(ThreadGuard& guard);
+
+ /*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
@@ -123,21 +176,20 @@ private:
void sinkCallback(Status status);
/*
- * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
+ * Releases all the resources associated with the session and call the cleanupHook.
*/
- class ThreadGuard;
- friend class ThreadGuard;
-
- const transport::SessionHandle& session() const;
+ void cleanupSession();
- State _state{State::Source};
+ AtomicWord<State> _state{State::Created};
ServiceEntryPoint* _sep;
bool _sync;
+ ServiceContext* const _serviceContext;
ServiceContext::UniqueClient _dbClient;
const Client* _dbClientPtr;
const std::string _threadName;
+ stdx::function<void()> _cleanupHook;
bool inExhaust = false;
bool wasCompressed = false;
@@ -148,6 +200,32 @@ private:
std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT
};
-std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state);
+template <typename T>
+T& operator<<(T& stream, const ServiceStateMachine::State& state) {
+ switch (state) {
+ case ServiceStateMachine::State::Created:
+ stream << "created";
+ break;
+ case ServiceStateMachine::State::Source:
+ stream << "source";
+ break;
+ case ServiceStateMachine::State::SourceWait:
+ stream << "sourceWait";
+ break;
+ case ServiceStateMachine::State::Process:
+ stream << "process";
+ break;
+ case ServiceStateMachine::State::SinkWait:
+ stream << "sinkWait";
+ break;
+ case ServiceStateMachine::State::EndSession:
+ stream << "endSession";
+ break;
+ case ServiceStateMachine::State::Ended:
+ stream << "ended";
+ break;
+ }
+ return stream;
+}
} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 1153b554a27..fd65cb0cf6a 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -74,6 +74,8 @@ public:
return DbResponse{builder.finish()};
}
+ void endAllSessions(transport::Session::TagMask tags) override {}
+
void setUassertInHandler() {
_uassertInHandler = true;
}
@@ -213,8 +215,7 @@ protected:
sc->setTransportLayer(std::move(tl));
_tl->start().transitional_ignore();
- _ssm = stdx::make_unique<ServiceStateMachine>(
- getGlobalServiceContext(), _tl->createSession(), true);
+ _ssm = ServiceStateMachine::create(getGlobalServiceContext(), _tl->createSession(), true);
_tl->setSSM(_ssm.get());
}
@@ -228,7 +229,7 @@ protected:
MockTL* _tl;
MockSEP* _sep;
SessionHandle _session;
- std::unique_ptr<ServiceStateMachine> _ssm;
+ std::shared_ptr<ServiceStateMachine> _ssm;
bool _ranHandler;
};
@@ -236,7 +237,7 @@ ServiceStateMachine::State ServiceStateMachineFixture::runPingTest() {
_tl->setNextMessage(buildRequest(BSON("ping" << 1)));
ASSERT_FALSE(haveClient());
- ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Source);
+ ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Created);
log() << "run next";
_ssm->runNext();
auto ret = _ssm->state();
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 9586118093c..df0eb67b9eb 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -37,8 +37,6 @@
#include "mongo/util/net/ssl_types.h"
#endif
-#include "boost/optional.hpp"
-
#include "asio.hpp"
#ifdef MONGO_CONFIG_SSL
#include "asio/ssl.hpp"
@@ -57,8 +55,13 @@ public:
: _socket(std::move(socket)), _tl(tl) {}
virtual ~ASIOSession() {
- if (_sessionsListIterator) {
- _tl->eraseSession(*_sessionsListIterator);
+ if (_didPostAcceptSetup) {
+ // This is incremented in TransportLayerASIO::_acceptConnection if there are less than
+ // maxConns connections already established. A call to postAcceptSetup means that the
+ // session is valid and will be handed off to the ServiceEntryPoint.
+ //
+ // We decrement this here to keep the counters in the TL accurate.
+ _tl->_currentConnections.subtractAndFetch(1);
}
}
@@ -85,6 +88,7 @@ public:
void shutdown() {
std::error_code ec;
+ getSocket().cancel();
getSocket().shutdown(GenericSocket::shutdown_both, ec);
if (ec) {
error() << "Error closing socket: " << ec.message();
@@ -99,7 +103,7 @@ public:
#endif
}
- void postAcceptSetup(bool async, TransportLayerASIO::SessionsListIterator listIt) {
+ void postAcceptSetup(bool async) {
std::error_code ec;
_socket.non_blocking(async, ec);
fassert(40490, ec.value() == 0);
@@ -143,7 +147,8 @@ public:
if (ec) {
LOG(3) << "Unable to get remote endpoint address: " << ec.message();
}
- _sessionsListIterator.emplace(std::move(listIt));
+
+ _didPostAcceptSetup = true;
}
template <typename MutableBufferSequence, typename CompleteHandler>
@@ -153,7 +158,7 @@ public:
return opportunisticRead(sync, *_sslSocket, buffers, std::move(handler));
} else if (!_ranHandshake) {
invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));
- auto postHandshakeCb = [this, sync, &buffers, handler](Status status, bool needsRead) {
+ auto postHandshakeCb = [this, sync, buffers, handler](Status status, bool needsRead) {
if (status.isOK()) {
if (needsRead) {
read(sync, buffers, handler);
@@ -167,7 +172,7 @@ public:
};
auto handshakeRecvCb =
- [ this, postHandshakeCb = std::move(postHandshakeCb), sync, &buffers, handler ](
+ [ this, postHandshakeCb = std::move(postHandshakeCb), sync, buffers, handler ](
const std::error_code& ec, size_t size) {
_ranHandshake = true;
if (ec) {
@@ -210,7 +215,14 @@ private:
std::error_code ec;
auto size = asio::read(stream, buffers, ec);
if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {
- asio::async_read(stream, buffers, std::move(handler));
+ // asio::read is a loop internally, so some of buffers may have been read into already.
+ // So we need to adjust the buffers passed into async_read to be offset by size, if
+ // size is > 0.
+ MutableBufferSequence asyncBuffers(buffers);
+ if (size > 0) {
+ asyncBuffers += size;
+ }
+ asio::async_read(stream, asyncBuffers, std::move(handler));
} else {
handler(ec, size);
}
@@ -224,7 +236,14 @@ private:
std::error_code ec;
auto size = asio::write(stream, buffers, ec);
if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {
- asio::async_write(stream, buffers, std::move(handler));
+ // asio::write is a loop internally, so some of buffers may have been read into already.
+ // So we need to adjust the buffers passed into async_write to be offset by size, if
+ // size is > 0.
+ ConstBufferSequence asyncBuffers(buffers);
+ if (size > 0) {
+ asyncBuffers += size;
+ }
+ asio::async_write(stream, asyncBuffers, std::move(handler));
} else {
handler(ec, size);
}
@@ -318,7 +337,7 @@ private:
#endif
TransportLayerASIO* const _tl;
- boost::optional<TransportLayerASIO::SessionsListIterator> _sessionsListIterator;
+ bool _didPostAcceptSetup = false;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 186452c0098..47b0e379992 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -166,16 +166,6 @@ public:
virtual void end(const SessionHandle& session) = 0;
/**
- * End all active sessions in the TransportLayer. Tickets that have already been started via
- * wait() or asyncWait() will complete, but may return a failed Status. This method is
- * asynchronous and will return after all sessions have been notified to end.
- *
- * If a non-empty TagMask is provided, endAllSessions() will skip over sessions with matching
- * tags and leave them open.
- */
- virtual void endAllSessions(Session::TagMask tags) = 0;
-
- /**
* Start the TransportLayer. After this point, the TransportLayer will begin accepting active
* sessions from new transport::Endpoints.
*/
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 25a3e9c7832..9d0b894071d 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -81,7 +81,7 @@ std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::createSessi
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep)
- : _ioContext(stdx::make_unique<asio::io_context>()),
+ : _ioContext(std::make_shared<asio::io_context>()),
#ifdef MONGO_CONFIG_SSL
_sslContext(nullptr),
#endif
@@ -129,10 +129,7 @@ void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {
TransportLayer::Stats TransportLayerASIO::sessionStats() {
TransportLayer::Stats ret;
- auto sessionCount = [this] {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _sessions.size();
- }();
+ auto sessionCount = _currentConnections.load();
ret.numOpenSessions = sessionCount;
ret.numCreatedSessions = _createdConnections.load();
ret.numAvailableSessions = static_cast<size_t>(_listenerOptions.maxConns) - sessionCount;
@@ -145,39 +142,6 @@ void TransportLayerASIO::end(const SessionHandle& session) {
asioSession->shutdown();
}
-void TransportLayerASIO::endAllSessions(Session::TagMask tags) {
- log() << "ASIO transport layer closing all connections";
- std::vector<ASIOSessionHandle> sessions;
- // This is more complicated than it seems. We need to lock() all the weak_ptrs in _sessions
- // and then end them if their tags don't match the tags passed into the function.
- //
- // When you lock the session, the lifetime of the session is extended by creating a new
- // shared_ptr, but the session could end before this lock is released, which means that we
- // must extend the lifetime of the session past the scope of the lock_guard, or else the
- // session's destructor will acquire a lock already held here and deadlock/crash.
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- sessions.reserve(_sessions.size());
- for (auto&& weakSession : _sessions) {
- if (auto session = weakSession.lock()) {
- sessions.emplace_back(std::move(session));
- }
- }
- }
-
- // Outside the lock we kill any sessions that don't match our tags.
- for (auto&& session : sessions) {
- if (session->getTags() & tags) {
- log() << "Skip closing connection for connection # " << session->id();
- } else {
- end(session);
- }
- }
-
- // Any other sessions that may have ended while this was running will get cleaned up by
- // sessions being destructed at the end of this function.
-}
-
Status TransportLayerASIO::setup() {
std::vector<std::string> listenAddrs;
if (_listenerOptions.ipList.empty()) {
@@ -293,18 +257,29 @@ Status TransportLayerASIO::start() {
void TransportLayerASIO::shutdown() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_running.store(false);
- _ioContext->stop();
+ // Loop through the acceptors and cancel their calls to async_accept. This will prevent new
+ // connections from being opened.
+ for (auto& acceptor : _acceptors) {
+ acceptor.cancel();
+ }
+
+ // If the listener thread is joinable (that is, we created/started a listener thread), then
+ // the io_context is owned exclusively by the TransportLayer and we should stop it and join
+ // the listener thread.
+ //
+ // Otherwise the ServiceExecutor may need to continue running the io_context to drain running
+ // connections, so we just cancel the acceptors and return.
if (_listenerThread.joinable()) {
+ // We should only have started a listener if the TransportLayer is in sync mode.
+ dassert(!_listenerOptions.async);
+ _ioContext->stop();
_listenerThread.join();
}
}
-void TransportLayerASIO::eraseSession(TransportLayerASIO::SessionsListIterator it) {
- if (it != _sessions.end()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _sessions.erase(it);
- }
+const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() {
+ return _ioContext;
}
void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
@@ -316,6 +291,9 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
auto& socket = session->getSocket();
auto acceptCb = [ this, session = std::move(session), &acceptor ](std::error_code ec) mutable {
+ if (!_running.load())
+ return;
+
if (ec) {
log() << "Error accepting new connection on "
<< endpointToHostAndPort(acceptor.local_endpoint()) << ": " << ec.message();
@@ -323,20 +301,15 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
return;
}
- size_t connCount = 0;
- SessionsListIterator listIt;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_sessions.size() + 1 > _listenerOptions.maxConns) {
- log() << "connection refused because too many open connections: "
- << _sessions.size();
- _acceptConnection(acceptor);
- return;
- }
- listIt = _sessions.emplace(_sessions.end(), session);
+ size_t connCount = _currentConnections.addAndFetch(1);
+ if (connCount > _listenerOptions.maxConns) {
+ log() << "connection refused because too many open connections: " << connCount;
+ _currentConnections.subtractAndFetch(1);
+ _acceptConnection(acceptor);
+ return;
}
- session->postAcceptSetup(_listenerOptions.async, listIt);
+ session->postAcceptSetup(_listenerOptions.async);
_createdConnections.addAndFetch(1);
if (!serverGlobalParams.quiet.load()) {
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index ede44211c22..a9182ea9b8b 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -34,7 +34,6 @@
#include "mongo/config.h"
#include "mongo/db/server_options.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/list.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -107,37 +106,33 @@ public:
void end(const SessionHandle& session) final;
- void endAllSessions(transport::Session::TagMask tags) final;
-
Status setup() final;
Status start() final;
void shutdown() final;
+ const std::shared_ptr<asio::io_context>& getIOContext();
+
private:
class ASIOSession;
class ASIOTicket;
class ASIOSourceTicket;
class ASIOSinkTicket;
- class SessionListGuard;
using ASIOSessionHandle = std::shared_ptr<ASIOSession>;
using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>;
using GenericAcceptor = asio::basic_socket_acceptor<asio::generic::stream_protocol>;
- using SessionsListIterator = stdx::list<std::weak_ptr<ASIOSession>>::iterator;
ASIOSessionHandle createSession();
- void eraseSession(SessionsListIterator it);
void _acceptConnection(GenericAcceptor& acceptor);
stdx::mutex _mutex;
std::vector<GenericAcceptor> _acceptors;
- stdx::list<std::weak_ptr<ASIOSession>> _sessions;
// Only used if _listenerOptions.async is false.
stdx::thread _listenerThread;
- std::unique_ptr<asio::io_context> _ioContext;
+ std::shared_ptr<asio::io_context> _ioContext;
#ifdef MONGO_CONFIG_SSL
std::unique_ptr<asio::ssl::context> _sslContext;
SSLParams::SSLModes _sslMode;
@@ -148,6 +143,7 @@ private:
Options _listenerOptions;
AtomicWord<size_t> _createdConnections{0};
+ AtomicWord<size_t> _currentConnections{0};
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
index 94db97ee8ce..920baa5e5c4 100644
--- a/src/mongo/transport/transport_layer_legacy.cpp
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -50,14 +50,6 @@
namespace mongo {
namespace transport {
-namespace {
-struct lock_weak {
- template <typename T>
- std::shared_ptr<T> operator()(const std::weak_ptr<T>& p) const {
- return p.lock();
- }
-};
-} // namespace
TransportLayerLegacy::Options::Options(const ServerGlobalParams* params)
: port(params->port), ipList(params->bind_ip) {}
@@ -163,11 +155,7 @@ Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session,
TransportLayer::Stats TransportLayerLegacy::sessionStats() {
Stats stats;
- {
- stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
- stats.numOpenSessions = _sessions.size();
- }
-
+ stats.numOpenSessions = _currentConnections.load();
stats.numAvailableSessions = Listener::globalTicketHolder.available();
stats.numCreatedSessions = Listener::globalConnectionNumber.load();
@@ -222,48 +210,10 @@ void TransportLayerLegacy::_closeConnection(Connection* conn) {
Listener::globalTicketHolder.release();
}
-// Capture all of the weak pointers behind the lock, to delay their expiry until we leave the
-// locking context. This function requires proof of locking, by passing the lock guard.
-auto TransportLayerLegacy::lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const
- -> std::vector<LegacySessionHandle> {
- using std::begin;
- using std::end;
- std::vector<std::shared_ptr<LegacySession>> result;
- std::transform(begin(_sessions), end(_sessions), std::back_inserter(result), lock_weak());
- // Skip expired weak pointers.
- result.erase(std::remove(begin(result), end(result), nullptr), end(result));
- return result;
-}
-
-void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
- log() << "legacy transport layer closing all connections";
- {
- stdx::unique_lock<stdx::mutex> lk(_sessionsMutex);
- // We want to capture the shared_ptrs to our sessions in a way which lets us destroy them
- // outside of the lock.
- const auto sessions = lockAllSessions(lk);
-
- for (auto&& session : sessions) {
- if (session->getTags() & tags) {
- log() << "Skip closing connection for connection # "
- << session->conn()->connectionId;
- } else {
- _closeConnection(session->conn());
- }
- }
- // TODO(SERVER-27069): Revamp this lock to not cover the loop. This unlock was put here
- // specifically to minimize risk, just before the release of 3.4. The risk is that we would
- // be in the loop without the lock, which most of our testing didn't do. We must unlock
- // manually here, because the `sessions` vector must be destroyed *outside* of the lock.
- lk.unlock();
- }
-}
-
void TransportLayerLegacy::shutdown() {
_running.store(false);
_listener->shutdown();
_listenerThread.join();
- endAllSessions(Session::kEmptyTagMask);
}
void TransportLayerLegacy::_destroy(LegacySession& session) {
@@ -271,8 +221,7 @@ void TransportLayerLegacy::_destroy(LegacySession& session) {
_closeConnection(session.conn());
}
- stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
- _sessions.erase(session.getIter());
+ _currentConnections.subtractAndFetch(1);
}
Status TransportLayerLegacy::_runTicket(Ticket ticket) {
@@ -328,17 +277,8 @@ void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagin
amp->setLogLevel(logger::LogSeverity::Debug(1));
+ _currentConnections.addAndFetch(1);
auto session = LegacySession::create(std::move(amp), this);
- stdx::list<std::weak_ptr<LegacySession>> list;
- auto it = list.emplace(list.begin(), session);
-
- {
- // Add the new session to our list
- stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
- session->setIter(it);
- _sessions.splice(_sessions.begin(), list, it);
- }
-
invariant(_sep);
_sep->startSession(std::move(session));
}
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
index 781e7783034..07a0d808066 100644
--- a/src/mongo/transport/transport_layer_legacy.h
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -30,6 +30,7 @@
#include <vector>
+#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -85,7 +86,6 @@ public:
Stats sessionStats() override;
void end(const SessionHandle& session) override;
- void endAllSessions(transport::Session::TagMask tags) override;
void shutdown() override;
@@ -93,7 +93,6 @@ private:
class LegacySession;
using LegacySessionHandle = std::shared_ptr<LegacySession>;
using ConstLegacySessionHandle = std::shared_ptr<const LegacySession>;
- using SessionEntry = std::list<std::weak_ptr<LegacySession>>::iterator;
void _destroy(LegacySession& session);
@@ -104,8 +103,6 @@ private:
using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>;
using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>;
- std::vector<LegacySessionHandle> lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const;
-
/**
* Connection object, to associate Sessions with AbstractMessagingPorts.
*/
@@ -153,14 +150,6 @@ private:
return _connection.get();
}
- void setIter(SessionEntry it) {
- _entry = std::move(it);
- }
-
- SessionEntry getIter() const {
- return _entry;
- }
-
private:
explicit LegacySession(std::unique_ptr<AbstractMessagingPort> amp,
TransportLayerLegacy* tl);
@@ -173,9 +162,6 @@ private:
TagMask _tags;
std::unique_ptr<Connection> _connection;
-
- // A handle to this session's entry in the TL's session list
- SessionEntry _entry;
};
/**
@@ -238,11 +224,8 @@ private:
std::unique_ptr<Listener> _listener;
stdx::thread _listenerThread;
- // TransportLayerLegacy holds non-owning pointers to all of its sessions.
- mutable stdx::mutex _sessionsMutex;
- stdx::list<std::weak_ptr<LegacySession>> _sessions;
-
AtomicWord<bool> _running;
+ AtomicWord<size_t> _currentConnections{0};
Options _options;
};
diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp
index 103ad622d2b..39ec7236d3e 100644
--- a/src/mongo/transport/transport_layer_legacy_test.cpp
+++ b/src/mongo/transport/transport_layer_legacy_test.cpp
@@ -45,13 +45,25 @@ public:
ASSERT_NOT_OK(s);
tll->end(session);
+
+ _sessions.emplace_back(std::move(session));
}
DbResponse handleRequest(OperationContext* opCtx, const Message& request) override {
MONGO_UNREACHABLE;
}
+ // Sessions end as soon as they're started, so this doesn't need to do anything.
+ void endAllSessions(transport::Session::TagMask tags) override {
+ for (auto& session : _sessions) {
+ tll->end(session);
+ }
+ _sessions.clear();
+ }
+
transport::TransportLayerLegacy* tll = nullptr;
+
+ std::list<transport::SessionHandle> _sessions;
};
// This test verifies a fix for SERVER-28239. The actual service entry point in use by mongod and
@@ -61,7 +73,6 @@ TEST(TransportLayerLegacy, endSessionsDoesntDoubleClose) {
// Disabling this test until we can figure out the best way to allocate port numbers for unit
// tests
return;
-
ServiceEntryPointUtil sepu;
transport::TransportLayerLegacy::Options opts{};
@@ -89,7 +100,7 @@ TEST(TransportLayerLegacy, endSessionsDoesntDoubleClose) {
while (Listener::globalTicketHolder.used() == 0) {
}
- tll.endAllSessions(transport::Session::TagMask{});
+ sepu.endAllSessions(transport::Session::TagMask{});
while (Listener::globalTicketHolder.used() == 1) {
}
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index cdfc86cb93d..e28115c1e78 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/transport/transport_layer_legacy.h"
@@ -101,10 +102,6 @@ void TransportLayerManager::end(const SessionHandle& session) {
session->getTransportLayer()->end(session);
}
-void TransportLayerManager::endAllSessions(Session::TagMask tags) {
- _foreach([&tags](TransportLayer* tl) { tl->endAllSessions(tags); });
-}
-
// TODO Right now this and setup() leave TLs started if there's an error. In practice the server
// exits with an error and this isn't an issue, but we should make this more robust.
Status TransportLayerManager::start() {
@@ -151,7 +148,17 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
auto sep = ctx->getServiceEntryPoint();
if (config->transportLayer == "asio") {
transport::TransportLayerASIO::Options opts(config);
- transportLayer = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);
+ if (config->serviceExecutor != "synchronous") {
+ opts.async = true;
+ }
+
+ auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);
+
+ if (config->serviceExecutor == "fixedForTesting") {
+ ctx->setServiceExecutor(
+ stdx::make_unique<ServiceExecutorFixed>(ctx, transportLayerASIO->getIOContext()));
+ }
+ transportLayer = std::move(transportLayerASIO);
} else if (serverGlobalParams.transportLayer == "legacy") {
transport::TransportLayerLegacy::Options opts(config);
transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep);
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 0fb762264ea..3c91e31719d 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -72,7 +72,6 @@ public:
Stats sessionStats() override;
void end(const SessionHandle& session) override;
- void endAllSessions(Session::TagMask tags) override;
Status start() override;
void shutdown() override;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index d1d7ec6e8fc..9b51fda6f5c 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -119,14 +119,6 @@ void TransportLayerMock::end(const SessionHandle& session) {
_sessions[session->id()].ended = true;
}
-void TransportLayerMock::endAllSessions(Session::TagMask tags) {
- auto it = _sessions.begin();
- while (it != _sessions.end()) {
- end(it->second.session);
- it++;
- }
-}
-
Status TransportLayerMock::setup() {
return Status::OK();
}
@@ -138,7 +130,6 @@ Status TransportLayerMock::start() {
void TransportLayerMock::shutdown() {
if (!inShutdown()) {
_shutdown = true;
- endAllSessions(Session::kEmptyTagMask);
}
}
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index d0eeee0e8ac..208a7d93c97 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -67,7 +67,6 @@ public:
SessionHandle get(Session::Id id);
bool owns(Session::Id id);
void end(const SessionHandle& session) override;
- void endAllSessions(Session::TagMask tags) override;
Status setup() override;
Status start() override;
diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp
index 1cf90b395ff..2234095c806 100644
--- a/src/mongo/transport/transport_layer_mock_test.cpp
+++ b/src/mongo/transport/transport_layer_mock_test.cpp
@@ -249,13 +249,6 @@ void assertEnded(TransportLayer* tl,
}
}
-// endAllSessions() ends all sessions
-TEST_F(TransportLayerMockTest, EndAllSessions) {
- std::vector<SessionHandle> sessions = createSessions(tl());
- tl()->endAllSessions(Session::kEmptyTagMask);
- assertEnded(tl(), sessions);
-}
-
// shutdown() ends all sessions and shuts down
TEST_F(TransportLayerMockTest, Shutdown) {
std::vector<SessionHandle> sessions = createSessions(tl());