From 721d2547c6c2883b522740dc2b7ff420aeebb7e9 Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Fri, 2 Feb 2018 10:00:19 -0500 Subject: SERVER-29908 Move sharding_task_executor under mongo/s --- .../client/embedded/service_context_embedded.cpp | 1 + .../db/auth/authz_manager_external_state_s.cpp | 2 - src/mongo/db/auth/user_cache_invalidator_job.cpp | 1 - src/mongo/db/catalog/collection_catalog_entry.h | 1 + src/mongo/db/commands.h | 1 + src/mongo/db/db.cpp | 1 - src/mongo/db/index/index_descriptor.h | 1 + src/mongo/db/index/sort_key_generator_test.cpp | 2 +- src/mongo/db/keys_collection_client.h | 10 +- src/mongo/db/keys_collection_client_direct.cpp | 11 +- src/mongo/db/keys_collection_client_sharded.cpp | 4 +- src/mongo/db/keys_collection_client_sharded.h | 9 +- src/mongo/db/mongod_options.cpp | 19 +- src/mongo/db/query/parsed_distinct.cpp | 1 + src/mongo/db/repl/topology_coordinator.cpp | 5 +- src/mongo/db/s/SConscript | 15 +- src/mongo/db/s/balancer/balancer.cpp | 1 - .../balancer_chunk_selection_policy_impl.cpp | 1 - .../db/s/balancer/cluster_statistics_impl.cpp | 19 +- src/mongo/db/s/balancer/migration_manager.cpp | 2 +- src/mongo/db/s/balancer/migration_manager_test.cpp | 2 - .../db/s/balancer/scoped_migration_request.cpp | 12 +- src/mongo/db/s/chunk_splitter.cpp | 1 - .../s/config/configsvr_drop_collection_command.cpp | 3 +- .../s/config/configsvr_drop_database_command.cpp | 2 - .../db/s/config/configsvr_move_primary_command.cpp | 8 +- src/mongo/db/s/metadata_manager_test.cpp | 1 - src/mongo/db/s/migration_source_manager.cpp | 2 - src/mongo/db/s/sharding_task_executor.cpp | 207 --------------------- src/mongo/db/s/sharding_task_executor.h | 82 -------- src/mongo/db/server_options.cpp | 2 + src/mongo/db/server_options.h | 1 - src/mongo/db/service_context_d.cpp | 1 + src/mongo/rpc/command_request.cpp | 1 + src/mongo/s/SConscript | 22 ++- .../s/catalog/sharding_catalog_client_impl.cpp | 1 - src/mongo/s/catalog/sharding_catalog_client_mock.h | 1 - src/mongo/s/catalog_cache.cpp | 2 - src/mongo/s/client/version_manager.cpp | 3 - src/mongo/s/commands/cluster_add_shard_cmd.cpp | 29 +-- .../s/commands/cluster_add_shard_to_zone_cmd.cpp | 19 +- .../s/commands/cluster_enable_sharding_cmd.cpp | 28 ++- .../s/commands/cluster_list_databases_cmd.cpp | 45 ++--- src/mongo/s/commands/cluster_list_shards_cmd.cpp | 5 +- .../commands/cluster_user_management_commands.cpp | 4 +- src/mongo/s/commands/cluster_write.cpp | 1 - src/mongo/s/commands/commands_public.cpp | 1 - src/mongo/s/grid.cpp | 3 - src/mongo/s/grid.h | 4 +- src/mongo/s/server.cpp | 10 +- src/mongo/s/sharding_initialization.cpp | 4 +- src/mongo/s/sharding_mongod_test_fixture.h | 12 +- src/mongo/s/sharding_task_executor.cpp | 207 +++++++++++++++++++++ src/mongo/s/sharding_task_executor.h | 82 ++++++++ src/mongo/s/sharding_test_fixture.cpp | 2 +- src/mongo/s/sharding_uptime_reporter.cpp | 1 - src/mongo/transport/service_state_machine_test.cpp | 4 +- src/mongo/util/signal_handlers.cpp | 1 + 58 files changed, 425 insertions(+), 498 deletions(-) delete mode 100644 src/mongo/db/s/sharding_task_executor.cpp delete mode 100644 src/mongo/db/s/sharding_task_executor.h create mode 100644 src/mongo/s/sharding_task_executor.cpp create mode 100644 src/mongo/s/sharding_task_executor.h diff --git a/src/mongo/client/embedded/service_context_embedded.cpp b/src/mongo/client/embedded/service_context_embedded.cpp index 4ab14c35152..597792a7846 100644 --- a/src/mongo/client/embedded/service_context_embedded.cpp +++ b/src/mongo/client/embedded/service_context_embedded.cpp @@ -36,6 +36,7 @@ #include "mongo/client/embedded/service_entry_point_embedded.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/operation_context.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_engine_lock_file.h" #include "mongo/db/storage/storage_engine_metadata.h" diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp index 2fb91f5b8ea..53700bb9ac9 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp @@ -44,14 +44,12 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/stringutils.h" namespace mongo { - namespace { /** diff --git a/src/mongo/db/auth/user_cache_invalidator_job.cpp b/src/mongo/db/auth/user_cache_invalidator_job.cpp index 81602d77a14..8d6a690f19e 100644 --- a/src/mongo/db/auth/user_cache_invalidator_job.cpp +++ b/src/mongo/db/auth/user_cache_invalidator_job.cpp @@ -41,7 +41,6 @@ #include "mongo/db/commands.h" #include "mongo/db/server_parameters.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/grid.h" #include "mongo/stdx/mutex.h" #include "mongo/util/background.h" diff --git a/src/mongo/db/catalog/collection_catalog_entry.h b/src/mongo/db/catalog/collection_catalog_entry.h index 805bd130c46..ca07d2b17bc 100644 --- a/src/mongo/db/catalog/collection_catalog_entry.h +++ b/src/mongo/db/catalog/collection_catalog_entry.h @@ -40,6 +40,7 @@ namespace mongo { +class Collection; class IndexDescriptor; class OperationContext; diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 7986cc6fe08..48cce22e3a6 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -41,6 +41,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/explain.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/reply_builder_interface.h" #include "mongo/stdx/functional.h" diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 14c628b5d35..19ffd88d700 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -110,7 +110,6 @@ #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" -#include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" diff --git a/src/mongo/db/index/index_descriptor.h b/src/mongo/db/index/index_descriptor.h index 5ba231f7664..3fae7128915 100644 --- a/src/mongo/db/index/index_descriptor.h +++ b/src/mongo/db/index/index_descriptor.h @@ -45,6 +45,7 @@ class Collection; class IndexCatalog; class IndexCatalogEntry; class IndexCatalogEntryContainer; +class OperationContext; /** * A cache of information computed from the memory-mapped per-index data (OnDiskIndexData). diff --git a/src/mongo/db/index/sort_key_generator_test.cpp b/src/mongo/db/index/sort_key_generator_test.cpp index 854bb912f6e..f541919836d 100644 --- a/src/mongo/db/index/sort_key_generator_test.cpp +++ b/src/mongo/db/index/sort_key_generator_test.cpp @@ -28,8 +28,8 @@ #include "mongo/platform/basic.h" +#include "mongo/bson/json.h" #include "mongo/db/index/sort_key_generator.h" - #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" diff --git a/src/mongo/db/keys_collection_client.h b/src/mongo/db/keys_collection_client.h index 4e93343ff3f..44cd5e2dd0b 100644 --- a/src/mongo/db/keys_collection_client.h +++ b/src/mongo/db/keys_collection_client.h @@ -28,17 +28,17 @@ #pragma once -#include -#include +#include -#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" #include "mongo/db/keys_collection_document.h" namespace mongo { -class OperationContext; -class LogicalTime; class BSONObj; +class LogicalTime; +class OperationContext; class KeysCollectionClient { public: diff --git a/src/mongo/db/keys_collection_client_direct.cpp b/src/mongo/db/keys_collection_client_direct.cpp index 4148b589bab..8ffc75f29ef 100644 --- a/src/mongo/db/keys_collection_client_direct.cpp +++ b/src/mongo/db/keys_collection_client_direct.cpp @@ -30,11 +30,11 @@ #include "mongo/platform/basic.h" +#include "mongo/db/keys_collection_client_direct.h" + #include #include -#include "mongo/db/keys_collection_client_direct.h" - #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" @@ -43,7 +43,6 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time.h" #include "mongo/db/operation_context.h" -#include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/sharding_catalog_client.h" @@ -52,8 +51,8 @@ #include "mongo/util/log.h" namespace mongo { - namespace { + const int kOnErrorNumRetries = 3; bool isRetriableError(ErrorCodes::Error code, Shard::RetryPolicy options) { @@ -68,7 +67,8 @@ bool isRetriableError(ErrorCodes::Error code, Shard::RetryPolicy options) { return false; } } -} + +} // namespace KeysCollectionClientDirect::KeysCollectionClientDirect() : _rsLocalClient() {} @@ -165,4 +165,5 @@ Status KeysCollectionClientDirect::insertNewKey(OperationContext* opCtx, const B return _insert( opCtx, KeysCollectionDocument::ConfigNS, doc, ShardingCatalogClient::kMajorityWriteConcern); } + } // namespace mongo diff --git a/src/mongo/db/keys_collection_client_sharded.cpp b/src/mongo/db/keys_collection_client_sharded.cpp index 2f5bb8d6c9c..4b2cb51ce24 100644 --- a/src/mongo/db/keys_collection_client_sharded.cpp +++ b/src/mongo/db/keys_collection_client_sharded.cpp @@ -29,12 +29,11 @@ #include "mongo/platform/basic.h" #include "mongo/db/keys_collection_client_sharded.h" + #include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { -namespace {} // namespace - KeysCollectionClientSharded::KeysCollectionClientSharded(ShardingCatalogClient* client) : _catalogClient(client) {} @@ -50,4 +49,5 @@ Status KeysCollectionClientSharded::insertNewKey(OperationContext* opCtx, const return _catalogClient->insertConfigDocument( opCtx, KeysCollectionDocument::ConfigNS, doc, ShardingCatalogClient::kMajorityWriteConcern); } + } // namespace mongo diff --git a/src/mongo/db/keys_collection_client_sharded.h b/src/mongo/db/keys_collection_client_sharded.h index 2ced14f2135..b3f337e60ff 100644 --- a/src/mongo/db/keys_collection_client_sharded.h +++ b/src/mongo/db/keys_collection_client_sharded.h @@ -28,22 +28,16 @@ #pragma once -#include -#include - -#include "mongo/base/status.h" #include "mongo/db/keys_collection_client.h" namespace mongo { -class OperationContext; -class LogicalTime; -class BSONObj; class ShardingCatalogClient; class KeysCollectionClientSharded : public KeysCollectionClient { public: KeysCollectionClientSharded(ShardingCatalogClient*); + /** * Returns keys for the given purpose and with an expiresAt value greater than newerThanThis. */ @@ -62,4 +56,5 @@ public: private: ShardingCatalogClient* const _catalogClient; }; + } // namespace mongo diff --git a/src/mongo/db/mongod_options.cpp b/src/mongo/db/mongod_options.cpp index 67b8b63545e..f77dce0fcc5 100644 --- a/src/mongo/db/mongod_options.cpp +++ b/src/mongo/db/mongod_options.cpp @@ -44,7 +44,6 @@ #include "mongo/db/server_options.h" #include "mongo/db/server_options_helpers.h" #include "mongo/db/storage/mmap_v1/mmap_v1_options.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/ssl_options.h" @@ -56,8 +55,6 @@ namespace mongo { using std::cout; using std::endl; -using std::string; - MongodGlobalParams mongodGlobalParams; @@ -640,7 +637,7 @@ Status validateMongodOptions(const moe::Environment& params) { #ifdef _WIN32 if (params.count("install") || params.count("reinstall")) { if (params.count("storage.dbPath") && - !boost::filesystem::path(params["storage.dbPath"].as()).is_absolute()) { + !boost::filesystem::path(params["storage.dbPath"].as()).is_absolute()) { return Status(ErrorCodes::BadValue, "dbPath requires an absolute file path with Windows services"); } @@ -943,12 +940,12 @@ Status storeMongodOptions(const moe::Environment& params) { } if (params.count("storage.engine")) { - storageGlobalParams.engine = params["storage.engine"].as(); + storageGlobalParams.engine = params["storage.engine"].as(); storageGlobalParams.engineSetByUser = true; } if (params.count("storage.dbPath")) { - storageGlobalParams.dbpath = params["storage.dbPath"].as(); + storageGlobalParams.dbpath = params["storage.dbPath"].as(); if (params.count("processManagement.fork") && storageGlobalParams.dbpath[0] != '/') { // we need to change dbpath if we fork since we change // cwd to "/" @@ -1116,17 +1113,17 @@ Status storeMongodOptions(const moe::Environment& params) { } if (params.count("source")) { /* specifies what the source in local.sources should be */ - replSettings.setSource(params["source"].as().c_str()); + replSettings.setSource(params["source"].as().c_str()); } if (params.count("pretouch")) { replSettings.setPretouch(params["pretouch"].as()); } if (params.count("replication.replSetName")) { - replSettings.setReplSetString(params["replication.replSetName"].as().c_str()); + replSettings.setReplSetString(params["replication.replSetName"].as().c_str()); } if (params.count("replication.replSet")) { /* seed list of hosts for the repl set */ - replSettings.setReplSetString(params["replication.replSet"].as().c_str()); + replSettings.setReplSetString(params["replication.replSet"].as().c_str()); } if (params.count("replication.secondaryIndexPrefetch")) { replSettings.setPrefetchIndexMode( @@ -1146,7 +1143,7 @@ Status storeMongodOptions(const moe::Environment& params) { } if (params.count("only")) { - replSettings.setOnly(params["only"].as().c_str()); + replSettings.setOnly(params["only"].as().c_str()); } if (params.count("storage.mmapv1.nsSize")) { int x = params["storage.mmapv1.nsSize"].as(); @@ -1255,7 +1252,7 @@ Status storeMongodOptions(const moe::Environment& params) { // needs to be after things like --configsvr parsing, thus here. if (params.count("storage.repairPath")) { - storageGlobalParams.repairpath = params["storage.repairPath"].as(); + storageGlobalParams.repairpath = params["storage.repairPath"].as(); if (!storageGlobalParams.repairpath.size()) { return Status(ErrorCodes::BadValue, "repairpath is empty"); } diff --git a/src/mongo/db/query/parsed_distinct.cpp b/src/mongo/db/query/parsed_distinct.cpp index 7fbb38648b2..8ff2277bee6 100644 --- a/src/mongo/db/query/parsed_distinct.cpp +++ b/src/mongo/db/query/parsed_distinct.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/query_request.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/stdx/memory.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 19c5293c0cc..e8b3710f283 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -35,6 +35,7 @@ #include #include +#include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/audit.h" #include "mongo/db/client.h" #include "mongo/db/mongod_options.h" @@ -62,7 +63,7 @@ namespace mongo { namespace repl { -using std::vector; + const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30); // Controls how caught up in replication a secondary with higher priority than the current primary @@ -1995,7 +1996,7 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu BSONObjBuilder* response, Status* result) { // output for each member - vector membersOut; + std::vector membersOut; const MemberState myState = getMemberState(); const Date_t now = rsStatusArgs.now; const OpTime lastOpApplied = getMyLastAppliedOpTime(); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index b5c82cdae1d..fd53ece397b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -19,19 +19,6 @@ env.Library( ], ) -env.Library( - target='sharding_task_executor', - source=[ - 'sharding_task_executor.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/s/client/sharding_client', - '$BUILD_DIR/mongo/s/cluster_last_error_info', - ], -) - env.Library( target='migration_types', source=[ @@ -106,11 +93,11 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/s/sharding_initialization', + '$BUILD_DIR/mongo/s/sharding_task_executor', '$BUILD_DIR/mongo/util/elapsed_tracker', 'balancer', 'collection_metadata', 'migration_types', - 'sharding_task_executor', 'type_shard_identity', ], LIBDEPS_TAGS=[ diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index d537260c666..4b6c79c343c 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -44,7 +44,6 @@ #include "mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h" #include "mongo/db/s/balancer/cluster_statistics_impl.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index 5e89baca4e1..499c7605924 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -36,7 +36,6 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj_comparator_interface.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_tags.h" diff --git a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp index f0f50c1c09f..c23b6b41e32 100644 --- a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp +++ b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp @@ -35,7 +35,6 @@ #include "mongo/base/status_with.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -44,10 +43,6 @@ #include "mongo/util/mongoutils/str.h" namespace mongo { - -using std::string; -using std::vector; - namespace { const char kVersionField[] = "version"; @@ -60,7 +55,7 @@ const char kVersionField[] = "version"; * ShardNotFound if shard by that id is not available on the registry * NoSuchKey if the version could not be retrieved */ -StatusWith retrieveShardMongoDVersion(OperationContext* opCtx, ShardId shardId) { +StatusWith retrieveShardMongoDVersion(OperationContext* opCtx, ShardId shardId) { auto shardRegistry = Grid::get(opCtx)->shardRegistry(); auto shardStatus = shardRegistry->getShard(opCtx, shardId); if (!shardStatus.isOK()) { @@ -83,7 +78,7 @@ StatusWith retrieveShardMongoDVersion(OperationContext* opCtx, ShardId s BSONObj serverStatus = std::move(commandResponse.getValue().response); - string version; + std::string version; Status status = bsonExtractStringField(serverStatus, kVersionField, &version); if (!status.isOK()) { return status; @@ -100,7 +95,7 @@ ClusterStatisticsImpl::ClusterStatisticsImpl() = default; ClusterStatisticsImpl::~ClusterStatisticsImpl() = default; -StatusWith> ClusterStatisticsImpl::getStats(OperationContext* opCtx) { +StatusWith> ClusterStatisticsImpl::getStats(OperationContext* opCtx) { // Get a list of all the shards that are participating in this balance round along with any // maximum allowed quotas and current utilization. We get the latter by issuing // db.serverStatus() (mem.mapped) to all shards. @@ -112,9 +107,9 @@ StatusWith> ClusterStatisticsImpl::getStats(OperationCon return shardsStatus.getStatus(); } - const vector shards(std::move(shardsStatus.getValue().value)); + const auto& shards = shardsStatus.getValue().value; - vector stats; + std::vector stats; for (const auto& shard : shards) { const auto shardSizeStatus = [&]() -> StatusWith { @@ -133,7 +128,7 @@ StatusWith> ClusterStatisticsImpl::getStats(OperationCon << shard.getName()); } - string mongoDVersion; + std::string mongoDVersion; auto mongoDVersionStatus = retrieveShardMongoDVersion(opCtx, shard.getName()); if (mongoDVersionStatus.isOK()) { @@ -145,7 +140,7 @@ StatusWith> ClusterStatisticsImpl::getStats(OperationCon << causedBy(mongoDVersionStatus.getStatus()); } - std::set shardTags; + std::set shardTags; for (const auto& shardTag : shard.getTags()) { shardTags.insert(shardTag); diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index ce7ed13d743..542b2410810 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -44,7 +44,6 @@ #include "mongo/db/s/balancer/type_migration.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -64,6 +63,7 @@ using str::stream; namespace { const char kChunkTooBig[] = "chunkTooBig"; // TODO: delete in 3.8 + const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(15)); diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp index 515ebceb7b2..72a32c50627 100644 --- a/src/mongo/db/s/balancer/migration_manager_test.cpp +++ b/src/mongo/db/s/balancer/migration_manager_test.cpp @@ -29,13 +29,11 @@ #include "mongo/platform/basic.h" #include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/s/balancer/migration_manager.h" #include "mongo/db/s/balancer/type_migration.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" -#include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_locks.h" diff --git a/src/mongo/db/s/balancer/scoped_migration_request.cpp b/src/mongo/db/s/balancer/scoped_migration_request.cpp index 6cf19c1f4bf..74511ab1823 100644 --- a/src/mongo/db/s/balancer/scoped_migration_request.cpp +++ b/src/mongo/db/s/balancer/scoped_migration_request.cpp @@ -32,22 +32,21 @@ #include "mongo/db/s/balancer/scoped_migration_request.h" -#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/balancer/type_migration.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" namespace mongo { - namespace { + const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(15)); const int kDuplicateKeyErrorMaxRetries = 2; -} + +} // namespace ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* opCtx, const NamespaceString& nss, @@ -93,12 +92,13 @@ ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest StatusWith ScopedMigrationRequest::writeMigration( OperationContext* opCtx, const MigrateInfo& migrateInfo, bool waitForDelete) { + auto const grid = Grid::get(opCtx); // Try to write a unique migration document to config.migrations. const MigrationType migrationType(migrateInfo, waitForDelete); for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) { - Status result = grid.catalogClient()->insertConfigDocument( + Status result = grid->catalogClient()->insertConfigDocument( opCtx, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); if (result == ErrorCodes::DuplicateKey) { @@ -106,7 +106,7 @@ StatusWith ScopedMigrationRequest::writeMigration( // for the request because this migration request will join the active one once // scheduled. auto statusWithMigrationQueryResult = - grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + grid->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index f36ed19a0ea..d314b9c5f84 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -42,7 +42,6 @@ #include "mongo/db/s/split_chunk.h" #include "mongo/db/s/split_vector.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/chunk_manager.h" diff --git a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp index f9689722426..bffad346dc6 100644 --- a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp @@ -35,7 +35,6 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache.h" @@ -48,7 +47,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { - namespace { MONGO_FP_DECLARE(setDropCollDistLockWait); @@ -205,6 +203,7 @@ private: shardId, cmdDropResult.response["writeConcernError"], *result); } }; + } configsvrDropCollectionCmd; } // namespace diff --git a/src/mongo/db/s/config/configsvr_drop_database_command.cpp b/src/mongo/db/s/config/configsvr_drop_database_command.cpp index 8a9d35df9e8..5a39eee3174 100644 --- a/src/mongo/db/s/config/configsvr_drop_database_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_database_command.cpp @@ -34,11 +34,9 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/s/catalog/dist_lock_manager.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/s/config/configsvr_move_primary_command.cpp b/src/mongo/db/s/config/configsvr_move_primary_command.cpp index 3ff60940242..ad02277cc69 100644 --- a/src/mongo/db/s/config/configsvr_move_primary_command.cpp +++ b/src/mongo/db/s/config/configsvr_move_primary_command.cpp @@ -40,7 +40,6 @@ #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache.h" @@ -52,9 +51,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { - -using std::string; - namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, @@ -118,7 +114,7 @@ public: auto movePrimaryRequest = MovePrimary::parse(IDLParserErrorContext("ConfigSvrMovePrimary"), cmdObj); - const string dbname = parseNs("", cmdObj); + const auto dbname = parseNs("", cmdObj); uassert( ErrorCodes::InvalidNamespace, @@ -249,7 +245,7 @@ public: // reload catalogCache->purgeDatabase(dbname); - const string oldPrimary = fromShard->getConnString().toString(); + const auto oldPrimary = fromShard->getConnString().toString(); ScopedDbConnection fromconn(fromShard->getConnString()); ON_BLOCK_EXIT([&fromconn] { fromconn.done(); }); diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 040d4638c70..c66835e5135 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -40,7 +40,6 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 6c831b1a0ae..fb1ebee0293 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -45,11 +45,9 @@ #include "mongo/db/s/sharding_statistics.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_shard_collection.h" #include "mongo/s/catalog_cache_loader.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/commit_chunk_migration_request_type.h" #include "mongo/s/request_types/set_shard_version_request.h" diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp deleted file mode 100644 index 5ba9ac969d1..00000000000 --- a/src/mongo/db/s/sharding_task_executor.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/sharding_task_executor.h" - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status_with.h" -#include "mongo/bson/timestamp.h" -#include "mongo/db/logical_time.h" -#include "mongo/db/operation_time_tracker.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/metadata/sharding_metadata.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_last_error_info.h" -#include "mongo/s/grid.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace executor { - -namespace { -const std::string kOperationTimeField = "operationTime"; -} - -ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr executor) - : _executor(std::move(executor)) {} - -void ShardingTaskExecutor::startup() { - _executor->startup(); -} - -void ShardingTaskExecutor::shutdown() { - _executor->shutdown(); -} - -void ShardingTaskExecutor::join() { - _executor->join(); -} - -void ShardingTaskExecutor::appendDiagnosticBSON(mongo::BSONObjBuilder* builder) const { - _executor->appendDiagnosticBSON(builder); -} - -Date_t ShardingTaskExecutor::now() { - return _executor->now(); -} - -StatusWith ShardingTaskExecutor::makeEvent() { - return _executor->makeEvent(); -} - -void ShardingTaskExecutor::signalEvent(const EventHandle& event) { - return _executor->signalEvent(event); -} - -StatusWith ShardingTaskExecutor::onEvent(const EventHandle& event, - const CallbackFn& work) { - return _executor->onEvent(event, work); -} - -void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { - _executor->waitForEvent(event); -} - -StatusWith ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, - const EventHandle& event, - Date_t deadline) { - return _executor->waitForEvent(opCtx, event, deadline); -} - -StatusWith ShardingTaskExecutor::scheduleWork( - const CallbackFn& work) { - return _executor->scheduleWork(work); -} - -StatusWith ShardingTaskExecutor::scheduleWorkAt( - Date_t when, const CallbackFn& work) { - return _executor->scheduleWorkAt(when, work); -} - -StatusWith ShardingTaskExecutor::scheduleRemoteCommand( - const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { - - // schedule the user's callback if there is not opCtx - if (!request.opCtx) { - return _executor->scheduleRemoteCommand(request, cb); - } - - boost::optional newRequest; - - if (request.opCtx->getLogicalSessionId() && !request.cmdObj.hasField("lsid")) { - newRequest.emplace(request); - BSONObjBuilder bob(std::move(newRequest->cmdObj)); - { - BSONObjBuilder subbob(bob.subobjStart("lsid")); - request.opCtx->getLogicalSessionId()->serialize(&subbob); - } - - newRequest->cmdObj = bob.obj(); - } - - std::shared_ptr timeTracker = OperationTimeTracker::get(request.opCtx); - - auto clusterGLE = ClusterLastErrorInfo::get(request.opCtx->getClient()); - - auto shardingCb = [timeTracker, clusterGLE, cb]( - const TaskExecutor::RemoteCommandCallbackArgs& args) { - ON_BLOCK_EXIT([&cb, &args]() { cb(args); }); - - // Update replica set monitor info. - auto shard = grid.shardRegistry()->getShardForHostNoReload(args.request.target); - if (!shard) { - LOG(1) << "Could not find shard containing host: " << args.request.target.toString(); - } - - if (!args.response.isOK()) { - if (shard) { - shard->updateReplSetMonitor(args.request.target, args.response.status); - } - LOG(1) << "Error processing the remote request, not updating operationTime or gLE"; - return; - } - - if (shard) { - shard->updateReplSetMonitor(args.request.target, - getStatusFromCommandResult(args.response.data)); - } - - // Update the logical clock. - invariant(timeTracker); - auto operationTime = args.response.data[kOperationTimeField]; - if (!operationTime.eoo()) { - invariant(operationTime.type() == BSONType::bsonTimestamp); - timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); - } - - // Update getLastError info for the client if we're tracking it. - if (clusterGLE) { - auto swShardingMetadata = - rpc::ShardingMetadata::readFromMetadata(args.response.metadata); - if (swShardingMetadata.isOK()) { - auto shardingMetadata = std::move(swShardingMetadata.getValue()); - - auto shardConn = ConnectionString::parse(args.request.target.toString()); - if (!shardConn.isOK()) { - severe() << "got bad host string in saveGLEStats: " << args.request.target; - } - - clusterGLE->addHostOpTime(shardConn.getValue(), - HostOpTime(shardingMetadata.getLastOpTime(), - shardingMetadata.getLastElectionId())); - } else if (swShardingMetadata.getStatus() != ErrorCodes::NoSuchKey) { - warning() << "Got invalid sharding metadata " - << redact(swShardingMetadata.getStatus()) << " metadata object was '" - << redact(args.response.metadata) << "'"; - } - } - }; - - return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb); -} - -void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { - _executor->cancel(cbHandle); -} - -void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { - _executor->wait(cbHandle); -} - -void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { - _executor->appendConnectionStats(stats); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h deleted file mode 100644 index 4c2571c684a..00000000000 --- a/src/mongo/db/s/sharding_task_executor.h +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status_with.h" -#include "mongo/executor/task_executor.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/list.h" -#include "mongo/stdx/mutex.h" - -namespace mongo { -namespace executor { - -struct ConnectionPoolStats; -class ThreadPoolTaskExecutor; - -/** - * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to - * override methods if needed. - */ -class ShardingTaskExecutor final : public TaskExecutor { - MONGO_DISALLOW_COPYING(ShardingTaskExecutor); - -public: - ShardingTaskExecutor(std::unique_ptr executor); - - void startup() override; - void shutdown() override; - void join() override; - void appendDiagnosticBSON(BSONObjBuilder* builder) const override; - Date_t now() override; - StatusWith makeEvent() override; - void signalEvent(const EventHandle& event) override; - StatusWith onEvent(const EventHandle& event, const CallbackFn& work) override; - void waitForEvent(const EventHandle& event) override; - StatusWith waitForEvent(OperationContext* opCtx, - const EventHandle& event, - Date_t deadline) override; - StatusWith scheduleWork(const CallbackFn& work) override; - StatusWith scheduleWorkAt(Date_t when, const CallbackFn& work) override; - StatusWith scheduleRemoteCommand(const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb) override; - void cancel(const CallbackHandle& cbHandle) override; - void wait(const CallbackHandle& cbHandle) override; - - void appendConnectionStats(ConnectionPoolStats* stats) const override; - -private: - std::unique_ptr _executor; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/db/server_options.cpp b/src/mongo/db/server_options.cpp index d37442e4519..bc01417b6f8 100644 --- a/src/mongo/db/server_options.cpp +++ b/src/mongo/db/server_options.cpp @@ -25,6 +25,8 @@ * then also delete it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/server_options.h" namespace mongo { diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h index 826f5760313..478fdbab935 100644 --- a/src/mongo/db/server_options.h +++ b/src/mongo/db/server_options.h @@ -30,7 +30,6 @@ #include "mongo/db/jsobj.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/process_id.h" -#include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { diff --git a/src/mongo/db/service_context_d.cpp b/src/mongo/db/service_context_d.cpp index 15e39126539..980bea314b0 100644 --- a/src/mongo/db/service_context_d.cpp +++ b/src/mongo/db/service_context_d.cpp @@ -36,6 +36,7 @@ #include "mongo/base/initializer.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/operation_context.h" #include "mongo/db/service_entry_point_mongod.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_engine_lock_file.h" diff --git a/src/mongo/rpc/command_request.cpp b/src/mongo/rpc/command_request.cpp index 44accc8b8f0..98e85f02f5f 100644 --- a/src/mongo/rpc/command_request.cpp +++ b/src/mongo/rpc/command_request.cpp @@ -38,6 +38,7 @@ #include "mongo/base/data_type_terminated.h" #include "mongo/base/data_type_validated.h" #include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/client/read_preference.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/object_check.h" #include "mongo/util/assert_util.h" diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index de20bdb4489..16687b6567d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -24,17 +24,17 @@ env.Library( 'sharding_initialization.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos', '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/network_interface_thread_pool', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl', '$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager', - '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos', - '$BUILD_DIR/mongo/db/s/sharding_task_executor', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/util/periodic_runner_factory', 'client/sharding_connection_hook', 'coreshard', + 'sharding_task_executor', ], ) @@ -106,7 +106,6 @@ env.Library( '$BUILD_DIR/mongo/client/remote_command_targeter_mock', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/collation/collator_factory_mock', - '$BUILD_DIR/mongo/db/s/sharding_task_executor', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/task_executor_pool', @@ -117,6 +116,7 @@ env.Library( '$BUILD_DIR/mongo/transport/transport_layer_mock', '$BUILD_DIR/mongo/util/clock_source_mock', 'sharding_egress_metadata_hook_for_mongos', + 'sharding_task_executor', ], ) @@ -235,6 +235,20 @@ env.Library( ], ) + +env.Library( + target='sharding_task_executor', + source=[ + 'sharding_task_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/s/client/sharding_client', + '$BUILD_DIR/mongo/s/cluster_last_error_info', + ], +) + env.Library( target='stale_config', source=[ diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 7da59f057c3..bf99877d6df 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -50,7 +50,6 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/type_shard_identity.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 7ab8c63ddef..5b81aadbdac 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -28,7 +28,6 @@ #pragma once -#include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index bf072e10df4..a1f44b678f2 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -37,8 +37,6 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/optime_with.h" -#include "mongo/platform/unordered_set.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp index 27016c0a566..bbd3d5f8555 100644 --- a/src/mongo/s/client/version_manager.cpp +++ b/src/mongo/s/client/version_manager.cpp @@ -34,10 +34,7 @@ #include "mongo/client/dbclient_rs.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" diff --git a/src/mongo/s/commands/cluster_add_shard_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_cmd.cpp index df13a652f37..39225a4b854 100644 --- a/src/mongo/s/commands/cluster_add_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_add_shard_cmd.cpp @@ -30,13 +30,7 @@ #include "mongo/platform/basic.h" -#include - -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/audit.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_request_type.h" @@ -44,9 +38,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { - -using std::string; - namespace { const ReadPreferenceSetting kPrimaryOnlyReadPreference{ReadPreference::PrimaryOnly}; @@ -60,11 +51,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } @@ -72,18 +63,18 @@ public: return "add a new shard to the system"; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector* out) override { ActionSet actions; actions.addAction(ActionType::addShard); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto parsedRequest = uassertStatusOK(AddShardRequest::parseFromMongosCommand(cmdObj)); // Force a reload of this node's shard list cache at the end of this command. @@ -106,7 +97,7 @@ public: return true; } -} addShard; +} addShardCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp index ca9f56b0ab7..8289bef85b0 100644 --- a/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp +++ b/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp @@ -30,13 +30,9 @@ #include "mongo/platform/basic.h" -#include - -#include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -44,9 +40,6 @@ #include "mongo/util/log.h" namespace mongo { - -using std::string; - namespace { const ReadPreferenceSetting kPrimaryOnlyReadPreference{ReadPreference::PrimaryOnly}; @@ -73,11 +66,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -95,10 +88,10 @@ public: return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto parsedRequest = uassertStatusOK(AddShardToZoneRequest::parseFromMongosCommand(cmdObj)); BSONObjBuilder cmdBuilder; diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp index 1869632ef68..ba7bd608612 100644 --- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp +++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp @@ -30,17 +30,13 @@ #include "mongo/platform/basic.h" -#include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -56,11 +52,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } @@ -70,9 +66,9 @@ public: " { enablesharding : \"\" }\n"; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forDatabaseName(parseNs(dbname, cmdObj)), ActionType::enableSharding)) { @@ -82,15 +78,15 @@ public: return Status::OK(); } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname_unused, const BSONObj& cmdObj) const override { return cmdObj.firstElement().str(); } - virtual bool errmsgRun(OperationContext* opCtx, - const std::string& dbname_unused, - const BSONObj& cmdObj, - std::string& errmsg, - BSONObjBuilder& result) { + bool errmsgRun(OperationContext* opCtx, + const std::string& dbname_unused, + const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) override { const std::string db = parseNs("", cmdObj); // Invalidate the routing table cache entry for this database so that we reload the @@ -110,7 +106,7 @@ public: return true; } -} clusterEnableShardingCmd; +} enableShardingCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp index 064b89d3ca4..10d9c802a46 100644 --- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp @@ -29,7 +29,6 @@ #include "mongo/platform/basic.h" #include -#include #include #include "mongo/bson/util/bson_extract.h" @@ -37,35 +36,28 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" namespace mongo { - -using std::unique_ptr; -using std::map; -using std::string; -using std::vector; - namespace { class ListDatabasesCmd : public BasicCommand { public: ListDatabasesCmd() : BasicCommand("listDatabases", "listdatabases") {} - AllowedOnSecondary secondaryAllowed() const final { + AllowedOnSecondary secondaryAllowed() const override { return AllowedOnSecondary::kAlways; } - bool adminOnly() const final { + bool adminOnly() const override { return true; } - bool supportsWriteConcern(const BSONObj& cmd) const final { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -80,7 +72,7 @@ public: */ Status checkAuthForCommand(Client* client, const std::string& dbname, - const BSONObj& cmdObj) final { + const BSONObj& cmdObj) override { return Status::OK(); } @@ -88,13 +80,13 @@ public: bool run(OperationContext* opCtx, const std::string& dbname_unused, const BSONObj& cmdObj, - BSONObjBuilder& result) final { + BSONObjBuilder& result) override { const bool nameOnly = cmdObj["nameOnly"].trueValue(); - map sizes; - map> dbShardInfo; + std::map sizes; + std::map> dbShardInfo; - vector shardIds; + std::vector shardIds; grid.shardRegistry()->getAllShardIds(&shardIds); shardIds.emplace_back(ShardRegistry::kConfigServerShardId); @@ -120,7 +112,7 @@ public: while (j.more()) { BSONObj dbObj = j.next().Obj(); - const string name = dbObj["name"].String(); + const auto name = dbObj["name"].String(); // If this is the admin db, only collect its stats from the config servers. if (name == "admin" && !s->isConfig()) { @@ -143,8 +135,8 @@ public: sizeSumForDbAcrossShards += size; } - unique_ptr& bb = dbShardInfo[name]; - if (!bb.get()) { + auto& bb = dbShardInfo[name]; + if (!bb) { bb.reset(new BSONObjBuilder()); } @@ -163,15 +155,16 @@ public: // Now that we have aggregated results for all the shards, convert to a response, // and compute total sizes. long long totalSize = 0; + { BSONArrayBuilder dbListBuilder(result.subarrayStart("databases")); - for (map::iterator i = sizes.begin(); i != sizes.end(); ++i) { - const string name = i->first; + for (const auto& sizeEntry : sizes) { + const auto& name = sizeEntry.first; + const long long size = sizeEntry.second; - if (name == "local") { - // We don't return local, since all shards have their own independent local + // Skip the local database, since all shards have their own independent local + if (name == NamespaceString::kLocalDb) continue; - } if (checkAuth && as && !as->isAuthorizedForActionsOnResource(ResourcePattern::forDatabaseName(name), @@ -180,8 +173,6 @@ public: continue; } - long long size = i->second; - BSONObjBuilder temp; temp.append("name", name); if (!nameOnly) { @@ -208,7 +199,7 @@ public: return true; } -} clusterCmdListDatabases; +} listDatabasesCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp index 59ceb90f106..e3d65fabe99 100644 --- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp @@ -28,10 +28,7 @@ #include "mongo/platform/basic.h" -#include - #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/grid.h" @@ -83,7 +80,7 @@ public: return true; } -} listShards; +} listShardsCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp index 1bbe5984ca8..aa048b3b575 100644 --- a/src/mongo/s/commands/cluster_user_management_commands.cpp +++ b/src/mongo/s/commands/cluster_user_management_commands.cpp @@ -30,8 +30,6 @@ #include "mongo/platform/basic.h" -#include "mongo/db/commands/user_management_commands.h" - #include "mongo/base/status.h" #include "mongo/bson/mutable/document.h" #include "mongo/client/dbclientinterface.h" @@ -40,9 +38,9 @@ #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/user_management_commands_parser.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/user_management_commands.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/write_concern_error_detail.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 012ec0b17d8..517f71606d3 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -39,7 +39,6 @@ #include "mongo/db/lasterror.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 1ce1611ff25..32ce21b65ea 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -53,7 +53,6 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index b96f4986197..e27d34f3891 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -37,12 +37,9 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/query/cluster_cursor_manager.h" -#include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 1a14ad361e5..ba47fa07c4e 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -29,6 +29,8 @@ #pragma once #include "mongo/db/repl/optime.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -37,11 +39,9 @@ namespace mongo { class BalancerConfiguration; class CatalogCache; -class ShardingCatalogClient; class ClusterCursorManager; class OperationContext; class ServiceContext; -class ShardRegistry; namespace executor { struct ConnectionPoolStats; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index c0d99811094..2a38a9a136e 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -70,7 +70,6 @@ #include "mongo/platform/process_id.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_factory.h" @@ -133,10 +132,13 @@ boost::optional shardingUptimeReporter; static constexpr auto kRetryInterval = Seconds{1}; Status waitForSigningKeys(OperationContext* opCtx) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + while (true) { - // this should be true when shard registry is up - invariant(grid.shardRegistry()->isUp()); - auto configCS = grid.shardRegistry()->getConfigServerConnectionString(); + // This should be true when shard registry is up + invariant(shardRegistry->isUp()); + + auto configCS = shardRegistry->getConfigServerConnectionString(); auto rsm = ReplicaSetMonitor::get(configCS.getSetName()); // mongod will set minWireVersion == maxWireVersion for isMaster requests from // internalClient. diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 3dd9b60f8ce..1bb4215fa2c 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -42,7 +42,6 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -66,6 +65,7 @@ #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/sharding_task_executor.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -278,7 +278,7 @@ Status waitForShardRegistryReload(OperationContext* opCtx) { try { uassertStatusOK(ClusterIdentityLoader::get(opCtx)->loadClusterId( opCtx, repl::ReadConcernLevel::kMajorityReadConcern)); - if (grid.shardRegistry()->isUp()) { + if (Grid::get(opCtx)->shardRegistry()->isUp()) { return Status::OK(); } sleepFor(kRetryInterval); diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h index 6ac9a636326..aca1a3f22c0 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.h +++ b/src/mongo/s/sharding_mongod_test_fixture.h @@ -33,29 +33,19 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/executor/network_test_env.h" +#include "mongo/s/grid.h" #include "mongo/unittest/unittest.h" namespace mongo { -class BalancerConfiguration; -class CatalogCache; class CatalogCacheLoader; class ConnectionString; -class ClusterCursorManager; class DistLockCatalog; class DistLockManager; class NamespaceString; class RemoteCommandTargeterFactoryMock; -class ShardingCatalogClient; class ShardRegistry; -namespace executor { -class NetworkInterfaceMock; -class NetworkTestEnv; -class TaskExecutor; -class TaskExecutorPool; -} // namespace executor - namespace repl { class ReplicationCoordinatorMock; class ReplSettings; diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp new file mode 100644 index 00000000000..45c3407c6d1 --- /dev/null +++ b/src/mongo/s/sharding_task_executor.cpp @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/sharding_task_executor.h" + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_time_tracker.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/sharding_metadata.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_last_error_info.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace executor { + +namespace { +const std::string kOperationTimeField = "operationTime"; +} + +ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr executor) + : _executor(std::move(executor)) {} + +void ShardingTaskExecutor::startup() { + _executor->startup(); +} + +void ShardingTaskExecutor::shutdown() { + _executor->shutdown(); +} + +void ShardingTaskExecutor::join() { + _executor->join(); +} + +void ShardingTaskExecutor::appendDiagnosticBSON(mongo::BSONObjBuilder* builder) const { + _executor->appendDiagnosticBSON(builder); +} + +Date_t ShardingTaskExecutor::now() { + return _executor->now(); +} + +StatusWith ShardingTaskExecutor::makeEvent() { + return _executor->makeEvent(); +} + +void ShardingTaskExecutor::signalEvent(const EventHandle& event) { + return _executor->signalEvent(event); +} + +StatusWith ShardingTaskExecutor::onEvent(const EventHandle& event, + const CallbackFn& work) { + return _executor->onEvent(event, work); +} + +void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { + _executor->waitForEvent(event); +} + +StatusWith ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) { + return _executor->waitForEvent(opCtx, event, deadline); +} + +StatusWith ShardingTaskExecutor::scheduleWork( + const CallbackFn& work) { + return _executor->scheduleWork(work); +} + +StatusWith ShardingTaskExecutor::scheduleWorkAt( + Date_t when, const CallbackFn& work) { + return _executor->scheduleWorkAt(when, work); +} + +StatusWith ShardingTaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + + // schedule the user's callback if there is not opCtx + if (!request.opCtx) { + return _executor->scheduleRemoteCommand(request, cb); + } + + boost::optional newRequest; + + if (request.opCtx->getLogicalSessionId() && !request.cmdObj.hasField("lsid")) { + newRequest.emplace(request); + BSONObjBuilder bob(std::move(newRequest->cmdObj)); + { + BSONObjBuilder subbob(bob.subobjStart("lsid")); + request.opCtx->getLogicalSessionId()->serialize(&subbob); + } + + newRequest->cmdObj = bob.obj(); + } + + std::shared_ptr timeTracker = OperationTimeTracker::get(request.opCtx); + + auto clusterGLE = ClusterLastErrorInfo::get(request.opCtx->getClient()); + + auto shardingCb = [timeTracker, clusterGLE, cb]( + const TaskExecutor::RemoteCommandCallbackArgs& args) { + ON_BLOCK_EXIT([&cb, &args]() { cb(args); }); + + // Update replica set monitor info. + auto shard = grid.shardRegistry()->getShardForHostNoReload(args.request.target); + if (!shard) { + LOG(1) << "Could not find shard containing host: " << args.request.target.toString(); + } + + if (!args.response.isOK()) { + if (shard) { + shard->updateReplSetMonitor(args.request.target, args.response.status); + } + LOG(1) << "Error processing the remote request, not updating operationTime or gLE"; + return; + } + + if (shard) { + shard->updateReplSetMonitor(args.request.target, + getStatusFromCommandResult(args.response.data)); + } + + // Update the logical clock. + invariant(timeTracker); + auto operationTime = args.response.data[kOperationTimeField]; + if (!operationTime.eoo()) { + invariant(operationTime.type() == BSONType::bsonTimestamp); + timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); + } + + // Update getLastError info for the client if we're tracking it. + if (clusterGLE) { + auto swShardingMetadata = + rpc::ShardingMetadata::readFromMetadata(args.response.metadata); + if (swShardingMetadata.isOK()) { + auto shardingMetadata = std::move(swShardingMetadata.getValue()); + + auto shardConn = ConnectionString::parse(args.request.target.toString()); + if (!shardConn.isOK()) { + severe() << "got bad host string in saveGLEStats: " << args.request.target; + } + + clusterGLE->addHostOpTime(shardConn.getValue(), + HostOpTime(shardingMetadata.getLastOpTime(), + shardingMetadata.getLastElectionId())); + } else if (swShardingMetadata.getStatus() != ErrorCodes::NoSuchKey) { + warning() << "Got invalid sharding metadata " + << redact(swShardingMetadata.getStatus()) << " metadata object was '" + << redact(args.response.metadata) << "'"; + } + } + }; + + return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb); +} + +void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { + _executor->cancel(cbHandle); +} + +void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { + _executor->wait(cbHandle); +} + +void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { + _executor->appendConnectionStats(stats); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h new file mode 100644 index 00000000000..4c2571c684a --- /dev/null +++ b/src/mongo/s/sharding_task_executor.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace executor { + +struct ConnectionPoolStats; +class ThreadPoolTaskExecutor; + +/** + * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to + * override methods if needed. + */ +class ShardingTaskExecutor final : public TaskExecutor { + MONGO_DISALLOW_COPYING(ShardingTaskExecutor); + +public: + ShardingTaskExecutor(std::unique_ptr executor); + + void startup() override; + void shutdown() override; + void join() override; + void appendDiagnosticBSON(BSONObjBuilder* builder) const override; + Date_t now() override; + StatusWith makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; + StatusWith scheduleWork(const CallbackFn& work) override; + StatusWith scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + + void appendConnectionStats(ConnectionPoolStats* stats) const override; + +private: + std::unique_ptr _executor; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 235054b36c1..b33a434c637 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -44,7 +44,6 @@ #include "mongo/db/query/collation/collator_factory_mock.h" #include "mongo/db/query/query_request.h" #include "mongo/db/repl/read_concern_args.h" -#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" @@ -66,6 +65,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" +#include "mongo/s/sharding_task_executor.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" #include "mongo/transport/mock_session.h" diff --git a/src/mongo/s/sharding_uptime_reporter.cpp b/src/mongo/s/sharding_uptime_reporter.cpp index 0b4a1ff9e5e..3fa5a96fc27 100644 --- a/src/mongo/s/sharding_uptime_reporter.cpp +++ b/src/mongo/s/sharding_uptime_reporter.cpp @@ -35,7 +35,6 @@ #include "mongo/db/client.h" #include "mongo/db/server_options.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_mongos.h" #include "mongo/s/grid.h" #include "mongo/util/concurrency/idle_thread_block.h" diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 1a065e24e38..d74fcbcdfc7 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -33,6 +33,7 @@ #include "mongo/base/checked_cast.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/service_context_noop.h" #include "mongo/stdx/memory.h" @@ -52,7 +53,8 @@ namespace mongo { namespace { -inline std::string stateToString(ServiceStateMachine::State state) { + +std::string stateToString(ServiceStateMachine::State state) { std::string ret = str::stream() << state; return ret; } diff --git a/src/mongo/util/signal_handlers.cpp b/src/mongo/util/signal_handlers.cpp index ed84a273b91..26eee867369 100644 --- a/src/mongo/util/signal_handlers.cpp +++ b/src/mongo/util/signal_handlers.cpp @@ -41,6 +41,7 @@ #include "mongo/db/log_process_details.h" #include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" #include "mongo/platform/process_id.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" -- cgit v1.2.1