diff options
Diffstat (limited to 'src')
29 files changed, 326 insertions, 434 deletions
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index dec27398071..91f41fbf274 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -723,8 +723,7 @@ DBClientConnection* DBClientReplicaSet::selectNodeUsingTags( if (_authPooledSecondaryConn) { _authConnection(_lastSlaveOkConn.get()); } else { - // Mongos pooled connections are authenticated through - // ShardingConnectionHook::onCreate(). + // Mongos pooled connections are authenticated through ShardingConnectionHook::onCreate() } LOG(3) << "dbclient_rs selecting node " << _lastSlaveOkHost << endl; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 92123979b87..6f59c76a4e3 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -930,7 +930,6 @@ env.Library( "index/index_descriptor", "index_d", "introspect", - 'keys_collection_client_direct', "matcher/expressions_mongod_only", "op_observer_d", "ops/write_ops_parsers", @@ -972,6 +971,7 @@ env.Library( "update/update_driver", "update_index_data", "views/views_mongod", + 'keys_collection_client_direct', ], ) diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp index ac1a87973e8..d85c484dd22 100644 --- a/src/mongo/db/commands/conn_pool_stats.cpp +++ b/src/mongo/db/commands/conn_pool_stats.cpp @@ -1,4 +1,4 @@ -/* +/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify @@ -40,6 +40,7 @@ #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/task_executor_pool.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/grid.h" namespace mongo { @@ -114,5 +115,48 @@ public: } poolStatsCmd; +class ShardedPoolStats final : public BasicCommand { +public: + ShardedPoolStats() : BasicCommand("shardConnPoolStats") {} + + std::string help() const override { + return "stats about the shard connection pool"; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + AllowedOnSecondary secondaryAllowed() const override { + return AllowedOnSecondary::kAlways; + } + + /** + * Requires the same privileges as the connPoolStats command. + */ + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { + ActionSet actions; + actions.addAction(ActionType::connPoolStats); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + bool run(OperationContext* opCtx, + const std::string& dbname, + const mongo::BSONObj& cmdObj, + mongo::BSONObjBuilder& result) override { + // Connection information + executor::ConnectionPoolStats stats{}; + shardConnectionPool.appendConnectionStats(&stats); + stats.appendToBSON(result); + + // Thread connection information + ShardConnection::reportActiveClientConnections(&result); + return true; + } + +} shardedPoolStatsCmd; + } // namespace } // namespace mongo diff --git a/src/mongo/db/commands/conn_pool_sync.cpp b/src/mongo/db/commands/conn_pool_sync.cpp index cd3526bb885..12e279d15fa 100644 --- a/src/mongo/db/commands/conn_pool_sync.cpp +++ b/src/mongo/db/commands/conn_pool_sync.cpp @@ -28,44 +28,47 @@ #include "mongo/platform/basic.h" -#include <sstream> - -#include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/db/commands.h" #include "mongo/s/client/shard_connection.h" namespace mongo { +namespace { class PoolFlushCmd : public BasicCommand { public: PoolFlushCmd() : BasicCommand("connPoolSync", "connpoolsync") {} + std::string help() const override { return "internal"; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + + AllowedOnSecondary secondaryAllowed() const override { + return AllowedOnSecondary::kAlways; + } + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { ActionSet actions; actions.addAction(ActionType::connPoolSync); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - virtual bool run(OperationContext* opCtx, - const std::string&, - const mongo::BSONObj&, - mongo::BSONObjBuilder& result) { + bool run(OperationContext* opCtx, + const std::string&, + const mongo::BSONObj&, + mongo::BSONObjBuilder& result) override { shardConnectionPool.flush(); globalConnPool.flush(); return true; } - AllowedOnSecondary secondaryAllowed() const override { - return AllowedOnSecondary::kAlways; - } } poolFlushCmd; +} // namespace } // namespace mongo diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index b999b982835..be9817a7b59 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -104,16 +104,16 @@ const DeleteStats* getDeleteStats(const PlanExecutor* exec) { * to return to the client. If no matching document to update or remove was found, then none * is returned. Otherwise, the updated or deleted document is returned. * - * If the operation failed, then an error Status is returned. + * If the operation failed, throws. */ -StatusWith<boost::optional<BSONObj>> advanceExecutor(OperationContext* opCtx, - PlanExecutor* exec, - bool isRemove) { +boost::optional<BSONObj> advanceExecutor(OperationContext* opCtx, + PlanExecutor* exec, + bool isRemove) { BSONObj value; PlanExecutor::ExecState state = exec->getNext(&value, nullptr); if (PlanExecutor::ADVANCED == state) { - return boost::optional<BSONObj>(std::move(value)); + return {std::move(value)}; } if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { @@ -121,19 +121,18 @@ StatusWith<boost::optional<BSONObj>> advanceExecutor(OperationContext* opCtx, << ", stats: " << redact(Explain::getWinningPlanStats(exec)); if (WorkingSetCommon::isValidStatusMemberObject(value)) { - const Status errorStatus = WorkingSetCommon::getMemberObjectStatus(value); - invariant(!errorStatus.isOK()); - return errorStatus; + uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(value)); + MONGO_UNREACHABLE; } - const std::string opstr = isRemove ? "delete" : "update"; - return {ErrorCodes::OperationFailed, - str::stream() << "executor returned " << PlanExecutor::statestr(state) - << " while executing " - << opstr}; + + uasserted(ErrorCodes::OperationFailed, + str::stream() << "executor returned " << PlanExecutor::statestr(state) + << " while executing " + << (isRemove ? "delete" : "update")); } invariant(state == PlanExecutor::IS_EOF); - return boost::optional<BSONObj>(boost::none); + return boost::none; } void makeUpdateRequest(const FindAndModifyRequest& args, @@ -185,14 +184,11 @@ void appendCommandResponse(const PlanExecutor* exec, } } -Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceString& nsString) { - if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString)) { - return Status(ErrorCodes::NotMaster, - str::stream() - << "Not primary while running findAndModify command on collection " - << nsString.ns()); - } - return Status::OK(); +void assertCanWrite(OperationContext* opCtx, const NamespaceString& nsString) { + uassert(ErrorCodes::NotMaster, + str::stream() << "Not primary while running findAndModify command on collection " + << nsString.ns(), + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString)); } void recordStatsForTopCommand(OperationContext* opCtx) { @@ -251,20 +247,12 @@ public: ExplainOptions::Verbosity verbosity, BSONObjBuilder* out) const override { const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj); - Status allowedWriteStatus = userAllowedWriteNS(fullNs.ns()); - if (!allowedWriteStatus.isOK()) { - return allowedWriteStatus; - } - - StatusWith<FindAndModifyRequest> parseStatus = - FindAndModifyRequest::parseFromBSON(NamespaceString(fullNs.ns()), cmdObj); - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); - } + uassertStatusOK(userAllowedWriteNS(fullNs.ns())); - const FindAndModifyRequest& args = parseStatus.getValue(); + const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON(fullNs, cmdObj))); const NamespaceString& nsString = args.getNamespaceString(); - OpDebug* opDebug = &CurOp::get(opCtx)->debug(); + auto const curOp = CurOp::get(opCtx); + OpDebug* const opDebug = &curOp->debug(); if (args.isRemove()) { DeleteRequest request(nsString); @@ -272,29 +260,22 @@ public: makeDeleteRequest(args, isExplain, &request); ParsedDelete parsedDelete(opCtx, &request); - Status parsedDeleteStatus = parsedDelete.parseRequest(); - if (!parsedDeleteStatus.isOK()) { - return parsedDeleteStatus; - } + uassertStatusOK(parsedDelete.parseRequest()); // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. AutoGetCollection autoColl(opCtx, nsString, MODE_IX); - if (!autoColl.getDb()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "database " << dbName << " does not exist."}; - } + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "database " << dbName << " does not exist", + autoColl.getDb()); auto css = CollectionShardingState::get(opCtx, nsString); css->checkShardVersionOrThrow(opCtx); Collection* const collection = autoColl.getCollection(); - auto statusWithPlanExecutor = - getExecutorDelete(opCtx, opDebug, collection, &parsedDelete); - if (!statusWithPlanExecutor.isOK()) { - return statusWithPlanExecutor.getStatus(); - } - const auto exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = + uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete)); + Explain::explainStages(exec.get(), collection, verbosity, out); } else { UpdateRequest request(nsString); @@ -303,29 +284,22 @@ public: makeUpdateRequest(args, isExplain, &updateLifecycle, &request); ParsedUpdate parsedUpdate(opCtx, &request); - Status parsedUpdateStatus = parsedUpdate.parseRequest(); - if (!parsedUpdateStatus.isOK()) { - return parsedUpdateStatus; - } + uassertStatusOK(parsedUpdate.parseRequest()); // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. AutoGetCollection autoColl(opCtx, nsString, MODE_IX); - if (!autoColl.getDb()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "database " << dbName << " does not exist."}; - } + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "database " << dbName << " does not exist", + autoColl.getDb()); auto css = CollectionShardingState::get(opCtx, nsString); css->checkShardVersionOrThrow(opCtx); Collection* const collection = autoColl.getCollection(); - auto statusWithPlanExecutor = - getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate); - if (!statusWithPlanExecutor.isOK()) { - return statusWithPlanExecutor.getStatus(); - } - const auto exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = + uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate)); + Explain::explainStages(exec.get(), collection, verbosity, out); } @@ -338,20 +312,14 @@ public: BSONObjBuilder& result) override { // findAndModify command is not replicated directly. invariant(opCtx->writesAreReplicated()); - const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj); - Status allowedWriteStatus = userAllowedWriteNS(fullNs.ns()); - if (!allowedWriteStatus.isOK()) { - return CommandHelpers::appendCommandStatus(result, allowedWriteStatus); - } - StatusWith<FindAndModifyRequest> parseStatus = - FindAndModifyRequest::parseFromBSON(NamespaceString(fullNs.ns()), cmdObj); - if (!parseStatus.isOK()) { - return CommandHelpers::appendCommandStatus(result, parseStatus.getStatus()); - } + const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj); + uassertStatusOK(userAllowedWriteNS(fullNs.ns())); - const FindAndModifyRequest& args = parseStatus.getValue(); + const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON(fullNs, cmdObj))); const NamespaceString& nsString = args.getNamespaceString(); + auto const curOp = CurOp::get(opCtx); + OpDebug* const opDebug = &curOp->debug(); boost::optional<DisableDocumentValidation> maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) @@ -369,9 +337,6 @@ public: } } - auto curOp = CurOp::get(opCtx); - OpDebug* opDebug = &curOp->debug(); - // Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it is // executing a findAndModify. This is done to ensure that we can always match, modify, and // return the document under concurrency, if a matching document exists. @@ -386,16 +351,11 @@ public: } ParsedDelete parsedDelete(opCtx, &request); - Status parsedDeleteStatus = parsedDelete.parseRequest(); - if (!parsedDeleteStatus.isOK()) { - CommandHelpers::appendCommandStatus(result, parsedDeleteStatus); - return false; - } + uassertStatusOK(parsedDelete.parseRequest()); AutoGetOrCreateDb autoDb(opCtx, dbName, MODE_IX); Lock::CollectionLock collLock(opCtx->lockState(), nsString.ns(), MODE_IX); - // Attach the namespace and database profiling level to the current op. { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), @@ -405,11 +365,7 @@ public: auto css = CollectionShardingState::get(opCtx, nsString); css->checkShardVersionOrThrow(opCtx); - Status isPrimary = checkCanAcceptWritesForDatabase(opCtx, nsString); - if (!isPrimary.isOK()) { - CommandHelpers::appendCommandStatus(result, isPrimary); - return false; - } + assertCanWrite(opCtx, nsString); Collection* const collection = autoDb.getDb()->getCollection(opCtx, nsString); if (!collection && autoDb.getDb()->getViewCatalog()->lookup(opCtx, nsString.ns())) { @@ -418,25 +374,15 @@ public: "findAndModify not supported on a view"}); return false; } - auto statusWithPlanExecutor = - getExecutorDelete(opCtx, opDebug, collection, &parsedDelete); - if (!statusWithPlanExecutor.isOK()) { - CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus()); - return false; - } - const auto exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = + uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete)); { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); } - StatusWith<boost::optional<BSONObj>> advanceStatus = - advanceExecutor(opCtx, exec.get(), args.isRemove()); - if (!advanceStatus.isOK()) { - CommandHelpers::appendCommandStatus(result, advanceStatus.getStatus()); - return false; - } + auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove()); // Nothing after advancing the plan executor should throw a WriteConflictException, // so the following bookkeeping with execution stats won't end up being done // multiple times. @@ -458,8 +404,7 @@ public: } recordStatsForTopCommand(opCtx); - appendCommandResponse( - exec.get(), args.isRemove(), advanceStatus.getValue(), &result); + appendCommandResponse(exec.get(), args.isRemove(), docFound, &result); } else { UpdateRequest request(nsString); UpdateLifecycleImpl updateLifecycle(nsString); @@ -471,16 +416,11 @@ public: } ParsedUpdate parsedUpdate(opCtx, &request); - Status parsedUpdateStatus = parsedUpdate.parseRequest(); - if (!parsedUpdateStatus.isOK()) { - CommandHelpers::appendCommandStatus(result, parsedUpdateStatus); - return false; - } + uassertStatusOK(parsedUpdate.parseRequest()); AutoGetOrCreateDb autoDb(opCtx, dbName, MODE_IX); Lock::CollectionLock collLock(opCtx->lockState(), nsString.ns(), MODE_IX); - // Attach the namespace and database profiling level to the current op. { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), @@ -490,11 +430,7 @@ public: auto css = CollectionShardingState::get(opCtx, nsString); css->checkShardVersionOrThrow(opCtx); - Status isPrimary = checkCanAcceptWritesForDatabase(opCtx, nsString); - if (!isPrimary.isOK()) { - CommandHelpers::appendCommandStatus(result, isPrimary); - return false; - } + assertCanWrite(opCtx, nsString); Collection* collection = autoDb.getDb()->getCollection(opCtx, nsString.ns()); if (!collection && autoDb.getDb()->getViewCatalog()->lookup(opCtx, nsString.ns())) { @@ -511,11 +447,7 @@ public: // in exclusive mode in order to create the collection. collLock.relockAsDatabaseExclusive(autoDb.lock()); collection = autoDb.getDb()->getCollection(opCtx, nsString); - Status isPrimaryAfterRelock = checkCanAcceptWritesForDatabase(opCtx, nsString); - if (!isPrimaryAfterRelock.isOK()) { - CommandHelpers::appendCommandStatus(result, isPrimaryAfterRelock); - return false; - } + assertCanWrite(opCtx, nsString); if (collection) { // Someone else beat us to creating the collection, do nothing. @@ -534,25 +466,15 @@ public: } } - auto statusWithPlanExecutor = - getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate); - if (!statusWithPlanExecutor.isOK()) { - CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus()); - return false; - } - const auto exec = std::move(statusWithPlanExecutor.getValue()); + const auto exec = + uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate)); { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); } - StatusWith<boost::optional<BSONObj>> advanceStatus = - advanceExecutor(opCtx, exec.get(), args.isRemove()); - if (!advanceStatus.isOK()) { - CommandHelpers::appendCommandStatus(result, advanceStatus.getStatus()); - return false; - } + auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove()); // Nothing after advancing the plan executor should throw a WriteConflictException, // so the following bookkeeping with execution stats won't end up being done // multiple times. @@ -572,8 +494,7 @@ public: } recordStatsForTopCommand(opCtx); - appendCommandResponse( - exec.get(), args.isRemove(), advanceStatus.getValue(), &result); + appendCommandResponse(exec.get(), args.isRemove(), docFound, &result); } return true; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d09fc1ab9de..82c3f51f3f8 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -87,7 +87,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/dcommands_fcv', '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/repl/repl_coordinator_global', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/s/client/shard_local', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/is_mongos', diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp index e8f93fc34f3..9045cbef69a 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp @@ -76,17 +76,12 @@ namespace mongo { namespace { -using std::vector; - using CallbackHandle = executor::TaskExecutor::CallbackHandle; using CallbackArgs = executor::TaskExecutor::CallbackArgs; using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; -const Seconds kDefaultFindHostMaxWaitTime(20); - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); -const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); /** * Generates a unique name to be given to a newly added shard. @@ -715,7 +710,7 @@ StatusWith<std::string> ShardingCatalogManager::addShard( ->catalogClient() ->logChange( opCtx, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern) - .transitional_ignore(); + .ignore(); // Ensure the added shard is visible to this process. auto shardRegistry = Grid::get(opCtx)->shardRegistry(); @@ -789,7 +784,7 @@ StatusWith<ShardDrainingStatus> ShardingCatalogManager::removeShard(OperationCon "", BSON("shard" << name), ShardingCatalogClient::kLocalWriteConcern) - .transitional_ignore(); + .ignore(); return ShardDrainingStatus::STARTED; } @@ -845,7 +840,7 @@ StatusWith<ShardDrainingStatus> ShardingCatalogManager::removeShard(OperationCon "", BSON("shard" << name), ShardingCatalogClient::kLocalWriteConcern) - .transitional_ignore(); + .ignore(); return ShardDrainingStatus::COMPLETED; } @@ -884,7 +879,7 @@ BSONObj ShardingCatalogManager::createShardIdentityUpsertForAddShard(OperationCo // static StatusWith<ShardId> ShardingCatalogManager::_selectShardForNewDatabase( OperationContext* opCtx, ShardRegistry* shardRegistry) { - vector<ShardId> allShardIds; + std::vector<ShardId> allShardIds; shardRegistry->getAllShardIds(&allShardIds); if (allShardIds.empty()) { diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index a575b0a3018..c6f2496bc3d 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -36,7 +36,7 @@ #include <vector> #include "mongo/client/connpool.h" -#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/document_validation.h" @@ -47,15 +47,13 @@ #include "mongo/db/operation_context.h" #include "mongo/db/ops/delete.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" -#include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" #include "mongo/util/concurrency/notification.h" @@ -66,6 +64,9 @@ namespace mongo { namespace { +const auto getMigrationDestinationManager = + ServiceContext::declareDecoration<MigrationDestinationManager>(); + const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, // Note: Even though we're setting UNSET here, // kMajority implies JOURNAL if journaling is @@ -210,6 +211,10 @@ MigrationDestinationManager::MigrationDestinationManager() = default; MigrationDestinationManager::~MigrationDestinationManager() = default; +MigrationDestinationManager* MigrationDestinationManager::get(OperationContext* opCtx) { + return &getMigrationDestinationManager(opCtx->getServiceContext()); +} + MigrationDestinationManager::State MigrationDestinationManager::getState() const { stdx::lock_guard<stdx::mutex> sl(_mutex); return _state; @@ -428,7 +433,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, auto opCtx = Client::getCurrent()->makeOperationContext(); - if (getGlobalAuthorizationManager()->isAuthEnabled()) { + if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) { AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(); } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index cc59483d699..bcbfb65facb 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -69,6 +69,13 @@ public: MigrationDestinationManager(); ~MigrationDestinationManager(); + /** + * Returns the singleton instance of the migration destination manager. + * + * TODO (SERVER-25333): This should become per-collection instance instead of singleton. + */ + static MigrationDestinationManager* get(OperationContext* opCtx); + State getState() const; void setState(State newState); diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 37cde498251..98c2b85cd18 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -30,11 +30,6 @@ #include "mongo/platform/basic.h" -#include <algorithm> -#include <map> -#include <string> -#include <vector> - #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" @@ -43,6 +38,7 @@ #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/db/s/chunk_move_write_concern_options.h" +#include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" #include "mongo/s/request_types/migration_secondary_throttle_options.h" @@ -50,9 +46,6 @@ #include "mongo/util/log.h" namespace mongo { - -using std::string; - namespace { class RecvChunkStartCommand : public ErrmsgCommandDeprecated { @@ -67,28 +60,28 @@ public: return AllowedOnSecondary::kNever; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { // This is required to be true to support moveChunk. return true; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { ActionSet actions; actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } bool errmsgRun(OperationContext* opCtx, - const string&, + const std::string& dbname, const BSONObj& cmdObj, - string& errmsg, - BSONObjBuilder& result) { + std::string& errmsg, + BSONObjBuilder& result) override { auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); @@ -136,7 +129,7 @@ public: auto scopedRegisterReceiveChunk( uassertStatusOK(shardingState->registerReceiveChunk(nss, chunkRange, fromShard))); - uassertStatusOK(shardingState->migrationDestinationManager()->start( + uassertStatusOK(MigrationDestinationManager::get(opCtx)->start( nss, std::move(scopedRegisterReceiveChunk), migrationSessionId, @@ -167,27 +160,27 @@ public: return AllowedOnSecondary::kNever; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { ActionSet actions; actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } bool run(OperationContext* opCtx, - const string&, + const std::string& dbname, const BSONObj& cmdObj, - BSONObjBuilder& result) { - ShardingState::get(opCtx)->migrationDestinationManager()->report(result); + BSONObjBuilder& result) override { + MigrationDestinationManager::get(opCtx)->report(result); return true; } @@ -205,29 +198,29 @@ public: return AllowedOnSecondary::kNever; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { ActionSet actions; actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } bool run(OperationContext* opCtx, - const string&, + const std::string& dbname, const BSONObj& cmdObj, - BSONObjBuilder& result) { + BSONObjBuilder& result) override { auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)); - auto mdm = ShardingState::get(opCtx)->migrationDestinationManager(); + auto const mdm = MigrationDestinationManager::get(opCtx); Status const status = mdm->startCommit(sessionId); mdm->report(result); if (!status.isOK()) { @@ -251,28 +244,27 @@ public: return AllowedOnSecondary::kNever; } - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { ActionSet actions; actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } bool run(OperationContext* opCtx, - const string&, + const std::string&, const BSONObj& cmdObj, - BSONObjBuilder& result) { - auto const mdm = ShardingState::get(opCtx)->migrationDestinationManager(); + BSONObjBuilder& result) override { + auto const mdm = MigrationDestinationManager::get(opCtx); auto migrationSessionIdStatus(MigrationSessionId::extractFromBSON(cmdObj)); @@ -287,6 +279,7 @@ public: mdm->abortWithoutSessionIdCheck(); mdm->report(result); } + uassertStatusOK(migrationSessionIdStatus.getStatus()); return true; } diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index f14815502b3..c42d0c5f111 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -37,7 +37,6 @@ #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/migration_destination_manager.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/stdx/functional.h" @@ -111,10 +110,6 @@ public: std::string getShardName(); - MigrationDestinationManager* migrationDestinationManager() { - return &_migrationDestManager; - } - /** * Initializes the sharding state of this server from the shard identity document argument * and sets secondary or primary state information on the catalog cache loader. @@ -303,9 +298,6 @@ private: */ ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss); - // Manages the state of the migration recipient shard - MigrationDestinationManager _migrationDestManager; - // Tracks the active move chunk operations running on this shard ActiveMigrationsRegistry _activeMigrationsRegistry; diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index f5c8e90935d..a0142e2a2fd 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -37,12 +37,12 @@ #include "mongo/db/sessions_collection_rs.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/commands/cluster_write.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/net/op_msg.h" namespace mongo { diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index a05befab239..940583a992e 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -64,7 +64,6 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/set_shard_version_request.h" diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 033c6e947ea..0e7dfb0b845 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -34,9 +34,7 @@ #include <set> -#include "mongo/db/commands.h" #include "mongo/db/lasterror.h" -#include "mongo/executor/connection_pool_stats.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" @@ -50,112 +48,48 @@ #include "mongo/util/stacktrace.h" namespace mongo { - -using std::unique_ptr; -using std::map; -using std::set; -using std::string; -using std::stringstream; -using std::vector; - namespace { class ClientConnections; /** - * Class which tracks ClientConnections (the client connection pool) for each incoming - * connection, allowing stats access. + * Class which tracks ClientConnections (the client connection pool) for each incoming connection, + * allowing stats access. */ class ActiveClientConnections { public: void add(const ClientConnections* cc) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); _clientConnections.insert(cc); } void remove(const ClientConnections* cc) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); _clientConnections.erase(cc); } - void appendInfo(BSONObjBuilder& b); + void appendInfo(BSONObjBuilder* b) const; private: - stdx::mutex _mutex; - set<const ClientConnections*> _clientConnections; + mutable stdx::mutex _mutex; + std::set<const ClientConnections*> _clientConnections; } activeClientConnections; /** - * Command to allow access to the sharded conn pool information in mongos. - */ -class ShardedPoolStats : public BasicCommand { -public: - ShardedPoolStats() : BasicCommand("shardConnPoolStats") {} - std::string help() const override { - return "stats about the shard connection pool"; - } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - AllowedOnSecondary secondaryAllowed() const override { - return AllowedOnSecondary::kAlways; - } - - // Same privs as connPoolStats - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { - ActionSet actions; - actions.addAction(ActionType::connPoolStats); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - - virtual bool run(OperationContext* opCtx, - const string& dbname, - const mongo::BSONObj& cmdObj, - mongo::BSONObjBuilder& result) { - // Connection information - executor::ConnectionPoolStats stats{}; - shardConnectionPool.appendConnectionStats(&stats); - stats.appendToBSON(result); - - // Thread connection information - activeClientConnections.appendInfo(result); - - return true; - } - -} shardedPoolStatsCmd; - -/** - * holds all the actual db connections for a client to various servers 1 per thread, so - * doesn't have to be thread safe. + * Holds all the actual db connections for a client to various servers 1 per thread, so doesn't have + * to be thread safe. */ class ClientConnections { MONGO_DISALLOW_COPYING(ClientConnections); public: struct Status { - Status() : created(0), avail(0) {} - - // May be read concurrently, but only written from - // this thread. - long long created; - DBClientBase* avail; + // May be read concurrently, but only written from this thread + long long created = 0; + DBClientBase* avail = nullptr; }; - // Gets or creates the status object for the host - Status* _getStatus(const string& addr) { - scoped_spinlock lock(_lock); - Status*& temp = _hosts[addr]; - if (!temp) { - temp = new Status(); - } - - return temp; - } - ClientConnections() { // Start tracking client connections activeClientConnections.add(this); @@ -168,11 +102,45 @@ public: releaseAll(true); } + static ClientConnections* threadInstance() { + if (!_perThread) { + _perThread = stdx::make_unique<ClientConnections>(); + } + return _perThread.get(); + } + + DBClientBase* get(const std::string& addr, const std::string& ns) { + { + // We want to report ns stats + scoped_spinlock lock(_lock); + if (ns.size() > 0) + _seenNS.insert(ns); + } + + Status* s = _getStatus(addr); + + std::unique_ptr<DBClientBase> c; + if (s->avail) { + c.reset(s->avail); + s->avail = 0; + + // May throw an exception + shardConnectionPool.onHandedOut(c.get()); + } else { + c.reset(shardConnectionPool.get(addr)); + + // After, so failed creation doesn't get counted + s->created++; + } + + return c.release(); + } + void releaseAll(bool fromDestructor = false) { // Don't need spinlock protection because if not in the destructor, we don't modify // _hosts, and if in the destructor we are not accessible to external threads. for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) { - const string addr = i->first; + const auto addr = i->first; Status* ss = i->second; invariant(ss); @@ -202,34 +170,7 @@ public: } } - DBClientBase* get(const string& addr, const string& ns) { - { - // We want to report ns stats - scoped_spinlock lock(_lock); - if (ns.size() > 0) - _seenNS.insert(ns); - } - - Status* s = _getStatus(addr); - - unique_ptr<DBClientBase> c; - if (s->avail) { - c.reset(s->avail); - s->avail = 0; - - // May throw an exception - shardConnectionPool.onHandedOut(c.get()); - } else { - c.reset(shardConnectionPool.get(addr)); - - // After, so failed creation doesn't get counted - s->created++; - } - - return c.release(); - } - - void done(const string& addr, DBClientBase* conn) { + void done(const std::string& addr, DBClientBase* conn) { Status* s = _hosts[addr]; verify(s); @@ -263,17 +204,18 @@ public: return; } - // Note: Although we try our best to clear bad connections as much as possible, - // some of them can still slip through because of how ClientConnections are being - // used - as thread local variables. This means that threads won't be able to - // see the s->avail connection of other threads. - + // Note: Although we try our best to clear bad connections as much as possible, some of them + // can still slip through because of how ClientConnections are being used - as thread local + // variables. This means that threads won't be able to see the s->avail connection of other + // threads. s->avail = conn; } - void checkVersions(OperationContext* opCtx, const string& ns) { - vector<ShardId> all; - grid.shardRegistry()->getAllShardIds(&all); + void checkVersions(OperationContext* opCtx, const std::string& ns) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + std::vector<ShardId> all; + shardRegistry->getAllShardIds(&all); // Don't report exceptions here as errors in GetLastError LastError::Disabled ignoreForGLE(&LastError::get(cc())); @@ -281,16 +223,16 @@ public: // Now only check top-level shard connections for (const ShardId& shardId : all) { try { - auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId); + auto shardStatus = shardRegistry->getShard(opCtx, shardId); if (!shardStatus.isOK()) { invariant(shardStatus == ErrorCodes::ShardNotFound); continue; } const auto shard = shardStatus.getValue(); - string sconnString = shard->getConnString().toString(); - Status* s = _getStatus(sconnString); + const auto sconnString = shard->getConnString().toString(); + Status* s = _getStatus(sconnString); if (!s->avail) { s->avail = shardConnectionPool.get(sconnString); s->created++; // After, so failed creation doesn't get counted @@ -298,22 +240,41 @@ public: versionManager.checkShardVersionCB(opCtx, s->avail, ns, false, 1); } catch (const DBException& ex) { - warning() << "problem while initially checking shard versions on" - << " " << shardId << causedBy(ex); + warning() << "Problem while initially checking shard versions on" + << " " << shardId << causedBy(redact(ex)); - // NOTE: This is only a heuristic, to avoid multiple stale version retries - // across multiple shards, and does not affect correctness. + // NOTE: This is only a heuristic, to avoid multiple stale version retries across + // multiple shards, and does not affect correctness. } } } - void release(const string& addr, DBClientBase* conn) { + void release(const std::string& addr, DBClientBase* conn) { shardConnectionPool.release(addr, conn); } + void forgetNS(const std::string& ns) { + scoped_spinlock lock(_lock); + _seenNS.erase(ns); + } + + /** + * Clears the connections kept by this pool (ie, not including the global pool) + */ + void clearPool() { + for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) { + if (iter->second->avail != NULL) { + delete iter->second->avail; + } + delete iter->second; + } + + _hosts.clear(); + } + /** - * Appends info about the client connection pool to a BOBuilder - * Safe to call with activeClientConnections lock + * Appends info about the client connection pool to a BSONObjBuilder. Safe to call with + * activeClientConnections lock. */ void appendInfo(BSONObjBuilder& b) const { scoped_spinlock lock(_lock); @@ -329,77 +290,61 @@ public: hostsArrB.done(); BSONArrayBuilder nsArrB(b.subarrayStart("seenNS")); - for (set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) { - nsArrB.append(*i); + for (const auto& ns : _seenNS) { + nsArrB.append(ns); } nsArrB.done(); } - // Protects only the creation of new entries in the _hosts and _seenNS map - // from external threads. Reading _hosts / _seenNS in this thread doesn't - // need protection. - mutable SpinLock _lock; - typedef map<string, Status*, DBConnectionPool::serverNameCompare> HostMap; - HostMap _hosts; - set<string> _seenNS; - +private: /** - * Clears the connections kept by this pool (ie, not including the global pool) + * Gets or creates the status object for the host. */ - void clearPool() { - for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) { - if (iter->second->avail != NULL) { - delete iter->second->avail; - } - delete iter->second; + Status* _getStatus(const std::string& addr) { + scoped_spinlock lock(_lock); + Status*& temp = _hosts[addr]; + if (!temp) { + temp = new Status(); } - _hosts.clear(); - } - - void forgetNS(const string& ns) { - scoped_spinlock lock(_lock); - _seenNS.erase(ns); + return temp; } - // ----- - static thread_local std::unique_ptr<ClientConnections> _perThread; - static ClientConnections* threadInstance() { - if (!_perThread) { - _perThread = stdx::make_unique<ClientConnections>(); - } - return _perThread.get(); - } -}; + // Protects only the creation of new entries in the _hosts and _seenNS map from external + // threads. Reading _hosts/_seenNS in this thread doesn't need protection. + mutable SpinLock _lock; -thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread; + using HostMap = std::map<std::string, Status*, DBConnectionPool::serverNameCompare>; + HostMap _hosts; + std::set<std::string> _seenNS; +}; -void ActiveClientConnections::appendInfo(BSONObjBuilder& b) { - BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads +void ActiveClientConnections::appendInfo(BSONObjBuilder* b) const { + // Preallocate the buffer because there may be quite a few threads to report + BSONArrayBuilder arr(64 * 1024); { stdx::lock_guard<stdx::mutex> lock(_mutex); - for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin(); - i != _clientConnections.end(); - ++i) { + for (const auto* conn : _clientConnections) { BSONObjBuilder bb(arr.subobjStart()); - (*i)->appendInfo(bb); - bb.done(); + conn->appendInfo(bb); + bb.doneFast(); } } - b.appendArray("threads", arr.obj()); + b->appendArray("threads", arr.obj()); } +thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread; + } // namespace -// The global connection pool DBConnectionPool shardConnectionPool; ShardConnection::ShardConnection(const ConnectionString& connectionString, - const string& ns, + const std::string& ns, std::shared_ptr<ChunkManager> manager) : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) { invariant(_cs.isValid()); @@ -450,7 +395,7 @@ void ShardConnection::_finishInit() { _setVersion = versionManager.checkShardVersionCB(opCtx, this, false, 1); } else { // Make sure we didn't specify a manager for a non-versionable connection (i.e. config) - verify(!_manager); + invariant(!_manager); _setVersion = false; } } @@ -458,7 +403,7 @@ void ShardConnection::_finishInit() { void ShardConnection::done() { if (_conn) { ClientConnections::threadInstance()->done(_cs.toString(), _conn); - _conn = 0; + _conn = nullptr; _finishedInit = true; } } @@ -481,7 +426,11 @@ void ShardConnection::kill() { } } -void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const string& ns) { +void ShardConnection::reportActiveClientConnections(BSONObjBuilder* builder) { + activeClientConnections.appendInfo(builder); +} + +void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const std::string& ns) { ClientConnections::threadInstance()->checkVersions(opCtx, ns); } @@ -494,7 +443,7 @@ void ShardConnection::clearPool() { ClientConnections::threadInstance()->clearPool(); } -void ShardConnection::forgetNS(const string& ns) { +void ShardConnection::forgetNS(const std::string& ns) { ClientConnections::threadInstance()->forgetNS(ns); } diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h index 62afb593f74..bfe3237e434 100644 --- a/src/mongo/s/client/shard_connection.h +++ b/src/mongo/s/client/shard_connection.h @@ -28,8 +28,6 @@ #pragma once -#include <string> - #include "mongo/client/connpool.h" namespace mongo { @@ -112,6 +110,9 @@ public: return _conn != NULL; } + /** reports all thread local connections on this instance */ + static void reportActiveClientConnections(BSONObjBuilder* builder); + /** checks all of my thread local connections for the version of this ns */ static void checkMyConnectionVersions(OperationContext* opCtx, const std::string& ns); @@ -145,6 +146,9 @@ private: bool _setVersion; }; +/** + * Global sharded connection pool, used by all instances of ShardConnection. + */ extern DBConnectionPool shardConnectionPool; } // namespace mongo diff --git a/src/mongo/s/client/shard_connection_test.cpp b/src/mongo/s/client/shard_connection_test.cpp index aa43bf8f740..dc734b82640 100644 --- a/src/mongo/s/client/shard_connection_test.cpp +++ b/src/mongo/s/client/shard_connection_test.cpp @@ -30,19 +30,11 @@ #include <cstdint> #include <vector> -#include "mongo/base/init.h" -#include "mongo/client/connpool.h" -#include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_manager_global.h" -#include "mongo/db/auth/authz_manager_external_state_mock.h" #include "mongo/db/client.h" #include "mongo/db/service_context.h" -#include "mongo/db/service_context_noop.h" #include "mongo/dbtests/mock/mock_conn_registry.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/s/client/shard_connection.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/socket_exception.h" diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 3cfa6aa678b..aa8279ed2f1 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -87,8 +87,8 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmongos', '$BUILD_DIR/mongo/db/commands/killcursors_common', - '$BUILD_DIR/mongo/db/ftdc/ftdc_server', '$BUILD_DIR/mongo/db/commands/write_commands_common', + '$BUILD_DIR/mongo/db/ftdc/ftdc_server', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/views/views', diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 154108b279a..6c3f604bdd2 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -52,7 +52,6 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/commands/pipeline_s.h" diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index 2f985d2aee6..bb4775f2e30 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -41,9 +41,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/client/version_manager.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/create_database_gen.h" #include "mongo/s/shard_id.h" diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 836f1dc0081..90c2c2e08c8 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -42,10 +42,10 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/commands/cluster_explain.h" -#include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" +#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/timer.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp b/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp index a0a14dd054f..a6ca81416a4 100644 --- a/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp @@ -28,12 +28,7 @@ #include "mongo/platform/basic.h" -#include <set> -#include <string> - #include "mongo/db/commands.h" -#include "mongo/db/lasterror.h" -#include "mongo/s/client/shard_connection.h" namespace mongo { namespace { @@ -42,8 +37,7 @@ class GetPrevErrorCmd : public ErrmsgCommandDeprecated { public: GetPrevErrorCmd() : ErrmsgCommandDeprecated("getPrevError", "getpreverror") {} - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -55,19 +49,19 @@ public: return "get previous error (since last reseterror command)"; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) const { + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) const override { // No auth required } - virtual bool errmsgRun(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - std::string& errmsg, - BSONObjBuilder& result) { - errmsg += "getpreverror not supported for sharded environments"; - return false; + bool errmsgRun(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) override { + uasserted(ErrorCodes::CommandNotSupported, + "getPrevError is not supported in sharded environments"); } } cmdGetPrevError; diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index d89a51531ea..c96ea7be671 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -48,7 +48,6 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" -#include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/shard_collection_gen.h" diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index ac87faa6ff1..f7ac158045a 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -55,7 +55,6 @@ #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" -#include "mongo/s/commands/cluster_write.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/migration_secondary_throttle_options.h" diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 79cab4816ef..667a2790280 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -41,12 +41,12 @@ #include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" -#include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/commands/cluster_explain.h" -#include "mongo/s/commands/cluster_write.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/s/write_ops/chunk_manager_targeter.h" +#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index 0bd003e8ddf..138c7e8083d 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -25,19 +25,19 @@ env.Library( env.Library( target='cluster_write_op', source=[ - 'write_op.cpp', - 'batch_write_op.cpp', 'batch_write_exec.cpp', - '$BUILD_DIR/mongo/s/commands/chunk_manager_targeter.cpp', - '$BUILD_DIR/mongo/s/commands/cluster_write.cpp', + 'batch_write_op.cpp', + 'chunk_manager_targeter.cpp', + 'cluster_write.cpp', + 'write_op.cpp', ], LIBDEPS=[ - 'batch_write_types', '$BUILD_DIR/mongo/client/connection_string', '$BUILD_DIR/mongo/s/async_requests_sender', '$BUILD_DIR/mongo/s/client/sharding_client', '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', '$BUILD_DIR/mongo/s/coreshard', + 'batch_write_types', ], ) diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index 8cde8fa627c..04b0d3a1e21 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/commands/chunk_manager_targeter.h" +#include "mongo/s/write_ops/chunk_manager_targeter.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/query/canonical_query.h" diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 6de41ed3fe7..6de41ed3fe7 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp index 517f71606d3..fe288bfbd56 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/write_ops/cluster_write.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/commands/cluster_write.h" +#include "mongo/s/write_ops/cluster_write.h" #include <algorithm> @@ -42,10 +42,10 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" +#include "mongo/s/write_ops/chunk_manager_targeter.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/s/commands/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h index f9face8ff80..f9face8ff80 100644 --- a/src/mongo/s/commands/cluster_write.h +++ b/src/mongo/s/write_ops/cluster_write.h |