author    Spencer T Brody <spencer@mongodb.com>  2016-04-13 16:22:24 -0400
committer Spencer T Brody <spencer@mongodb.com>  2016-04-18 14:05:05 -0400
commit    e96fc394af9d694a86e6e5d5081bd3f9b4698bae (patch)
tree      bf87c047e0ee7d0cb1c548814a0c073775147669
parent    a6caba6dd563c82c9662c1da6d9dffe37855381c (diff)
download  mongo-e96fc394af9d694a86e6e5d5081bd3f9b4698bae.tar.gz
SERVER-23694 Move TaskExecutorPool currently owned by ShardRegistry to Grid
-rw-r--r--  src/mongo/db/commands/conn_pool_stats.cpp                                   |   9
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp                     |   6
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                                           |   2
-rw-r--r--  src/mongo/db/s/sharding_state_test.cpp                                      |  11
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp                 |   6
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.h                   |   3
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp  |   2
-rw-r--r--  src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp                 |  47
-rw-r--r--  src/mongo/s/client/shard_registry.cpp                                       |  27
-rw-r--r--  src/mongo/s/client/shard_registry.h                                         |  46
-rw-r--r--  src/mongo/s/commands/cluster_pipeline_cmd.cpp                               |   4
-rw-r--r--  src/mongo/s/commands/commands_public.cpp                                    |   2
-rw-r--r--  src/mongo/s/grid.cpp                                                        |  14
-rw-r--r--  src/mongo/s/grid.h                                                          |  25
-rw-r--r--  src/mongo/s/query/async_results_merger_test.cpp                             | 120
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                                          |   2
-rw-r--r--  src/mongo/s/server.cpp                                                      |   2
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                                     |  21
-rw-r--r--  src/mongo/s/sharding_test_fixture.cpp                                       |  19
-rw-r--r--  src/mongo/s/sharding_test_fixture.h                                         |   2
20 files changed, 180 insertions(+), 190 deletions(-)
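
Net effect of the patch: the TaskExecutorPool and its NetworkInterface move off ShardRegistry and onto Grid, and callers reach the executors through Grid::get(). A minimal sketch of the new access path, assuming the caller holds an OperationContext (the helper names are illustrative, not part of the patch):

#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/grid.h"

namespace mongo {

// Hypothetical helpers showing the access path used throughout the hunks
// below. Old path, now removed: grid.shardRegistry()->getExecutor().
executor::TaskExecutor* fixedShardingExecutor(OperationContext* txn) {
    return Grid::get(txn)->getExecutorPool()->getFixedExecutor();
}

// Old path, now removed: grid.shardRegistry()->getExecutorPool()->getArbitraryExecutor().
executor::TaskExecutor* arbitraryShardingExecutor(OperationContext* txn) {
    return Grid::get(txn)->getExecutorPool()->getArbitraryExecutor();
}

}  // namespace mongo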
diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp
index ea668b35489..08993fd918a 100644
--- a/src/mongo/db/commands/conn_pool_stats.cpp
+++ b/src/mongo/db/commands/conn_pool_stats.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/s/client/shard_registry.h"
@@ -85,10 +86,10 @@ public:
}
// Sharding connections, if we have any.
- auto registry = grid.shardRegistry();
- if (registry) {
- registry->appendConnectionStats(&stats);
- grid.catalogManager(txn)->appendConnectionStats(&stats);
+ auto grid = Grid::get(txn);
+ if (grid->shardRegistry()) {
+ grid->getExecutorPool()->appendConnectionStats(&stats);
+ grid->catalogManager(txn)->appendConnectionStats(&stats);
}
// Output to a BSON object.
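
A sketch of what the stats change above amounts to: connection statistics are now aggregated once per Grid-owned TaskExecutorPool (fixed and arbitrary executors together) rather than through ShardRegistry::appendConnectionStats(), which this patch deletes. Assuming an OperationContext* txn is in scope:

    executor::ConnectionPoolStats stats{};
    Grid::get(txn)->getExecutorPool()->appendConnectionStats(&stats);
    // The catalog manager's pools are still reported separately:
    Grid::get(txn)->catalogManager(txn)->appendConnectionStats(&stats);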
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a9b324eaaad..4fdef19c6b6 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -464,17 +464,19 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
StatusWith<executor::RemoteCommandResponse> responseStatus(
Status{ErrorCodes::InternalError, "Uninitialized value"});
- auto scheduleStatus = grid.shardRegistry()->getExecutor()->scheduleRemoteCommand(
+ auto executor = grid.getExecutorPool()->getArbitraryExecutor();
+ auto scheduleStatus = executor->scheduleRemoteCommand(
executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj),
[&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
responseStatus = args.response;
});
+ // TODO: Update RemoteCommandTargeter on NotMaster errors.
if (!scheduleStatus.isOK()) {
return scheduleStatus.getStatus();
}
- grid.shardRegistry()->getExecutor()->wait(scheduleStatus.getValue());
+ executor->wait(scheduleStatus.getValue());
if (!responseStatus.isOK()) {
return responseStatus.getStatus();
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 8e12acc8872..bd79cf69884 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -174,7 +174,7 @@ void ShardingState::shutDown(OperationContext* txn) {
}
if (_getInitializationState() == InitializationState::kInitialized) {
- grid.shardRegistry()->shutdown();
+ grid.getExecutorPool()->shutdownAndJoin();
grid.catalogManager(txn)->shutDown(txn);
}
}
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index fbaa6a9d678..de9e8b00466 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -72,16 +72,17 @@ void initGrid(OperationContext* txn, const ConnectionString& configConnString) {
auto executorPool = stdx::make_unique<executor::TaskExecutorPool>();
executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec));
+ executorPool->startup();
- auto shardRegistry(stdx::make_unique<ShardRegistry>(
- std::move(shardFactory), std::move(executorPool), mockNetwork, configConnString));
- shardRegistry->startup();
+ auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configConnString));
grid.init(
stdx::make_unique<CatalogManagerMock>(),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
- stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource()));
+ stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource()),
+ std::move(executorPool),
+ mockNetwork);
}
class ShardingStateTest : public mongo::unittest::Test {
@@ -106,7 +107,7 @@ public:
// Cleanup only if shard registry was initialized
if (grid.shardRegistry()) {
- grid.shardRegistry()->shutdown();
+ grid.getExecutorPool()->shutdownAndJoin();
grid.clearForUnitTests();
}
}
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 878f7d12083..6fd27bcdb9e 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -668,8 +668,8 @@ Status CatalogManagerReplicaSet::_log(OperationContext* txn,
const std::string& what,
const std::string& operationNS,
const BSONObj& detail) {
- Date_t now = grid.shardRegistry()->getExecutor()->now();
- const std::string hostName = grid.shardRegistry()->getNetwork()->getHostName();
+ Date_t now = Grid::get(txn)->getNetwork()->now();
+ const std::string hostName = Grid::get(txn)->getNetwork()->getHostName();
const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();
ChangeLogType changeLog;
@@ -1111,7 +1111,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
coll.setNs(ns);
coll.setDropped(true);
coll.setEpoch(ChunkVersion::DROPPED().epoch());
- coll.setUpdatedAt(grid.shardRegistry()->getNetwork()->now());
+ coll.setUpdatedAt(Grid::get(txn)->getNetwork()->now());
result = updateCollection(txn, ns.ns(), coll);
if (!result.isOK()) {
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 288ee4a9b34..69e3667b49a 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -346,9 +346,6 @@ private:
// True if startup() has been called.
bool _started = false; // (M)
- // Last known highest opTime from the config server.
- repl::OpTime _configOpTime; // (M)
-
// Whether the logAction call should attempt to create the actionlog collection
AtomicInt32 _actionLogCollectionCreated{0}; // (S)
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
index ba9e5310f86..62dfa3e2633 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
@@ -219,7 +219,7 @@ protected:
// Expect the change log operation
expectChangeLogInsert(configHost,
- shardRegistry()->getNetwork()->now(),
+ network()->now(),
"addShard",
"",
BSON("name" << shardName << "host" << shardHost));
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
index c077e869fde..f6f2a517486 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
@@ -42,12 +42,15 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/network_test_env.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager_mock.h"
#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/client/shard_factory_mock.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/write_ops/batched_update_request.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
@@ -89,7 +92,8 @@ public:
}
std::shared_ptr<RemoteCommandTargeterMock> targeter() {
- return RemoteCommandTargeterMock::get(_shardRegistry->getConfigShard()->getTargeter());
+ return RemoteCommandTargeterMock::get(
+ grid.shardRegistry()->getConfigShard()->getTargeter());
}
DistLockCatalogImpl* catalog() {
@@ -97,10 +101,10 @@ public:
}
// Not thread safe
- void shutdownShardRegistry() {
+ void shutdownExecutor() {
if (!_shutdownCalled) {
_shutdownCalled = true;
- _shardRegistry->shutdown();
+ grid.getExecutorPool()->shutdownAndJoin();
}
}
@@ -121,19 +125,27 @@ private:
auto executorPool = stdx::make_unique<executor::TaskExecutorPool>();
executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExecutor));
+ executorPool->startup();
ConnectionString configCS(HostAndPort("dummy:1234"));
- _shardRegistry = stdx::make_unique<ShardRegistry>(
- stdx::make_unique<ShardFactoryMock>(), std::move(executorPool), network, configCS);
- _shardRegistry->startup();
+ auto shardRegistry =
+ stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryMock>(), configCS);
- _distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(_shardRegistry.get());
+ _distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry.get());
+
+ grid.init(stdx::make_unique<CatalogManagerMock>(),
+ stdx::make_unique<CatalogCache>(),
+ std::move(shardRegistry),
+ std::unique_ptr<ClusterCursorManager>{nullptr},
+ std::move(executorPool),
+ network);
targeter()->setFindHostReturnValue(dummyHost);
}
void tearDown() override {
- shutdownShardRegistry();
+ shutdownExecutor();
+ grid.clearForUnitTests();
}
bool _shutdownCalled = false;
@@ -142,7 +154,6 @@ private:
CatalogManagerMock _catalogMgr;
- std::unique_ptr<ShardRegistry> _shardRegistry;
std::unique_ptr<DistLockCatalogImpl> _distLockCatalog;
OperationContextNoop _txn;
};
@@ -198,7 +209,7 @@ TEST_F(DistLockCatalogFixture, PingTargetError) {
}
TEST_F(DistLockCatalogFixture, PingRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->ping(txn(), "abcd", Date_t::now());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -442,7 +453,7 @@ TEST_F(DistLockCatalogFixture, GrabLockTargetError) {
}
TEST_F(DistLockCatalogFixture, GrabLockRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->grabLock(txn(), "", OID::gen(), "", "", Date_t::now(), "").getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -761,7 +772,7 @@ TEST_F(DistLockCatalogFixture, OvertakeLockTargetError) {
}
TEST_F(DistLockCatalogFixture, OvertakeLockRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status =
catalog()->overtakeLock(txn(), "", OID(), OID(), "", "", Date_t::now(), "").getStatus();
@@ -936,7 +947,7 @@ TEST_F(DistLockCatalogFixture, UnlockTargetError) {
}
TEST_F(DistLockCatalogFixture, UnlockRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->unlock(txn(), OID());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -1144,7 +1155,7 @@ TEST_F(DistLockCatalogFixture, GetServerTargetError) {
}
TEST_F(DistLockCatalogFixture, GetServerRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->getServerInfo(txn()).getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -1290,7 +1301,7 @@ TEST_F(DistLockCatalogFixture, StopPingTargetError) {
}
TEST_F(DistLockCatalogFixture, StopPingRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->stopPing(txn(), "");
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -1435,7 +1446,7 @@ TEST_F(DistLockCatalogFixture, GetPingTargetError) {
}
TEST_F(DistLockCatalogFixture, GetPingRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->getPing(txn(), "").getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
@@ -1520,7 +1531,7 @@ TEST_F(DistLockCatalogFixture, GetLockByTSTargetError) {
}
TEST_F(DistLockCatalogFixture, GetLockByTSRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->getLockByTS(txn(), OID()).getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
ASSERT_FALSE(status.reason().empty());
@@ -1607,7 +1618,7 @@ TEST_F(DistLockCatalogFixture, GetLockByNameTargetError) {
}
TEST_F(DistLockCatalogFixture, GetLockByNameRunCmdError) {
- shutdownShardRegistry();
+ shutdownExecutor();
auto status = catalog()->getLockByName(txn(), "x").getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 007942c9f06..cbbf3e18bc0 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -170,12 +170,8 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kWriteConcernErrors{
ErrorCodes::CannotSatisfyWriteConcern};
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
- std::unique_ptr<executor::TaskExecutorPool> executorPool,
- executor::NetworkInterface* network,
ConnectionString configServerCS)
- : _shardFactory(std::move(shardFactory)),
- _executorPool(std::move(executorPool)),
- _network(network) {
+ : _shardFactory(std::move(shardFactory)) {
updateConfigServerConnectionString(configServerCS);
}
@@ -198,14 +194,6 @@ void ShardRegistry::_updateConfigServerConnectionString_inlock(ConnectionString
_addConfigShard_inlock();
}
-void ShardRegistry::startup() {
- _executorPool->startup();
-}
-
-void ShardRegistry::shutdown() {
- _executorPool->shutdownAndJoin();
-}
-
bool ShardRegistry::reload(OperationContext* txn) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -410,11 +398,6 @@ void ShardRegistry::toBSON(BSONObjBuilder* result) {
}
}
-void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
- // Get stats from the pool of task executors, including fixed executor within.
- _executorPool->appendConnectionStats(stats);
-}
-
void ShardRegistry::_addConfigShard_inlock() {
_addShard_inlock("config", _configServerCS);
}
@@ -579,7 +562,7 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig(
findCmdBuilder.append(LiteParsedQuery::cmdOptionMaxTimeMS,
durationCount<Milliseconds>(maxTime));
- QueryFetcher fetcher(_executorPool->getFixedExecutor(),
+ QueryFetcher fetcher(Grid::get(txn)->getExecutorPool()->getFixedExecutor(),
host.getValue(),
nss,
findCmdBuilder.done(),
@@ -636,7 +619,7 @@ StatusWith<BSONObj> ShardRegistry::runIdempotentCommandOnShard(
const std::string& dbName,
const BSONObj& cmdObj) {
auto response = _runCommandWithRetries(txn,
- _executorPool->getFixedExecutor(),
+ Grid::get(txn)->getExecutorPool()->getFixedExecutor(),
shard,
readPref,
dbName,
@@ -672,7 +655,7 @@ StatusWith<BSONObj> ShardRegistry::runIdempotentCommandOnConfig(
const BSONObj& cmdObj) {
auto response = _runCommandWithRetries(
txn,
- _executorPool->getFixedExecutor(),
+ Grid::get(txn)->getExecutorPool()->getFixedExecutor(),
getConfigShard(),
readPref,
dbName,
@@ -693,7 +676,7 @@ StatusWith<BSONObj> ShardRegistry::runCommandOnConfigWithRetries(
const BSONObj& cmdObj,
const ShardRegistry::ErrorCodesSet& errorsToCheck) {
auto response = _runCommandWithRetries(txn,
- _executorPool->getFixedExecutor(),
+ Grid::get(txn)->getExecutorPool()->getFixedExecutor(),
getConfigShard(),
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
dbname,
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 8c16ce310a5..866d0536625 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -86,42 +86,12 @@ public:
* Instantiates a new shard registry.
*
* @param shardFactory Makes shards
- * @param commandRunner Command runner for executing commands against hosts
- * @param executor Asynchronous task executor to use for making calls to shards and
- * config servers.
- * @param network Network interface backing executor.
* @param configServerCS ConnectionString used for communicating with the config servers
*/
- ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
- std::unique_ptr<executor::TaskExecutorPool> executorPool,
- executor::NetworkInterface* network,
- ConnectionString configServerCS);
+ ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, ConnectionString configServerCS);
~ShardRegistry();
- /**
- * Invokes the executor's startup method, which will start any networking/async execution
- * threads.
- */
- void startup();
-
- /**
- * Stops the executor thread and waits for it to join.
- */
- void shutdown();
-
- executor::TaskExecutor* getExecutor() const {
- return _executorPool->getFixedExecutor();
- }
-
- executor::TaskExecutorPool* getExecutorPool() const {
- return _executorPool.get();
- }
-
- executor::NetworkInterface* getNetwork() const {
- return _network;
- }
-
ConnectionString getConfigServerConnectionString() const;
/**
@@ -206,11 +176,6 @@ public:
void toBSON(BSONObjBuilder* result);
/**
- * Append information about the sharding subsystem's connection pools.
- */
- void appendConnectionStats(executor::ConnectionPoolStats* stats) const;
-
- /**
* Executes 'find' command against a config server matching the given read preference, and
* fetches *all* the results that the host will return until there are no more or until an error
* is returned.
@@ -369,15 +334,6 @@ private:
// to access outside of _mutex.
const std::unique_ptr<ShardFactory> _shardFactory;
- // Executor pool for scheduling work and remote commands to shards and config servers. Each
- // contained executor has a connection hook set on it for initialization sharding data on shards
- // and detecting if the catalog manager needs swapping.
- const std::unique_ptr<executor::TaskExecutorPool> _executorPool;
-
- // Network interface being used by _executor. Used for asking questions about the network
- // configuration, such as getting the current server's hostname.
- executor::NetworkInterface* const _network;
-
// Protects the _reloadState, config server connections string, and the lookup maps below.
mutable stdx::mutex _mutex;
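
With the pool and network gone from the registry, constructing a ShardRegistry reduces to a factory plus the config server connection string; executor and network lifetimes are now managed by Grid. A construction sketch under the trimmed signature (the include paths for ConnectionString and ShardFactory are assumptions):

#include "mongo/client/connection_string.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/memory.h"

namespace mongo {

// Hypothetical wiring helper; mirrors the two-argument constructor above.
std::unique_ptr<ShardRegistry> makeShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
                                                 ConnectionString configCS) {
    return stdx::make_unique<ShardRegistry>(std::move(shardFactory), std::move(configCS));
}

}  // namespace mongo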
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index e81f698ea53..eba2fb46523 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -205,7 +205,7 @@ public:
if (!needSplit) {
invariant(shardResults.size() == 1);
invariant(shardResults[0].target.getServers().size() == 1);
- auto executorPool = grid.shardRegistry()->getExecutorPool();
+ auto executorPool = grid.getExecutorPool();
const BSONObj reply =
uassertStatusOK(storePossibleCursor(shardResults[0].target.getServers()[0],
shardResults[0].result,
@@ -411,7 +411,7 @@ BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn,
throw RecvStaleConfigException("command failed because of stale config", result);
}
- auto executorPool = grid.shardRegistry()->getExecutorPool();
+ auto executorPool = grid.getExecutorPool();
result = uassertStatusOK(storePossibleCursor(HostAndPort(cursor->originalHost()),
result,
executorPool->getArbitraryExecutor(),
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 1d533d1dd36..1af094c366f 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -112,7 +112,7 @@ bool cursorCommandPassthrough(OperationContext* txn,
StatusWith<BSONObj> transformedResponse =
storePossibleCursor(HostAndPort(cursor->originalHost()),
response,
- grid.shardRegistry()->getExecutorPool()->getArbitraryExecutor(),
+ grid.getExecutorPool()->getArbitraryExecutor(),
grid.getCursorManager());
if (!transformedResponse.isOK()) {
return Command::appendCommandStatus(*out, transformedResponse.getStatus());
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index 9d36641bd05..3f6ff1f06bb 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -32,6 +32,8 @@
#include "mongo/s/grid.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
@@ -44,7 +46,7 @@ namespace mongo {
// Global grid instance
Grid grid;
-Grid::Grid() : _allowLocalShard(true) {}
+Grid::Grid() : _network(nullptr), _allowLocalShard(true) {}
Grid::~Grid() = default;
@@ -55,16 +57,22 @@ Grid* Grid::get(OperationContext* operationContext) {
void Grid::init(std::unique_ptr<CatalogManager> catalogManager,
std::unique_ptr<CatalogCache> catalogCache,
std::unique_ptr<ShardRegistry> shardRegistry,
- std::unique_ptr<ClusterCursorManager> cursorManager) {
+ std::unique_ptr<ClusterCursorManager> cursorManager,
+ std::unique_ptr<executor::TaskExecutorPool> executorPool,
+ executor::NetworkInterface* network) {
invariant(!_catalogManager);
invariant(!_catalogCache);
invariant(!_shardRegistry);
invariant(!_cursorManager);
+ invariant(!_executorPool);
+ invariant(!_network);
_catalogManager = std::move(catalogManager);
_catalogCache = std::move(catalogCache);
_shardRegistry = std::move(shardRegistry);
_cursorManager = std::move(cursorManager);
+ _executorPool = std::move(executorPool);
+ _network = network;
}
bool Grid::allowLocalHost() const {
@@ -87,6 +95,8 @@ void Grid::clearForUnitTests() {
_catalogCache.reset();
_shardRegistry.reset();
_cursorManager.reset();
+ _executorPool.reset();
+ _network = nullptr;
_configOpTime = repl::OpTime();
}
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 7c96ab4e5cb..f0746b5f142 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -40,6 +40,11 @@ class ClusterCursorManager;
class OperationContext;
class ShardRegistry;
+namespace executor {
+class NetworkInterface;
+class TaskExecutorPool;
+} // namespace executor
+
/**
* Holds the global sharding context. Single instance exists for a running server. Exists on
* both MongoD and MongoS.
@@ -64,7 +69,9 @@ public:
void init(std::unique_ptr<CatalogManager> catalogManager,
std::unique_ptr<CatalogCache> catalogCache,
std::unique_ptr<ShardRegistry> shardRegistry,
- std::unique_ptr<ClusterCursorManager> cursorManager);
+ std::unique_ptr<ClusterCursorManager> cursorManager,
+ std::unique_ptr<executor::TaskExecutorPool> executorPool,
+ executor::NetworkInterface* network);
/**
* @return true if shards and config servers are allowed to use 'localhost' in address
@@ -96,6 +103,14 @@ public:
return _cursorManager.get();
}
+ executor::TaskExecutorPool* getExecutorPool() {
+ return _executorPool.get();
+ }
+
+ executor::NetworkInterface* getNetwork() {
+ return _network;
+ }
+
repl::OpTime configOpTime() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _configOpTime;
@@ -120,6 +135,14 @@ private:
std::unique_ptr<ShardRegistry> _shardRegistry;
std::unique_ptr<ClusterCursorManager> _cursorManager;
+ // Executor pool for scheduling work and remote commands to shards and config servers. Each
+ // contained executor has a connection hook set on it for sending/receiving sharding metadata.
+ std::unique_ptr<executor::TaskExecutorPool> _executorPool;
+
+ // Network interface being used by the fixed executor in _executorPool. Used for asking
+ // questions about the network configuration, such as getting the current server's hostname.
+ executor::NetworkInterface* _network;
+
// Protects _configOpTime.
mutable stdx::mutex _mutex;
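
The two new Grid accessors cover the uses that previously went through the registry: getExecutorPool() for scheduling work, getNetwork() for host and time questions. A sketch of the changelog-id construction that catalog_manager_replica_set.cpp now performs via the Grid (the str.h include path is an assumption):

#include "mongo/executor/network_interface.h"
#include "mongo/s/grid.h"
#include "mongo/util/mongoutils/str.h"

#include <string>

namespace mongo {

// Hypothetical condensation of CatalogManagerReplicaSet::_log()'s changeId:
// hostname and current time now come from the Grid-held NetworkInterface.
std::string makeChangeIdPrefix(OperationContext* txn) {
    auto net = Grid::get(txn)->getNetwork();
    return str::stream() << net->getHostName() << "-" << net->now().toString();
}

}  // namespace mongo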
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 17826b9a0da..43dab10205e 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -67,8 +67,6 @@ public:
void setUp() override {
ShardingTestFixture::setUp();
- executor = shardRegistry()->getExecutor();
-
getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345));
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
@@ -123,7 +121,7 @@ protected:
params.remotes.emplace_back(shardId, findCmd);
}
- arm = stdx::make_unique<AsyncResultsMerger>(executor, std::move(params));
+ arm = stdx::make_unique<AsyncResultsMerger>(executor(), std::move(params));
}
/**
@@ -138,7 +136,7 @@ protected:
params.remotes.emplace_back(hostIdPair.first, hostIdPair.second);
}
- arm = stdx::make_unique<AsyncResultsMerger>(executor, std::move(params));
+ arm = stdx::make_unique<AsyncResultsMerger>(executor(), std::move(params));
}
/**
@@ -200,8 +198,6 @@ protected:
const NamespaceString _nss;
- executor::TaskExecutor* executor;
-
std::unique_ptr<AsyncResultsMerger> arm;
};
@@ -234,7 +230,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -269,7 +265,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(12), batch3);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -299,7 +295,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
responses.emplace_back(_nss, CursorId(0), batch6);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -321,7 +317,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
responses.emplace_back(_nss, CursorId(0), batch7);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
@@ -349,7 +345,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
fromjson("{_id: 8, $sortKey: {'': 8}}")};
responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()));
@@ -386,7 +382,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
fromjson("{$sortKey: {'': 8}}")};
responses.emplace_back(_nss, CursorId(2), batch3);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()));
@@ -407,7 +403,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()));
@@ -426,7 +422,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
responses.emplace_back(_nss, CursorId(0), batch5);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()));
@@ -457,7 +453,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
fromjson("{$sortKey: {'': 5, '': 9}}")};
responses.emplace_back(_nss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady()));
@@ -488,7 +484,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
auto statusWithNext = arm->nextReady();
@@ -497,7 +493,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
@@ -528,7 +524,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -551,7 +547,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
@@ -573,7 +569,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) {
responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -603,7 +599,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -626,7 +622,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
blackHoleNextRequest();
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()));
@@ -643,7 +639,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()));
@@ -670,7 +666,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -689,14 +685,14 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
responses.emplace_back(_nss, CursorId(456), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT(!arm->nextReady().isOK());
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
@@ -715,7 +711,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3)
.toBSON(CursorResponse::ResponseType::InitialResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
auto statusWithNext = arm->nextReady();
@@ -723,7 +719,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
@@ -742,7 +738,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
auto statusWithNext = arm->nextReady();
@@ -752,7 +748,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
@@ -769,7 +765,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -780,13 +776,13 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
// Required to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
makeCursorFromFindCmd(findCmd, kTestShardIds);
- executor->shutdown();
+ executor()->shutdown();
ASSERT_NOT_OK(arm->nextEvent().getStatus());
auto killEvent = arm->kill();
ASSERT_FALSE(killEvent.isValid());
@@ -803,7 +799,7 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
blackHoleNextRequest();
// Executor shuts down before a response is received.
- executor->shutdown();
+ executor()->shutdown();
auto killEvent = arm->kill();
ASSERT_FALSE(killEvent.isValid());
}
@@ -820,7 +816,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
@@ -844,7 +840,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
auto killedEvent = arm->kill();
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
@@ -882,8 +878,8 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
// Ensure that we properly signal both those waiting for the kill, and those waiting for more
// results to be ready.
- executor->waitForEvent(readyEvent);
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(readyEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
@@ -928,8 +924,8 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
// Ensure that we properly signal both those waiting for the kill, and those waiting for more
// results to be ready.
- executor->waitForEvent(readyEvent);
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(readyEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
@@ -950,7 +946,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
// Attempting to schedule more network operations on a killed arm is an error.
ASSERT_NOT_OK(arm->nextEvent().getStatus());
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
@@ -960,8 +956,8 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
ASSERT(killedEvent1.isValid());
auto killedEvent2 = arm->kill();
ASSERT(killedEvent2.isValid());
- executor->waitForEvent(killedEvent1);
- executor->waitForEvent(killedEvent2);
+ executor()->waitForEvent(killedEvent1);
+ executor()->waitForEvent(killedEvent2);
}
TEST_F(AsyncResultsMergerTest, TailableBasic) {
@@ -976,7 +972,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -997,7 +993,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.emplace_back(_nss, CursorId(123), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
@@ -1007,7 +1003,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
@@ -1023,7 +1019,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(123), batch);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
// After receiving an empty batch, the ARM should return boost::none, but remotes should not be
// marked as exhausted.
@@ -1032,7 +1028,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
- executor->waitForEvent(killedEvent);
+ executor()->waitForEvent(killedEvent);
}
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
@@ -1048,7 +1044,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
// Afterwards, the ARM should return boost::none and remote cursors should be marked as
// exhausted.
@@ -1070,7 +1066,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1091,7 +1087,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
ASSERT_EQ(request.getValue().cursorid, 1LL);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
@@ -1115,7 +1111,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1143,7 +1139,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(99), batch2);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1164,7 +1160,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.emplace_back(_nss, CursorId(99), batch3);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
@@ -1179,7 +1175,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.emplace_back(_nss, CursorId(0), batch4);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT(!unittest::assertGet(arm->nextReady()));
@@ -1197,7 +1193,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(98), batch);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1235,7 +1231,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1270,7 +1266,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
// Protocol is to kill the 'arm' on error before destruction.
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
@@ -1319,7 +1315,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(123), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
@@ -1343,7 +1339,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.emplace_back(_nss, CursorId(0), batch2);
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- executor->waitForEvent(readyEvent);
+ executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
@@ -1356,7 +1352,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
@@ -1364,7 +1360,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
auto killEvent = arm->kill();
- executor->waitForEvent(killEvent);
+ executor()->waitForEvent(killEvent);
}
} // namespace
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 9b64eca0fb5..692d74e932c 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -209,7 +209,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
}
auto ccc = ClusterClientCursorImpl::make(
- shardRegistry->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+ Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 848b7057cca..4b48cc4eaa4 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -125,7 +125,7 @@ static void cleanupTask() {
auto cursorManager = grid.getCursorManager();
cursorManager->shutdown();
- grid.shardRegistry()->shutdown();
+ grid.getExecutorPool()->shutdownAndJoin();
grid.catalogManager(txn)->shutDown(txn);
}
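
Teardown moves with ownership: every call site that used to invoke shardRegistry()->shutdown() now joins the Grid-owned pool directly, in the order cleanupTask() and ShardingState::shutDown() use in the hunks above. A condensed sketch (the function name is illustrative; the cursor-manager step applies only where one was installed):

#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"

namespace mongo {

// Hypothetical teardown helper: drain cursors, join the executor pool
// (formerly ShardRegistry::shutdown()), then stop the catalog manager.
void shutDownShardingResources(OperationContext* txn) {
    if (auto cursorManager = grid.getCursorManager()) {
        cursorManager->shutdown();
    }
    grid.getExecutorPool()->shutdownAndJoin();
    grid.catalogManager(txn)->shutDown(txn);
}

}  // namespace mongo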
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 5c005659809..1879fad0ae3 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -151,22 +151,23 @@ Status initializeGlobalShardingState(OperationContext* txn,
stdx::make_unique<ShardingNetworkConnectionHook>(),
std::move(metadataHook));
auto networkPtr = network.get();
+ auto executorPool = makeTaskExecutorPool(std::move(network), isMongos);
+ executorPool->startup();
+
auto shardRegistry(
- stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryImpl>(),
- makeTaskExecutorPool(std::move(network), isMongos),
- networkPtr,
- configCS));
+ stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryImpl>(), configCS));
auto catalogManager = makeCatalogManager(getGlobalServiceContext(),
shardRegistry.get(),
HostAndPort(getHostName(), serverGlobalParams.port));
- shardRegistry->startup();
- grid.init(std::move(catalogManager),
- stdx::make_unique<CatalogCache>(),
- std::move(shardRegistry),
- stdx::make_unique<ClusterCursorManager>(
- getGlobalServiceContext()->getPreciseClockSource()));
+ grid.init(
+ std::move(catalogManager),
+ stdx::make_unique<CatalogCache>(),
+ std::move(shardRegistry),
+ stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()),
+ std::move(executorPool),
+ networkPtr);
while (!inShutdown()) {
try {
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 1587bfca09f..8584741c0c5 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -129,20 +129,21 @@ void ShardingTestFixture::setUp() {
_configTargeter = configTargeter.get();
_shardFactory->addTargeterToReturn(configCS, std::move(configTargeter));
- auto shardRegistry(stdx::make_unique<ShardRegistry>(
- std::move(shardFactory), std::move(executorPool), _mockNetwork, configCS));
- shardRegistry->startup();
+ auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));
+ executorPool->startup();
// For now initialize the global grid object. All sharding objects will be accessible
// from there until we get rid of it.
grid.init(std::move(cm),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
- stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()));
+ stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()),
+ std::move(executorPool),
+ _mockNetwork);
}
void ShardingTestFixture::tearDown() {
- grid.shardRegistry()->shutdown();
+ grid.getExecutorPool()->shutdownAndJoin();
grid.catalogManager(_opCtx.get())->shutDown(_opCtx.get());
grid.clearForUnitTests();
@@ -190,6 +191,12 @@ executor::NetworkInterfaceMock* ShardingTestFixture::network() const {
return _mockNetwork;
}
+executor::TaskExecutor* ShardingTestFixture::executor() const {
+ invariant(_executor);
+
+ return _executor;
+}
+
MessagingPortMock* ShardingTestFixture::getMessagingPort() const {
return _messagePort.get();
}
@@ -346,7 +353,7 @@ void ShardingTestFixture::expectConfigCollectionInsert(const HostAndPort& config
const std::string timePiece = changeId.substr(firstDash + 1, lastDash - firstDash - 1);
const std::string oidPiece = changeId.substr(lastDash + 1);
- ASSERT_EQUALS(shardRegistry()->getNetwork()->getHostName(), serverPiece);
+ ASSERT_EQUALS(grid.getNetwork()->getHostName(), serverPiece);
ASSERT_EQUALS(timestamp.toString(), timePiece);
OID generatedOID;
diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h
index 6019bfe928d..894d039eb7f 100644
--- a/src/mongo/s/sharding_test_fixture.h
+++ b/src/mongo/s/sharding_test_fixture.h
@@ -89,6 +89,8 @@ protected:
executor::NetworkInterfaceMock* network() const;
+ executor::TaskExecutor* executor() const;
+
MessagingPortMock* getMessagingPort() const;
DistLockManagerMock* distLock() const;