diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-04-13 16:22:24 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-04-18 14:05:05 -0400 |
commit | e96fc394af9d694a86e6e5d5081bd3f9b4698bae (patch) | |
tree | bf87c047e0ee7d0cb1c548814a0c073775147669 | |
parent | a6caba6dd563c82c9662c1da6d9dffe37855381c (diff) | |
download | mongo-e96fc394af9d694a86e6e5d5081bd3f9b4698bae.tar.gz |
SERVER-23694 Move TaskExecutorPool currently owned by ShardRegistry to Grid
20 files changed, 180 insertions, 190 deletions
diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp index ea668b35489..08993fd918a 100644 --- a/src/mongo/db/commands/conn_pool_stats.cpp +++ b/src/mongo/db/commands/conn_pool_stats.cpp @@ -37,6 +37,7 @@ #include "mongo/db/commands.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/s/client/shard_registry.h" @@ -85,10 +86,10 @@ public: } // Sharding connections, if we have any. - auto registry = grid.shardRegistry(); - if (registry) { - registry->appendConnectionStats(&stats); - grid.catalogManager(txn)->appendConnectionStats(&stats); + auto grid = Grid::get(txn); + if (grid->shardRegistry()) { + grid->getExecutorPool()->appendConnectionStats(&stats); + grid->catalogManager(txn)->appendConnectionStats(&stats); } // Output to a BSON object. diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index a9b324eaaad..4fdef19c6b6 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -464,17 +464,19 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO StatusWith<executor::RemoteCommandResponse> responseStatus( Status{ErrorCodes::InternalError, "Uninitialized value"}); - auto scheduleStatus = grid.shardRegistry()->getExecutor()->scheduleRemoteCommand( + auto executor = grid.getExecutorPool()->getArbitraryExecutor(); + auto scheduleStatus = executor->scheduleRemoteCommand( executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj), [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { responseStatus = args.response; }); + // TODO: Update RemoteCommandTargeter on NotMaster errors. if (!scheduleStatus.isOK()) { return scheduleStatus.getStatus(); } - grid.shardRegistry()->getExecutor()->wait(scheduleStatus.getValue()); + executor->wait(scheduleStatus.getValue()); if (!responseStatus.isOK()) { return responseStatus.getStatus(); diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 8e12acc8872..bd79cf69884 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -174,7 +174,7 @@ void ShardingState::shutDown(OperationContext* txn) { } if (_getInitializationState() == InitializationState::kInitialized) { - grid.shardRegistry()->shutdown(); + grid.getExecutorPool()->shutdownAndJoin(); grid.catalogManager(txn)->shutDown(txn); } } diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index fbaa6a9d678..de9e8b00466 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -72,16 +72,17 @@ void initGrid(OperationContext* txn, const ConnectionString& configConnString) { auto executorPool = stdx::make_unique<executor::TaskExecutorPool>(); executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec)); + executorPool->startup(); - auto shardRegistry(stdx::make_unique<ShardRegistry>( - std::move(shardFactory), std::move(executorPool), mockNetwork, configConnString)); - shardRegistry->startup(); + auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configConnString)); grid.init( stdx::make_unique<CatalogManagerMock>(), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), - stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource())); + stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource()), + std::move(executorPool), + mockNetwork); } class ShardingStateTest : public mongo::unittest::Test { @@ -106,7 +107,7 @@ public: // Cleanup only if shard registry was initialized if (grid.shardRegistry()) { - grid.shardRegistry()->shutdown(); + grid.getExecutorPool()->shutdownAndJoin(); grid.clearForUnitTests(); } } diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 878f7d12083..6fd27bcdb9e 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -668,8 +668,8 @@ Status CatalogManagerReplicaSet::_log(OperationContext* txn, const std::string& what, const std::string& operationNS, const BSONObj& detail) { - Date_t now = grid.shardRegistry()->getExecutor()->now(); - const std::string hostName = grid.shardRegistry()->getNetwork()->getHostName(); + Date_t now = Grid::get(txn)->getNetwork()->now(); + const std::string hostName = Grid::get(txn)->getNetwork()->getHostName(); const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen(); ChangeLogType changeLog; @@ -1111,7 +1111,7 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam coll.setNs(ns); coll.setDropped(true); coll.setEpoch(ChunkVersion::DROPPED().epoch()); - coll.setUpdatedAt(grid.shardRegistry()->getNetwork()->now()); + coll.setUpdatedAt(Grid::get(txn)->getNetwork()->now()); result = updateCollection(txn, ns.ns(), coll); if (!result.isOK()) { diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 288ee4a9b34..69e3667b49a 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -346,9 +346,6 @@ private: // True if startup() has been called. bool _started = false; // (M) - // Last known highest opTime from the config server. - repl::OpTime _configOpTime; // (M) - // Whether the logAction call should attempt to create the actionlog collection AtomicInt32 _actionLogCollectionCreated{0}; // (S) diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index ba9e5310f86..62dfa3e2633 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -219,7 +219,7 @@ protected: // Expect the change log operation expectChangeLogInsert(configHost, - shardRegistry()->getNetwork()->now(), + network()->now(), "addShard", "", BSON("name" << shardName << "host" << shardHost)); diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp index c077e869fde..f6f2a517486 100644 --- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp +++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp @@ -42,12 +42,15 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/network_test_env.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager_mock.h" #include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" #include "mongo/s/client/shard_factory_mock.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/write_ops/batched_update_request.h" #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" @@ -89,7 +92,8 @@ public: } std::shared_ptr<RemoteCommandTargeterMock> targeter() { - return RemoteCommandTargeterMock::get(_shardRegistry->getConfigShard()->getTargeter()); + return RemoteCommandTargeterMock::get( + grid.shardRegistry()->getConfigShard()->getTargeter()); } DistLockCatalogImpl* catalog() { @@ -97,10 +101,10 @@ public: } // Not thread safe - void shutdownShardRegistry() { + void shutdownExecutor() { if (!_shutdownCalled) { _shutdownCalled = true; - _shardRegistry->shutdown(); + grid.getExecutorPool()->shutdownAndJoin(); } } @@ -121,19 +125,27 @@ private: auto executorPool = stdx::make_unique<executor::TaskExecutorPool>(); executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExecutor)); + executorPool->startup(); ConnectionString configCS(HostAndPort("dummy:1234")); - _shardRegistry = stdx::make_unique<ShardRegistry>( - stdx::make_unique<ShardFactoryMock>(), std::move(executorPool), network, configCS); - _shardRegistry->startup(); + auto shardRegistry = + stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryMock>(), configCS); - _distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(_shardRegistry.get()); + _distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry.get()); + + grid.init(stdx::make_unique<CatalogManagerMock>(), + stdx::make_unique<CatalogCache>(), + std::move(shardRegistry), + std::unique_ptr<ClusterCursorManager>{nullptr}, + std::move(executorPool), + network); targeter()->setFindHostReturnValue(dummyHost); } void tearDown() override { - shutdownShardRegistry(); + shutdownExecutor(); + grid.clearForUnitTests(); } bool _shutdownCalled = false; @@ -142,7 +154,6 @@ private: CatalogManagerMock _catalogMgr; - std::unique_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<DistLockCatalogImpl> _distLockCatalog; OperationContextNoop _txn; }; @@ -198,7 +209,7 @@ TEST_F(DistLockCatalogFixture, PingTargetError) { } TEST_F(DistLockCatalogFixture, PingRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->ping(txn(), "abcd", Date_t::now()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -442,7 +453,7 @@ TEST_F(DistLockCatalogFixture, GrabLockTargetError) { } TEST_F(DistLockCatalogFixture, GrabLockRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->grabLock(txn(), "", OID::gen(), "", "", Date_t::now(), "").getStatus(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -761,7 +772,7 @@ TEST_F(DistLockCatalogFixture, OvertakeLockTargetError) { } TEST_F(DistLockCatalogFixture, OvertakeLockRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->overtakeLock(txn(), "", OID(), OID(), "", "", Date_t::now(), "").getStatus(); @@ -936,7 +947,7 @@ TEST_F(DistLockCatalogFixture, UnlockTargetError) { } TEST_F(DistLockCatalogFixture, UnlockRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->unlock(txn(), OID()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -1144,7 +1155,7 @@ TEST_F(DistLockCatalogFixture, GetServerTargetError) { } TEST_F(DistLockCatalogFixture, GetServerRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->getServerInfo(txn()).getStatus(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -1290,7 +1301,7 @@ TEST_F(DistLockCatalogFixture, StopPingTargetError) { } TEST_F(DistLockCatalogFixture, StopPingRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->stopPing(txn(), ""); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -1435,7 +1446,7 @@ TEST_F(DistLockCatalogFixture, GetPingTargetError) { } TEST_F(DistLockCatalogFixture, GetPingRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->getPing(txn(), "").getStatus(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); @@ -1520,7 +1531,7 @@ TEST_F(DistLockCatalogFixture, GetLockByTSTargetError) { } TEST_F(DistLockCatalogFixture, GetLockByTSRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->getLockByTS(txn(), OID()).getStatus(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); ASSERT_FALSE(status.reason().empty()); @@ -1607,7 +1618,7 @@ TEST_F(DistLockCatalogFixture, GetLockByNameTargetError) { } TEST_F(DistLockCatalogFixture, GetLockByNameRunCmdError) { - shutdownShardRegistry(); + shutdownExecutor(); auto status = catalog()->getLockByName(txn(), "x").getStatus(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 007942c9f06..cbbf3e18bc0 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -170,12 +170,8 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kWriteConcernErrors{ ErrorCodes::CannotSatisfyWriteConcern}; ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, - std::unique_ptr<executor::TaskExecutorPool> executorPool, - executor::NetworkInterface* network, ConnectionString configServerCS) - : _shardFactory(std::move(shardFactory)), - _executorPool(std::move(executorPool)), - _network(network) { + : _shardFactory(std::move(shardFactory)) { updateConfigServerConnectionString(configServerCS); } @@ -198,14 +194,6 @@ void ShardRegistry::_updateConfigServerConnectionString_inlock(ConnectionString _addConfigShard_inlock(); } -void ShardRegistry::startup() { - _executorPool->startup(); -} - -void ShardRegistry::shutdown() { - _executorPool->shutdownAndJoin(); -} - bool ShardRegistry::reload(OperationContext* txn) { stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -410,11 +398,6 @@ void ShardRegistry::toBSON(BSONObjBuilder* result) { } } -void ShardRegistry::appendConnectionStats(executor::ConnectionPoolStats* stats) const { - // Get stats from the pool of task executors, including fixed executor within. - _executorPool->appendConnectionStats(stats); -} - void ShardRegistry::_addConfigShard_inlock() { _addShard_inlock("config", _configServerCS); } @@ -579,7 +562,7 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig( findCmdBuilder.append(LiteParsedQuery::cmdOptionMaxTimeMS, durationCount<Milliseconds>(maxTime)); - QueryFetcher fetcher(_executorPool->getFixedExecutor(), + QueryFetcher fetcher(Grid::get(txn)->getExecutorPool()->getFixedExecutor(), host.getValue(), nss, findCmdBuilder.done(), @@ -636,7 +619,7 @@ StatusWith<BSONObj> ShardRegistry::runIdempotentCommandOnShard( const std::string& dbName, const BSONObj& cmdObj) { auto response = _runCommandWithRetries(txn, - _executorPool->getFixedExecutor(), + Grid::get(txn)->getExecutorPool()->getFixedExecutor(), shard, readPref, dbName, @@ -672,7 +655,7 @@ StatusWith<BSONObj> ShardRegistry::runIdempotentCommandOnConfig( const BSONObj& cmdObj) { auto response = _runCommandWithRetries( txn, - _executorPool->getFixedExecutor(), + Grid::get(txn)->getExecutorPool()->getFixedExecutor(), getConfigShard(), readPref, dbName, @@ -693,7 +676,7 @@ StatusWith<BSONObj> ShardRegistry::runCommandOnConfigWithRetries( const BSONObj& cmdObj, const ShardRegistry::ErrorCodesSet& errorsToCheck) { auto response = _runCommandWithRetries(txn, - _executorPool->getFixedExecutor(), + Grid::get(txn)->getExecutorPool()->getFixedExecutor(), getConfigShard(), ReadPreferenceSetting{ReadPreference::PrimaryOnly}, dbname, diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 8c16ce310a5..866d0536625 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -86,42 +86,12 @@ public: * Instantiates a new shard registry. * * @param shardFactory Makes shards - * @param commandRunner Command runner for executing commands against hosts - * @param executor Asynchronous task executor to use for making calls to shards and - * config servers. - * @param network Network interface backing executor. * @param configServerCS ConnectionString used for communicating with the config servers */ - ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, - std::unique_ptr<executor::TaskExecutorPool> executorPool, - executor::NetworkInterface* network, - ConnectionString configServerCS); + ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, ConnectionString configServerCS); ~ShardRegistry(); - /** - * Invokes the executor's startup method, which will start any networking/async execution - * threads. - */ - void startup(); - - /** - * Stops the executor thread and waits for it to join. - */ - void shutdown(); - - executor::TaskExecutor* getExecutor() const { - return _executorPool->getFixedExecutor(); - } - - executor::TaskExecutorPool* getExecutorPool() const { - return _executorPool.get(); - } - - executor::NetworkInterface* getNetwork() const { - return _network; - } - ConnectionString getConfigServerConnectionString() const; /** @@ -206,11 +176,6 @@ public: void toBSON(BSONObjBuilder* result); /** - * Append information about the sharding subsystem's connection pools. - */ - void appendConnectionStats(executor::ConnectionPoolStats* stats) const; - - /** * Executes 'find' command against a config server matching the given read preference, and * fetches *all* the results that the host will return until there are no more or until an error * is returned. @@ -369,15 +334,6 @@ private: // to access outside of _mutex. const std::unique_ptr<ShardFactory> _shardFactory; - // Executor pool for scheduling work and remote commands to shards and config servers. Each - // contained executor has a connection hook set on it for initialization sharding data on shards - // and detecting if the catalog manager needs swapping. - const std::unique_ptr<executor::TaskExecutorPool> _executorPool; - - // Network interface being used by _executor. Used for asking questions about the network - // configuration, such as getting the current server's hostname. - executor::NetworkInterface* const _network; - // Protects the _reloadState, config server connections string, and the lookup maps below. mutable stdx::mutex _mutex; diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index e81f698ea53..eba2fb46523 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -205,7 +205,7 @@ public: if (!needSplit) { invariant(shardResults.size() == 1); invariant(shardResults[0].target.getServers().size() == 1); - auto executorPool = grid.shardRegistry()->getExecutorPool(); + auto executorPool = grid.getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor(shardResults[0].target.getServers()[0], shardResults[0].result, @@ -411,7 +411,7 @@ BSONObj PipelineCommand::aggRunCommand(DBClientBase* conn, throw RecvStaleConfigException("command failed because of stale config", result); } - auto executorPool = grid.shardRegistry()->getExecutorPool(); + auto executorPool = grid.getExecutorPool(); result = uassertStatusOK(storePossibleCursor(HostAndPort(cursor->originalHost()), result, executorPool->getArbitraryExecutor(), diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 1d533d1dd36..1af094c366f 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -112,7 +112,7 @@ bool cursorCommandPassthrough(OperationContext* txn, StatusWith<BSONObj> transformedResponse = storePossibleCursor(HostAndPort(cursor->originalHost()), response, - grid.shardRegistry()->getExecutorPool()->getArbitraryExecutor(), + grid.getExecutorPool()->getArbitraryExecutor(), grid.getCursorManager()); if (!transformedResponse.isOK()) { return Command::appendCommandStatus(*out, transformedResponse.getStatus()); diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 9d36641bd05..3f6ff1f06bb 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -32,6 +32,8 @@ #include "mongo/s/grid.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/task_executor_pool.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/client/shard_registry.h" @@ -44,7 +46,7 @@ namespace mongo { // Global grid instance Grid grid; -Grid::Grid() : _allowLocalShard(true) {} +Grid::Grid() : _network(nullptr), _allowLocalShard(true) {} Grid::~Grid() = default; @@ -55,16 +57,22 @@ Grid* Grid::get(OperationContext* operationContext) { void Grid::init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, - std::unique_ptr<ClusterCursorManager> cursorManager) { + std::unique_ptr<ClusterCursorManager> cursorManager, + std::unique_ptr<executor::TaskExecutorPool> executorPool, + executor::NetworkInterface* network) { invariant(!_catalogManager); invariant(!_catalogCache); invariant(!_shardRegistry); invariant(!_cursorManager); + invariant(!_executorPool); + invariant(!_network); _catalogManager = std::move(catalogManager); _catalogCache = std::move(catalogCache); _shardRegistry = std::move(shardRegistry); _cursorManager = std::move(cursorManager); + _executorPool = std::move(executorPool); + _network = network; } bool Grid::allowLocalHost() const { @@ -87,6 +95,8 @@ void Grid::clearForUnitTests() { _catalogCache.reset(); _shardRegistry.reset(); _cursorManager.reset(); + _executorPool.reset(); + _network = nullptr; _configOpTime = repl::OpTime(); } diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 7c96ab4e5cb..f0746b5f142 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -40,6 +40,11 @@ class ClusterCursorManager; class OperationContext; class ShardRegistry; +namespace executor { +class NetworkInterface; +class TaskExecutorPool; +} // namespace executor + /** * Holds the global sharding context. Single instance exists for a running server. Exists on * both MongoD and MongoS. @@ -64,7 +69,9 @@ public: void init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, - std::unique_ptr<ClusterCursorManager> cursorManager); + std::unique_ptr<ClusterCursorManager> cursorManager, + std::unique_ptr<executor::TaskExecutorPool> executorPool, + executor::NetworkInterface* network); /** * @return true if shards and config servers are allowed to use 'localhost' in address @@ -96,6 +103,14 @@ public: return _cursorManager.get(); } + executor::TaskExecutorPool* getExecutorPool() { + return _executorPool.get(); + } + + executor::NetworkInterface* getNetwork() { + return _network; + } + repl::OpTime configOpTime() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _configOpTime; @@ -120,6 +135,14 @@ private: std::unique_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<ClusterCursorManager> _cursorManager; + // Executor pool for scheduling work and remote commands to shards and config servers. Each + // contained executor has a connection hook set on it for sending/receiving sharding metadata. + std::unique_ptr<executor::TaskExecutorPool> _executorPool; + + // Network interface being used by the fixed executor in _executorPool. Used for asking + // questions about the network configuration, such as getting the current server's hostname. + executor::NetworkInterface* _network; + // Protects _configOpTime. mutable stdx::mutex _mutex; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 17826b9a0da..43dab10205e 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -67,8 +67,6 @@ public: void setUp() override { ShardingTestFixture::setUp(); - executor = shardRegistry()->getExecutor(); - getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345)); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); @@ -123,7 +121,7 @@ protected: params.remotes.emplace_back(shardId, findCmd); } - arm = stdx::make_unique<AsyncResultsMerger>(executor, std::move(params)); + arm = stdx::make_unique<AsyncResultsMerger>(executor(), std::move(params)); } /** @@ -138,7 +136,7 @@ protected: params.remotes.emplace_back(hostIdPair.first, hostIdPair.second); } - arm = stdx::make_unique<AsyncResultsMerger>(executor, std::move(params)); + arm = stdx::make_unique<AsyncResultsMerger>(executor(), std::move(params)); } /** @@ -200,8 +198,6 @@ protected: const NamespaceString _nss; - executor::TaskExecutor* executor; - std::unique_ptr<AsyncResultsMerger> arm; }; @@ -234,7 +230,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -269,7 +265,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(12), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -299,7 +295,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { responses.emplace_back(_nss, CursorId(0), batch6); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -321,7 +317,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { responses.emplace_back(_nss, CursorId(0), batch7); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); @@ -349,7 +345,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { fromjson("{_id: 8, $sortKey: {'': 8}}")}; responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady())); @@ -386,7 +382,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { fromjson("{$sortKey: {'': 8}}")}; responses.emplace_back(_nss, CursorId(2), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady())); @@ -407,7 +403,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady())); @@ -426,7 +422,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { responses.emplace_back(_nss, CursorId(0), batch5); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady())); @@ -457,7 +453,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { fromjson("{$sortKey: {'': 5, '': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady())); @@ -488,7 +484,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); auto statusWithNext = arm->nextReady(); @@ -497,7 +493,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } @@ -528,7 +524,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -551,7 +547,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); @@ -573,7 +569,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) { responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -603,7 +599,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -626,7 +622,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); blackHoleNextRequest(); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); @@ -643,7 +639,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady())); @@ -670,7 +666,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -689,14 +685,14 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { responses.emplace_back(_nss, CursorId(456), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT(!arm->nextReady().isOK()); // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { @@ -715,7 +711,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3) .toBSON(CursorResponse::ResponseType::InitialResponse); scheduleNetworkResponseObjs({response1, response2, response3}); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); auto statusWithNext = arm->nextReady(); @@ -723,7 +719,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { @@ -742,7 +738,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); auto statusWithNext = arm->nextReady(); @@ -752,7 +748,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { @@ -769,7 +765,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -780,13 +776,13 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { // Required to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); makeCursorFromFindCmd(findCmd, kTestShardIds); - executor->shutdown(); + executor()->shutdown(); ASSERT_NOT_OK(arm->nextEvent().getStatus()); auto killEvent = arm->kill(); ASSERT_FALSE(killEvent.isValid()); @@ -803,7 +799,7 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch blackHoleNextRequest(); // Executor shuts down before a response is received. - executor->shutdown(); + executor()->shutdown(); auto killEvent = arm->kill(); ASSERT_FALSE(killEvent.isValid()); } @@ -820,7 +816,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { @@ -844,7 +840,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { auto killedEvent = arm->kill(); ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { @@ -882,8 +878,8 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { // Ensure that we properly signal both those waiting for the kill, and those waiting for more // results to be ready. - executor->waitForEvent(readyEvent); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(readyEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { @@ -928,8 +924,8 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { // Ensure that we properly signal both those waiting for the kill, and those waiting for more // results to be ready. - executor->waitForEvent(readyEvent); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(readyEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { @@ -950,7 +946,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { // Attempting to schedule more network operations on a killed arm is an error. ASSERT_NOT_OK(arm->nextEvent().getStatus()); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, KillCalledTwice) { @@ -960,8 +956,8 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { ASSERT(killedEvent1.isValid()); auto killedEvent2 = arm->kill(); ASSERT(killedEvent2.isValid()); - executor->waitForEvent(killedEvent1); - executor->waitForEvent(killedEvent2); + executor()->waitForEvent(killedEvent1); + executor()->waitForEvent(killedEvent2); } TEST_F(AsyncResultsMergerTest, TailableBasic) { @@ -976,7 +972,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -997,7 +993,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.emplace_back(_nss, CursorId(123), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); @@ -1007,7 +1003,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { @@ -1023,7 +1019,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(123), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none, but remotes should not be // marked as exhausted. @@ -1032,7 +1028,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); - executor->waitForEvent(killedEvent); + executor()->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { @@ -1048,7 +1044,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); // Afterwards, the ARM should return boost::none and remote cursors should be marked as // exhausted. @@ -1070,7 +1066,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1091,7 +1087,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { ASSERT_EQ(request.getValue().cursorid, 1LL); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); @@ -1115,7 +1111,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1143,7 +1139,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(99), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1164,7 +1160,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.emplace_back(_nss, CursorId(99), batch3); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); @@ -1179,7 +1175,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.emplace_back(_nss, CursorId(0), batch4); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); @@ -1197,7 +1193,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(98), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1235,7 +1231,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1270,7 +1266,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { // Protocol is to kill the 'arm' on error before destruction. auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { @@ -1319,7 +1315,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(123), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); @@ -1343,7 +1339,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.emplace_back(_nss, CursorId(0), batch2); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - executor->waitForEvent(readyEvent); + executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); @@ -1356,7 +1352,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { @@ -1364,7 +1360,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); auto killEvent = arm->kill(); - executor->waitForEvent(killEvent); + executor()->waitForEvent(killEvent); } } // namespace diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 9b64eca0fb5..692d74e932c 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -209,7 +209,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, } auto ccc = ClusterClientCursorImpl::make( - shardRegistry->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 848b7057cca..4b48cc4eaa4 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -125,7 +125,7 @@ static void cleanupTask() { auto cursorManager = grid.getCursorManager(); cursorManager->shutdown(); - grid.shardRegistry()->shutdown(); + grid.getExecutorPool()->shutdownAndJoin(); grid.catalogManager(txn)->shutDown(txn); } diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 5c005659809..1879fad0ae3 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -151,22 +151,23 @@ Status initializeGlobalShardingState(OperationContext* txn, stdx::make_unique<ShardingNetworkConnectionHook>(), std::move(metadataHook)); auto networkPtr = network.get(); + auto executorPool = makeTaskExecutorPool(std::move(network), isMongos); + executorPool->startup(); + auto shardRegistry( - stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryImpl>(), - makeTaskExecutorPool(std::move(network), isMongos), - networkPtr, - configCS)); + stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryImpl>(), configCS)); auto catalogManager = makeCatalogManager(getGlobalServiceContext(), shardRegistry.get(), HostAndPort(getHostName(), serverGlobalParams.port)); - shardRegistry->startup(); - grid.init(std::move(catalogManager), - stdx::make_unique<CatalogCache>(), - std::move(shardRegistry), - stdx::make_unique<ClusterCursorManager>( - getGlobalServiceContext()->getPreciseClockSource())); + grid.init( + std::move(catalogManager), + stdx::make_unique<CatalogCache>(), + std::move(shardRegistry), + stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()), + std::move(executorPool), + networkPtr); while (!inShutdown()) { try { diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 1587bfca09f..8584741c0c5 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -129,20 +129,21 @@ void ShardingTestFixture::setUp() { _configTargeter = configTargeter.get(); _shardFactory->addTargeterToReturn(configCS, std::move(configTargeter)); - auto shardRegistry(stdx::make_unique<ShardRegistry>( - std::move(shardFactory), std::move(executorPool), _mockNetwork, configCS)); - shardRegistry->startup(); + auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); + executorPool->startup(); // For now initialize the global grid object. All sharding objects will be accessible // from there until we get rid of it. grid.init(std::move(cm), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), - stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource())); + stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()), + std::move(executorPool), + _mockNetwork); } void ShardingTestFixture::tearDown() { - grid.shardRegistry()->shutdown(); + grid.getExecutorPool()->shutdownAndJoin(); grid.catalogManager(_opCtx.get())->shutDown(_opCtx.get()); grid.clearForUnitTests(); @@ -190,6 +191,12 @@ executor::NetworkInterfaceMock* ShardingTestFixture::network() const { return _mockNetwork; } +executor::TaskExecutor* ShardingTestFixture::executor() const { + invariant(_executor); + + return _executor; +} + MessagingPortMock* ShardingTestFixture::getMessagingPort() const { return _messagePort.get(); } @@ -346,7 +353,7 @@ void ShardingTestFixture::expectConfigCollectionInsert(const HostAndPort& config const std::string timePiece = changeId.substr(firstDash + 1, lastDash - firstDash - 1); const std::string oidPiece = changeId.substr(lastDash + 1); - ASSERT_EQUALS(shardRegistry()->getNetwork()->getHostName(), serverPiece); + ASSERT_EQUALS(grid.getNetwork()->getHostName(), serverPiece); ASSERT_EQUALS(timestamp.toString(), timePiece); OID generatedOID; diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index 6019bfe928d..894d039eb7f 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -89,6 +89,8 @@ protected: executor::NetworkInterfaceMock* network() const; + executor::TaskExecutor* executor() const; + MessagingPortMock* getMessagingPort() const; DistLockManagerMock* distLock() const; |