diff options
Diffstat (limited to 'src/mongo/s')
22 files changed, 367 insertions, 119 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index de20bdb4489..16687b6567d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -24,17 +24,17 @@ env.Library( 'sharding_initialization.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos', '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/network_interface_thread_pool', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl', '$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager', - '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos', - '$BUILD_DIR/mongo/db/s/sharding_task_executor', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/util/periodic_runner_factory', 'client/sharding_connection_hook', 'coreshard', + 'sharding_task_executor', ], ) @@ -106,7 +106,6 @@ env.Library( '$BUILD_DIR/mongo/client/remote_command_targeter_mock', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/collation/collator_factory_mock', - '$BUILD_DIR/mongo/db/s/sharding_task_executor', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/task_executor_pool', @@ -117,6 +116,7 @@ env.Library( '$BUILD_DIR/mongo/transport/transport_layer_mock', '$BUILD_DIR/mongo/util/clock_source_mock', 'sharding_egress_metadata_hook_for_mongos', + 'sharding_task_executor', ], ) @@ -235,6 +235,20 @@ env.Library( ], ) + +env.Library( + target='sharding_task_executor', + source=[ + 'sharding_task_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/s/client/sharding_client', + '$BUILD_DIR/mongo/s/cluster_last_error_info', + ], +) + env.Library( target='stale_config', source=[ diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 7da59f057c3..bf99877d6df 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -50,7 +50,6 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/type_shard_identity.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 7ab8c63ddef..5b81aadbdac 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -28,7 +28,6 @@ #pragma once -#include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index bf072e10df4..a1f44b678f2 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -37,8 +37,6 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/optime_with.h" -#include "mongo/platform/unordered_set.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp index 27016c0a566..bbd3d5f8555 100644 --- a/src/mongo/s/client/version_manager.cpp +++ b/src/mongo/s/client/version_manager.cpp @@ -34,10 +34,7 @@ #include "mongo/client/dbclient_rs.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" diff --git a/src/mongo/s/commands/cluster_add_shard_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_cmd.cpp index df13a652f37..39225a4b854 100644 --- a/src/mongo/s/commands/cluster_add_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_add_shard_cmd.cpp @@ -30,13 +30,7 @@ #include "mongo/platform/basic.h" -#include <vector> - -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/audit.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_request_type.h" @@ -44,9 +38,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { - -using std::string; - namespace { const ReadPreferenceSetting kPrimaryOnlyReadPreference{ReadPreference::PrimaryOnly}; @@ -60,11 +51,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } @@ -72,18 +63,18 @@ public: return "add a new shard to the system"; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override { ActionSet actions; actions.addAction(ActionType::addShard); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto parsedRequest = uassertStatusOK(AddShardRequest::parseFromMongosCommand(cmdObj)); // Force a reload of this node's shard list cache at the end of this command. @@ -106,7 +97,7 @@ public: return true; } -} addShard; +} addShardCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp b/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp index ca9f56b0ab7..8289bef85b0 100644 --- a/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp +++ b/src/mongo/s/commands/cluster_add_shard_to_zone_cmd.cpp @@ -30,13 +30,9 @@ #include "mongo/platform/basic.h" -#include <vector> - -#include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -44,9 +40,6 @@ #include "mongo/util/log.h" namespace mongo { - -using std::string; - namespace { const ReadPreferenceSetting kPrimaryOnlyReadPreference{ReadPreference::PrimaryOnly}; @@ -73,11 +66,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -95,10 +88,10 @@ public: return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto parsedRequest = uassertStatusOK(AddShardToZoneRequest::parseFromMongosCommand(cmdObj)); BSONObjBuilder cmdBuilder; diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp index 1869632ef68..ba7bd608612 100644 --- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp +++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp @@ -30,17 +30,13 @@ #include "mongo/platform/basic.h" -#include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -56,11 +52,11 @@ public: return AllowedOnSecondary::kAlways; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } @@ -70,9 +66,9 @@ public: " { enablesharding : \"<dbname>\" }\n"; } - virtual Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forDatabaseName(parseNs(dbname, cmdObj)), ActionType::enableSharding)) { @@ -82,15 +78,15 @@ public: return Status::OK(); } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + std::string parseNs(const std::string& dbname_unused, const BSONObj& cmdObj) const override { return cmdObj.firstElement().str(); } - virtual bool errmsgRun(OperationContext* opCtx, - const std::string& dbname_unused, - const BSONObj& cmdObj, - std::string& errmsg, - BSONObjBuilder& result) { + bool errmsgRun(OperationContext* opCtx, + const std::string& dbname_unused, + const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) override { const std::string db = parseNs("", cmdObj); // Invalidate the routing table cache entry for this database so that we reload the @@ -110,7 +106,7 @@ public: return true; } -} clusterEnableShardingCmd; +} enableShardingCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp index 064b89d3ca4..10d9c802a46 100644 --- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp @@ -29,7 +29,6 @@ #include "mongo/platform/basic.h" #include <map> -#include <string> #include <vector> #include "mongo/bson/util/bson_extract.h" @@ -37,35 +36,28 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" namespace mongo { - -using std::unique_ptr; -using std::map; -using std::string; -using std::vector; - namespace { class ListDatabasesCmd : public BasicCommand { public: ListDatabasesCmd() : BasicCommand("listDatabases", "listdatabases") {} - AllowedOnSecondary secondaryAllowed() const final { + AllowedOnSecondary secondaryAllowed() const override { return AllowedOnSecondary::kAlways; } - bool adminOnly() const final { + bool adminOnly() const override { return true; } - bool supportsWriteConcern(const BSONObj& cmd) const final { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -80,7 +72,7 @@ public: */ Status checkAuthForCommand(Client* client, const std::string& dbname, - const BSONObj& cmdObj) final { + const BSONObj& cmdObj) override { return Status::OK(); } @@ -88,13 +80,13 @@ public: bool run(OperationContext* opCtx, const std::string& dbname_unused, const BSONObj& cmdObj, - BSONObjBuilder& result) final { + BSONObjBuilder& result) override { const bool nameOnly = cmdObj["nameOnly"].trueValue(); - map<string, long long> sizes; - map<string, unique_ptr<BSONObjBuilder>> dbShardInfo; + std::map<std::string, long long> sizes; + std::map<std::string, std::unique_ptr<BSONObjBuilder>> dbShardInfo; - vector<ShardId> shardIds; + std::vector<ShardId> shardIds; grid.shardRegistry()->getAllShardIds(&shardIds); shardIds.emplace_back(ShardRegistry::kConfigServerShardId); @@ -120,7 +112,7 @@ public: while (j.more()) { BSONObj dbObj = j.next().Obj(); - const string name = dbObj["name"].String(); + const auto name = dbObj["name"].String(); // If this is the admin db, only collect its stats from the config servers. if (name == "admin" && !s->isConfig()) { @@ -143,8 +135,8 @@ public: sizeSumForDbAcrossShards += size; } - unique_ptr<BSONObjBuilder>& bb = dbShardInfo[name]; - if (!bb.get()) { + auto& bb = dbShardInfo[name]; + if (!bb) { bb.reset(new BSONObjBuilder()); } @@ -163,15 +155,16 @@ public: // Now that we have aggregated results for all the shards, convert to a response, // and compute total sizes. long long totalSize = 0; + { BSONArrayBuilder dbListBuilder(result.subarrayStart("databases")); - for (map<string, long long>::iterator i = sizes.begin(); i != sizes.end(); ++i) { - const string name = i->first; + for (const auto& sizeEntry : sizes) { + const auto& name = sizeEntry.first; + const long long size = sizeEntry.second; - if (name == "local") { - // We don't return local, since all shards have their own independent local + // Skip the local database, since all shards have their own independent local + if (name == NamespaceString::kLocalDb) continue; - } if (checkAuth && as && !as->isAuthorizedForActionsOnResource(ResourcePattern::forDatabaseName(name), @@ -180,8 +173,6 @@ public: continue; } - long long size = i->second; - BSONObjBuilder temp; temp.append("name", name); if (!nameOnly) { @@ -208,7 +199,7 @@ public: return true; } -} clusterCmdListDatabases; +} listDatabasesCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp index 59ceb90f106..e3d65fabe99 100644 --- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp @@ -28,10 +28,7 @@ #include "mongo/platform/basic.h" -#include <vector> - #include "mongo/db/commands.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/grid.h" @@ -83,7 +80,7 @@ public: return true; } -} listShards; +} listShardsCmd; } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp index 1bbe5984ca8..aa048b3b575 100644 --- a/src/mongo/s/commands/cluster_user_management_commands.cpp +++ b/src/mongo/s/commands/cluster_user_management_commands.cpp @@ -30,8 +30,6 @@ #include "mongo/platform/basic.h" -#include "mongo/db/commands/user_management_commands.h" - #include "mongo/base/status.h" #include "mongo/bson/mutable/document.h" #include "mongo/client/dbclientinterface.h" @@ -40,9 +38,9 @@ #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/user_management_commands_parser.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/user_management_commands.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/write_concern_error_detail.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 012ec0b17d8..517f71606d3 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -39,7 +39,6 @@ #include "mongo/db/lasterror.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 1ce1611ff25..32ce21b65ea 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -53,7 +53,6 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index b96f4986197..e27d34f3891 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -37,12 +37,9 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/query/cluster_cursor_manager.h" -#include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 1a14ad361e5..ba47fa07c4e 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -29,6 +29,8 @@ #pragma once #include "mongo/db/repl/optime.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -37,11 +39,9 @@ namespace mongo { class BalancerConfiguration; class CatalogCache; -class ShardingCatalogClient; class ClusterCursorManager; class OperationContext; class ServiceContext; -class ShardRegistry; namespace executor { struct ConnectionPoolStats; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index c0d99811094..2a38a9a136e 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -70,7 +70,6 @@ #include "mongo/platform/process_id.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_factory.h" @@ -133,10 +132,13 @@ boost::optional<ShardingUptimeReporter> shardingUptimeReporter; static constexpr auto kRetryInterval = Seconds{1}; Status waitForSigningKeys(OperationContext* opCtx) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + while (true) { - // this should be true when shard registry is up - invariant(grid.shardRegistry()->isUp()); - auto configCS = grid.shardRegistry()->getConfigServerConnectionString(); + // This should be true when shard registry is up + invariant(shardRegistry->isUp()); + + auto configCS = shardRegistry->getConfigServerConnectionString(); auto rsm = ReplicaSetMonitor::get(configCS.getSetName()); // mongod will set minWireVersion == maxWireVersion for isMaster requests from // internalClient. diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 3dd9b60f8ce..1bb4215fa2c 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -42,7 +42,6 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -66,6 +65,7 @@ #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/sharding_task_executor.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -278,7 +278,7 @@ Status waitForShardRegistryReload(OperationContext* opCtx) { try { uassertStatusOK(ClusterIdentityLoader::get(opCtx)->loadClusterId( opCtx, repl::ReadConcernLevel::kMajorityReadConcern)); - if (grid.shardRegistry()->isUp()) { + if (Grid::get(opCtx)->shardRegistry()->isUp()) { return Status::OK(); } sleepFor(kRetryInterval); diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h index 6ac9a636326..aca1a3f22c0 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.h +++ b/src/mongo/s/sharding_mongod_test_fixture.h @@ -33,29 +33,19 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/executor/network_test_env.h" +#include "mongo/s/grid.h" #include "mongo/unittest/unittest.h" namespace mongo { -class BalancerConfiguration; -class CatalogCache; class CatalogCacheLoader; class ConnectionString; -class ClusterCursorManager; class DistLockCatalog; class DistLockManager; class NamespaceString; class RemoteCommandTargeterFactoryMock; -class ShardingCatalogClient; class ShardRegistry; -namespace executor { -class NetworkInterfaceMock; -class NetworkTestEnv; -class TaskExecutor; -class TaskExecutorPool; -} // namespace executor - namespace repl { class ReplicationCoordinatorMock; class ReplSettings; diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp new file mode 100644 index 00000000000..45c3407c6d1 --- /dev/null +++ b/src/mongo/s/sharding_task_executor.cpp @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/sharding_task_executor.h" + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_time_tracker.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/sharding_metadata.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_last_error_info.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace executor { + +namespace { +const std::string kOperationTimeField = "operationTime"; +} + +ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor) + : _executor(std::move(executor)) {} + +void ShardingTaskExecutor::startup() { + _executor->startup(); +} + +void ShardingTaskExecutor::shutdown() { + _executor->shutdown(); +} + +void ShardingTaskExecutor::join() { + _executor->join(); +} + +void ShardingTaskExecutor::appendDiagnosticBSON(mongo::BSONObjBuilder* builder) const { + _executor->appendDiagnosticBSON(builder); +} + +Date_t ShardingTaskExecutor::now() { + return _executor->now(); +} + +StatusWith<TaskExecutor::EventHandle> ShardingTaskExecutor::makeEvent() { + return _executor->makeEvent(); +} + +void ShardingTaskExecutor::signalEvent(const EventHandle& event) { + return _executor->signalEvent(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event, + const CallbackFn& work) { + return _executor->onEvent(event, work); +} + +void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { + _executor->waitForEvent(event); +} + +StatusWith<stdx::cv_status> ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) { + return _executor->waitForEvent(opCtx, event, deadline); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork( + const CallbackFn& work) { + return _executor->scheduleWork(work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt( + Date_t when, const CallbackFn& work) { + return _executor->scheduleWorkAt(when, work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + + // schedule the user's callback if there is not opCtx + if (!request.opCtx) { + return _executor->scheduleRemoteCommand(request, cb); + } + + boost::optional<RemoteCommandRequest> newRequest; + + if (request.opCtx->getLogicalSessionId() && !request.cmdObj.hasField("lsid")) { + newRequest.emplace(request); + BSONObjBuilder bob(std::move(newRequest->cmdObj)); + { + BSONObjBuilder subbob(bob.subobjStart("lsid")); + request.opCtx->getLogicalSessionId()->serialize(&subbob); + } + + newRequest->cmdObj = bob.obj(); + } + + std::shared_ptr<OperationTimeTracker> timeTracker = OperationTimeTracker::get(request.opCtx); + + auto clusterGLE = ClusterLastErrorInfo::get(request.opCtx->getClient()); + + auto shardingCb = [timeTracker, clusterGLE, cb]( + const TaskExecutor::RemoteCommandCallbackArgs& args) { + ON_BLOCK_EXIT([&cb, &args]() { cb(args); }); + + // Update replica set monitor info. + auto shard = grid.shardRegistry()->getShardForHostNoReload(args.request.target); + if (!shard) { + LOG(1) << "Could not find shard containing host: " << args.request.target.toString(); + } + + if (!args.response.isOK()) { + if (shard) { + shard->updateReplSetMonitor(args.request.target, args.response.status); + } + LOG(1) << "Error processing the remote request, not updating operationTime or gLE"; + return; + } + + if (shard) { + shard->updateReplSetMonitor(args.request.target, + getStatusFromCommandResult(args.response.data)); + } + + // Update the logical clock. + invariant(timeTracker); + auto operationTime = args.response.data[kOperationTimeField]; + if (!operationTime.eoo()) { + invariant(operationTime.type() == BSONType::bsonTimestamp); + timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); + } + + // Update getLastError info for the client if we're tracking it. + if (clusterGLE) { + auto swShardingMetadata = + rpc::ShardingMetadata::readFromMetadata(args.response.metadata); + if (swShardingMetadata.isOK()) { + auto shardingMetadata = std::move(swShardingMetadata.getValue()); + + auto shardConn = ConnectionString::parse(args.request.target.toString()); + if (!shardConn.isOK()) { + severe() << "got bad host string in saveGLEStats: " << args.request.target; + } + + clusterGLE->addHostOpTime(shardConn.getValue(), + HostOpTime(shardingMetadata.getLastOpTime(), + shardingMetadata.getLastElectionId())); + } else if (swShardingMetadata.getStatus() != ErrorCodes::NoSuchKey) { + warning() << "Got invalid sharding metadata " + << redact(swShardingMetadata.getStatus()) << " metadata object was '" + << redact(args.response.metadata) << "'"; + } + } + }; + + return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb); +} + +void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { + _executor->cancel(cbHandle); +} + +void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { + _executor->wait(cbHandle); +} + +void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { + _executor->appendConnectionStats(stats); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h new file mode 100644 index 00000000000..4c2571c684a --- /dev/null +++ b/src/mongo/s/sharding_task_executor.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace executor { + +struct ConnectionPoolStats; +class ThreadPoolTaskExecutor; + +/** + * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to + * override methods if needed. + */ +class ShardingTaskExecutor final : public TaskExecutor { + MONGO_DISALLOW_COPYING(ShardingTaskExecutor); + +public: + ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor); + + void startup() override; + void shutdown() override; + void join() override; + void appendDiagnosticBSON(BSONObjBuilder* builder) const override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + + void appendConnectionStats(ConnectionPoolStats* stats) const override; + +private: + std::unique_ptr<ThreadPoolTaskExecutor> _executor; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 235054b36c1..b33a434c637 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -44,7 +44,6 @@ #include "mongo/db/query/collation/collator_factory_mock.h" #include "mongo/db/query/query_request.h" #include "mongo/db/repl/read_concern_args.h" -#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" @@ -66,6 +65,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" +#include "mongo/s/sharding_task_executor.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" #include "mongo/transport/mock_session.h" diff --git a/src/mongo/s/sharding_uptime_reporter.cpp b/src/mongo/s/sharding_uptime_reporter.cpp index 0b4a1ff9e5e..3fa5a96fc27 100644 --- a/src/mongo/s/sharding_uptime_reporter.cpp +++ b/src/mongo/s/sharding_uptime_reporter.cpp @@ -35,7 +35,6 @@ #include "mongo/db/client.h" #include "mongo/db/server_options.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_mongos.h" #include "mongo/s/grid.h" #include "mongo/util/concurrency/idle_thread_block.h" |