summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/dbclient_rs.cpp3
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/commands/conn_pool_stats.cpp46
-rw-r--r--src/mongo/db/commands/conn_pool_sync.cpp31
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp187
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp13
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp15
-rw-r--r--src/mongo/db/s/migration_destination_manager.h7
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp77
-rw-r--r--src/mongo/db/s/sharding_state.h8
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp2
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp1
-rw-r--r--src/mongo/s/client/shard_connection.cpp293
-rw-r--r--src/mongo/s/client/shard_connection.h8
-rw-r--r--src/mongo/s/client/shard_connection_test.cpp8
-rw-r--r--src/mongo/s/commands/SConscript2
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp1
-rw-r--r--src/mongo/s/commands/cluster_commands_helpers.cpp2
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_get_prev_error_cmd.cpp28
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp1
-rw-r--r--src/mongo/s/commands/cluster_shard_collection_cmd.cpp1
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp4
-rw-r--r--src/mongo/s/write_ops/SConscript10
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.cpp (renamed from src/mongo/s/commands/chunk_manager_targeter.cpp)2
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.h (renamed from src/mongo/s/commands/chunk_manager_targeter.h)0
-rw-r--r--src/mongo/s/write_ops/cluster_write.cpp (renamed from src/mongo/s/commands/cluster_write.cpp)4
-rw-r--r--src/mongo/s/write_ops/cluster_write.h (renamed from src/mongo/s/commands/cluster_write.h)0
29 files changed, 326 insertions, 434 deletions
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp
index dec27398071..91f41fbf274 100644
--- a/src/mongo/client/dbclient_rs.cpp
+++ b/src/mongo/client/dbclient_rs.cpp
@@ -723,8 +723,7 @@ DBClientConnection* DBClientReplicaSet::selectNodeUsingTags(
if (_authPooledSecondaryConn) {
_authConnection(_lastSlaveOkConn.get());
} else {
- // Mongos pooled connections are authenticated through
- // ShardingConnectionHook::onCreate().
+ // Mongos pooled connections are authenticated through ShardingConnectionHook::onCreate()
}
LOG(3) << "dbclient_rs selecting node " << _lastSlaveOkHost << endl;
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 92123979b87..6f59c76a4e3 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -930,7 +930,6 @@ env.Library(
"index/index_descriptor",
"index_d",
"introspect",
- 'keys_collection_client_direct',
"matcher/expressions_mongod_only",
"op_observer_d",
"ops/write_ops_parsers",
@@ -972,6 +971,7 @@ env.Library(
"update/update_driver",
"update_index_data",
"views/views_mongod",
+ 'keys_collection_client_direct',
],
)
diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp
index ac1a87973e8..d85c484dd22 100644
--- a/src/mongo/db/commands/conn_pool_stats.cpp
+++ b/src/mongo/db/commands/conn_pool_stats.cpp
@@ -1,4 +1,4 @@
-/*
+/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
@@ -40,6 +40,7 @@
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/client/shard_connection.h"
#include "mongo/s/grid.h"
namespace mongo {
@@ -114,5 +115,48 @@ public:
} poolStatsCmd;
+class ShardedPoolStats final : public BasicCommand {
+public:
+ ShardedPoolStats() : BasicCommand("shardConnPoolStats") {}
+
+ std::string help() const override {
+ return "stats about the shard connection pool";
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ AllowedOnSecondary secondaryAllowed() const override {
+ return AllowedOnSecondary::kAlways;
+ }
+
+ /**
+ * Requires the same privileges as the connPoolStats command.
+ */
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
+ ActionSet actions;
+ actions.addAction(ActionType::connPoolStats);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ const mongo::BSONObj& cmdObj,
+ mongo::BSONObjBuilder& result) override {
+ // Connection information
+ executor::ConnectionPoolStats stats{};
+ shardConnectionPool.appendConnectionStats(&stats);
+ stats.appendToBSON(result);
+
+ // Thread connection information
+ ShardConnection::reportActiveClientConnections(&result);
+ return true;
+ }
+
+} shardedPoolStatsCmd;
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/commands/conn_pool_sync.cpp b/src/mongo/db/commands/conn_pool_sync.cpp
index cd3526bb885..12e279d15fa 100644
--- a/src/mongo/db/commands/conn_pool_sync.cpp
+++ b/src/mongo/db/commands/conn_pool_sync.cpp
@@ -28,44 +28,47 @@
#include "mongo/platform/basic.h"
-#include <sstream>
-
-#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/db/commands.h"
#include "mongo/s/client/shard_connection.h"
namespace mongo {
+namespace {
class PoolFlushCmd : public BasicCommand {
public:
PoolFlushCmd() : BasicCommand("connPoolSync", "connpoolsync") {}
+
std::string help() const override {
return "internal";
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+
+ AllowedOnSecondary secondaryAllowed() const override {
+ return AllowedOnSecondary::kAlways;
+ }
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
ActionSet actions;
actions.addAction(ActionType::connPoolSync);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- virtual bool run(OperationContext* opCtx,
- const std::string&,
- const mongo::BSONObj&,
- mongo::BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const mongo::BSONObj&,
+ mongo::BSONObjBuilder& result) override {
shardConnectionPool.flush();
globalConnPool.flush();
return true;
}
- AllowedOnSecondary secondaryAllowed() const override {
- return AllowedOnSecondary::kAlways;
- }
} poolFlushCmd;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index b999b982835..be9817a7b59 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -104,16 +104,16 @@ const DeleteStats* getDeleteStats(const PlanExecutor* exec) {
* to return to the client. If no matching document to update or remove was found, then none
* is returned. Otherwise, the updated or deleted document is returned.
*
- * If the operation failed, then an error Status is returned.
+ * If the operation failed, throws.
*/
-StatusWith<boost::optional<BSONObj>> advanceExecutor(OperationContext* opCtx,
- PlanExecutor* exec,
- bool isRemove) {
+boost::optional<BSONObj> advanceExecutor(OperationContext* opCtx,
+ PlanExecutor* exec,
+ bool isRemove) {
BSONObj value;
PlanExecutor::ExecState state = exec->getNext(&value, nullptr);
if (PlanExecutor::ADVANCED == state) {
- return boost::optional<BSONObj>(std::move(value));
+ return {std::move(value)};
}
if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
@@ -121,19 +121,18 @@ StatusWith<boost::optional<BSONObj>> advanceExecutor(OperationContext* opCtx,
<< ", stats: " << redact(Explain::getWinningPlanStats(exec));
if (WorkingSetCommon::isValidStatusMemberObject(value)) {
- const Status errorStatus = WorkingSetCommon::getMemberObjectStatus(value);
- invariant(!errorStatus.isOK());
- return errorStatus;
+ uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(value));
+ MONGO_UNREACHABLE;
}
- const std::string opstr = isRemove ? "delete" : "update";
- return {ErrorCodes::OperationFailed,
- str::stream() << "executor returned " << PlanExecutor::statestr(state)
- << " while executing "
- << opstr};
+
+ uasserted(ErrorCodes::OperationFailed,
+ str::stream() << "executor returned " << PlanExecutor::statestr(state)
+ << " while executing "
+ << (isRemove ? "delete" : "update"));
}
invariant(state == PlanExecutor::IS_EOF);
- return boost::optional<BSONObj>(boost::none);
+ return boost::none;
}
void makeUpdateRequest(const FindAndModifyRequest& args,
@@ -185,14 +184,11 @@ void appendCommandResponse(const PlanExecutor* exec,
}
}
-Status checkCanAcceptWritesForDatabase(OperationContext* opCtx, const NamespaceString& nsString) {
- if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString)) {
- return Status(ErrorCodes::NotMaster,
- str::stream()
- << "Not primary while running findAndModify command on collection "
- << nsString.ns());
- }
- return Status::OK();
+void assertCanWrite(OperationContext* opCtx, const NamespaceString& nsString) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while running findAndModify command on collection "
+ << nsString.ns(),
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString));
}
void recordStatsForTopCommand(OperationContext* opCtx) {
@@ -251,20 +247,12 @@ public:
ExplainOptions::Verbosity verbosity,
BSONObjBuilder* out) const override {
const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj);
- Status allowedWriteStatus = userAllowedWriteNS(fullNs.ns());
- if (!allowedWriteStatus.isOK()) {
- return allowedWriteStatus;
- }
-
- StatusWith<FindAndModifyRequest> parseStatus =
- FindAndModifyRequest::parseFromBSON(NamespaceString(fullNs.ns()), cmdObj);
- if (!parseStatus.isOK()) {
- return parseStatus.getStatus();
- }
+ uassertStatusOK(userAllowedWriteNS(fullNs.ns()));
- const FindAndModifyRequest& args = parseStatus.getValue();
+ const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON(fullNs, cmdObj)));
const NamespaceString& nsString = args.getNamespaceString();
- OpDebug* opDebug = &CurOp::get(opCtx)->debug();
+ auto const curOp = CurOp::get(opCtx);
+ OpDebug* const opDebug = &curOp->debug();
if (args.isRemove()) {
DeleteRequest request(nsString);
@@ -272,29 +260,22 @@ public:
makeDeleteRequest(args, isExplain, &request);
ParsedDelete parsedDelete(opCtx, &request);
- Status parsedDeleteStatus = parsedDelete.parseRequest();
- if (!parsedDeleteStatus.isOK()) {
- return parsedDeleteStatus;
- }
+ uassertStatusOK(parsedDelete.parseRequest());
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
- if (!autoColl.getDb()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "database " << dbName << " does not exist."};
- }
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "database " << dbName << " does not exist",
+ autoColl.getDb());
auto css = CollectionShardingState::get(opCtx, nsString);
css->checkShardVersionOrThrow(opCtx);
Collection* const collection = autoColl.getCollection();
- auto statusWithPlanExecutor =
- getExecutorDelete(opCtx, opDebug, collection, &parsedDelete);
- if (!statusWithPlanExecutor.isOK()) {
- return statusWithPlanExecutor.getStatus();
- }
- const auto exec = std::move(statusWithPlanExecutor.getValue());
+ const auto exec =
+ uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete));
+
Explain::explainStages(exec.get(), collection, verbosity, out);
} else {
UpdateRequest request(nsString);
@@ -303,29 +284,22 @@ public:
makeUpdateRequest(args, isExplain, &updateLifecycle, &request);
ParsedUpdate parsedUpdate(opCtx, &request);
- Status parsedUpdateStatus = parsedUpdate.parseRequest();
- if (!parsedUpdateStatus.isOK()) {
- return parsedUpdateStatus;
- }
+ uassertStatusOK(parsedUpdate.parseRequest());
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
- if (!autoColl.getDb()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "database " << dbName << " does not exist."};
- }
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "database " << dbName << " does not exist",
+ autoColl.getDb());
auto css = CollectionShardingState::get(opCtx, nsString);
css->checkShardVersionOrThrow(opCtx);
Collection* const collection = autoColl.getCollection();
- auto statusWithPlanExecutor =
- getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate);
- if (!statusWithPlanExecutor.isOK()) {
- return statusWithPlanExecutor.getStatus();
- }
- const auto exec = std::move(statusWithPlanExecutor.getValue());
+ const auto exec =
+ uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate));
+
Explain::explainStages(exec.get(), collection, verbosity, out);
}
@@ -338,20 +312,14 @@ public:
BSONObjBuilder& result) override {
// findAndModify command is not replicated directly.
invariant(opCtx->writesAreReplicated());
- const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj);
- Status allowedWriteStatus = userAllowedWriteNS(fullNs.ns());
- if (!allowedWriteStatus.isOK()) {
- return CommandHelpers::appendCommandStatus(result, allowedWriteStatus);
- }
- StatusWith<FindAndModifyRequest> parseStatus =
- FindAndModifyRequest::parseFromBSON(NamespaceString(fullNs.ns()), cmdObj);
- if (!parseStatus.isOK()) {
- return CommandHelpers::appendCommandStatus(result, parseStatus.getStatus());
- }
+ const NamespaceString fullNs = CommandHelpers::parseNsCollectionRequired(dbName, cmdObj);
+ uassertStatusOK(userAllowedWriteNS(fullNs.ns()));
- const FindAndModifyRequest& args = parseStatus.getValue();
+ const auto args(uassertStatusOK(FindAndModifyRequest::parseFromBSON(fullNs, cmdObj)));
const NamespaceString& nsString = args.getNamespaceString();
+ auto const curOp = CurOp::get(opCtx);
+ OpDebug* const opDebug = &curOp->debug();
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmdObj))
@@ -369,9 +337,6 @@ public:
}
}
- auto curOp = CurOp::get(opCtx);
- OpDebug* opDebug = &curOp->debug();
-
// Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it is
// executing a findAndModify. This is done to ensure that we can always match, modify, and
// return the document under concurrency, if a matching document exists.
@@ -386,16 +351,11 @@ public:
}
ParsedDelete parsedDelete(opCtx, &request);
- Status parsedDeleteStatus = parsedDelete.parseRequest();
- if (!parsedDeleteStatus.isOK()) {
- CommandHelpers::appendCommandStatus(result, parsedDeleteStatus);
- return false;
- }
+ uassertStatusOK(parsedDelete.parseRequest());
AutoGetOrCreateDb autoDb(opCtx, dbName, MODE_IX);
Lock::CollectionLock collLock(opCtx->lockState(), nsString.ns(), MODE_IX);
- // Attach the namespace and database profiling level to the current op.
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(),
@@ -405,11 +365,7 @@ public:
auto css = CollectionShardingState::get(opCtx, nsString);
css->checkShardVersionOrThrow(opCtx);
- Status isPrimary = checkCanAcceptWritesForDatabase(opCtx, nsString);
- if (!isPrimary.isOK()) {
- CommandHelpers::appendCommandStatus(result, isPrimary);
- return false;
- }
+ assertCanWrite(opCtx, nsString);
Collection* const collection = autoDb.getDb()->getCollection(opCtx, nsString);
if (!collection && autoDb.getDb()->getViewCatalog()->lookup(opCtx, nsString.ns())) {
@@ -418,25 +374,15 @@ public:
"findAndModify not supported on a view"});
return false;
}
- auto statusWithPlanExecutor =
- getExecutorDelete(opCtx, opDebug, collection, &parsedDelete);
- if (!statusWithPlanExecutor.isOK()) {
- CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus());
- return false;
- }
- const auto exec = std::move(statusWithPlanExecutor.getValue());
+ const auto exec =
+ uassertStatusOK(getExecutorDelete(opCtx, opDebug, collection, &parsedDelete));
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
}
- StatusWith<boost::optional<BSONObj>> advanceStatus =
- advanceExecutor(opCtx, exec.get(), args.isRemove());
- if (!advanceStatus.isOK()) {
- CommandHelpers::appendCommandStatus(result, advanceStatus.getStatus());
- return false;
- }
+ auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
// Nothing after advancing the plan executor should throw a WriteConflictException,
// so the following bookkeeping with execution stats won't end up being done
// multiple times.
@@ -458,8 +404,7 @@ public:
}
recordStatsForTopCommand(opCtx);
- appendCommandResponse(
- exec.get(), args.isRemove(), advanceStatus.getValue(), &result);
+ appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
} else {
UpdateRequest request(nsString);
UpdateLifecycleImpl updateLifecycle(nsString);
@@ -471,16 +416,11 @@ public:
}
ParsedUpdate parsedUpdate(opCtx, &request);
- Status parsedUpdateStatus = parsedUpdate.parseRequest();
- if (!parsedUpdateStatus.isOK()) {
- CommandHelpers::appendCommandStatus(result, parsedUpdateStatus);
- return false;
- }
+ uassertStatusOK(parsedUpdate.parseRequest());
AutoGetOrCreateDb autoDb(opCtx, dbName, MODE_IX);
Lock::CollectionLock collLock(opCtx->lockState(), nsString.ns(), MODE_IX);
- // Attach the namespace and database profiling level to the current op.
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(),
@@ -490,11 +430,7 @@ public:
auto css = CollectionShardingState::get(opCtx, nsString);
css->checkShardVersionOrThrow(opCtx);
- Status isPrimary = checkCanAcceptWritesForDatabase(opCtx, nsString);
- if (!isPrimary.isOK()) {
- CommandHelpers::appendCommandStatus(result, isPrimary);
- return false;
- }
+ assertCanWrite(opCtx, nsString);
Collection* collection = autoDb.getDb()->getCollection(opCtx, nsString.ns());
if (!collection && autoDb.getDb()->getViewCatalog()->lookup(opCtx, nsString.ns())) {
@@ -511,11 +447,7 @@ public:
// in exclusive mode in order to create the collection.
collLock.relockAsDatabaseExclusive(autoDb.lock());
collection = autoDb.getDb()->getCollection(opCtx, nsString);
- Status isPrimaryAfterRelock = checkCanAcceptWritesForDatabase(opCtx, nsString);
- if (!isPrimaryAfterRelock.isOK()) {
- CommandHelpers::appendCommandStatus(result, isPrimaryAfterRelock);
- return false;
- }
+ assertCanWrite(opCtx, nsString);
if (collection) {
// Someone else beat us to creating the collection, do nothing.
@@ -534,25 +466,15 @@ public:
}
}
- auto statusWithPlanExecutor =
- getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate);
- if (!statusWithPlanExecutor.isOK()) {
- CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus());
- return false;
- }
- const auto exec = std::move(statusWithPlanExecutor.getValue());
+ const auto exec =
+ uassertStatusOK(getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate));
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
}
- StatusWith<boost::optional<BSONObj>> advanceStatus =
- advanceExecutor(opCtx, exec.get(), args.isRemove());
- if (!advanceStatus.isOK()) {
- CommandHelpers::appendCommandStatus(result, advanceStatus.getStatus());
- return false;
- }
+ auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
// Nothing after advancing the plan executor should throw a WriteConflictException,
// so the following bookkeeping with execution stats won't end up being done
// multiple times.
@@ -572,8 +494,7 @@ public:
}
recordStatsForTopCommand(opCtx);
- appendCommandResponse(
- exec.get(), args.isRemove(), advanceStatus.getValue(), &result);
+ appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
}
return true;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d09fc1ab9de..82c3f51f3f8 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -87,7 +87,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/dcommands_fcv',
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/repl/repl_coordinator_global',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/s/client/shard_local',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/is_mongos',
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
index e8f93fc34f3..9045cbef69a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
@@ -76,17 +76,12 @@
namespace mongo {
namespace {
-using std::vector;
-
using CallbackHandle = executor::TaskExecutor::CallbackHandle;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
-const Seconds kDefaultFindHostMaxWaitTime(20);
-
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
-const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
/**
* Generates a unique name to be given to a newly added shard.
@@ -715,7 +710,7 @@ StatusWith<std::string> ShardingCatalogManager::addShard(
->catalogClient()
->logChange(
opCtx, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern)
- .transitional_ignore();
+ .ignore();
// Ensure the added shard is visible to this process.
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
@@ -789,7 +784,7 @@ StatusWith<ShardDrainingStatus> ShardingCatalogManager::removeShard(OperationCon
"",
BSON("shard" << name),
ShardingCatalogClient::kLocalWriteConcern)
- .transitional_ignore();
+ .ignore();
return ShardDrainingStatus::STARTED;
}
@@ -845,7 +840,7 @@ StatusWith<ShardDrainingStatus> ShardingCatalogManager::removeShard(OperationCon
"",
BSON("shard" << name),
ShardingCatalogClient::kLocalWriteConcern)
- .transitional_ignore();
+ .ignore();
return ShardDrainingStatus::COMPLETED;
}
@@ -884,7 +879,7 @@ BSONObj ShardingCatalogManager::createShardIdentityUpsertForAddShard(OperationCo
// static
StatusWith<ShardId> ShardingCatalogManager::_selectShardForNewDatabase(
OperationContext* opCtx, ShardRegistry* shardRegistry) {
- vector<ShardId> allShardIds;
+ std::vector<ShardId> allShardIds;
shardRegistry->getAllShardIds(&allShardIds);
if (allShardIds.empty()) {
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index a575b0a3018..c6f2496bc3d 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -36,7 +36,7 @@
#include <vector>
#include "mongo/client/connpool.h"
-#include "mongo/db/auth/authorization_manager_global.h"
+#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/document_validation.h"
@@ -47,15 +47,13 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
-#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/concurrency/notification.h"
@@ -66,6 +64,9 @@
namespace mongo {
namespace {
+const auto getMigrationDestinationManager =
+ ServiceContext::declareDecoration<MigrationDestinationManager>();
+
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
// Note: Even though we're setting UNSET here,
// kMajority implies JOURNAL if journaling is
@@ -210,6 +211,10 @@ MigrationDestinationManager::MigrationDestinationManager() = default;
MigrationDestinationManager::~MigrationDestinationManager() = default;
+MigrationDestinationManager* MigrationDestinationManager::get(OperationContext* opCtx) {
+ return &getMigrationDestinationManager(opCtx->getServiceContext());
+}
+
MigrationDestinationManager::State MigrationDestinationManager::getState() const {
stdx::lock_guard<stdx::mutex> sl(_mutex);
return _state;
@@ -428,7 +433,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
auto opCtx = Client::getCurrent()->makeOperationContext();
- if (getGlobalAuthorizationManager()->isAuthEnabled()) {
+ if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) {
AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization();
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index cc59483d699..bcbfb65facb 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -69,6 +69,13 @@ public:
MigrationDestinationManager();
~MigrationDestinationManager();
+ /**
+ * Returns the singleton instance of the migration destination manager.
+ *
+ * TODO (SERVER-25333): This should become per-collection instance instead of singleton.
+ */
+ static MigrationDestinationManager* get(OperationContext* opCtx);
+
State getState() const;
void setState(State newState);
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 37cde498251..98c2b85cd18 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -30,11 +30,6 @@
#include "mongo/platform/basic.h"
-#include <algorithm>
-#include <map>
-#include <string>
-#include <vector>
-
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -43,6 +38,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/s/chunk_move_write_concern_options.h"
+#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
@@ -50,9 +46,6 @@
#include "mongo/util/log.h"
namespace mongo {
-
-using std::string;
-
namespace {
class RecvChunkStartCommand : public ErrmsgCommandDeprecated {
@@ -67,28 +60,28 @@ public:
return AllowedOnSecondary::kNever;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
// This is required to be true to support moveChunk.
return true;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
ActionSet actions;
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
bool errmsgRun(OperationContext* opCtx,
- const string&,
+ const std::string& dbname,
const BSONObj& cmdObj,
- string& errmsg,
- BSONObjBuilder& result) {
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
auto shardingState = ShardingState::get(opCtx);
uassertStatusOK(shardingState->canAcceptShardedCommands());
@@ -136,7 +129,7 @@ public:
auto scopedRegisterReceiveChunk(
uassertStatusOK(shardingState->registerReceiveChunk(nss, chunkRange, fromShard)));
- uassertStatusOK(shardingState->migrationDestinationManager()->start(
+ uassertStatusOK(MigrationDestinationManager::get(opCtx)->start(
nss,
std::move(scopedRegisterReceiveChunk),
migrationSessionId,
@@ -167,27 +160,27 @@ public:
return AllowedOnSecondary::kNever;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
ActionSet actions;
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
bool run(OperationContext* opCtx,
- const string&,
+ const std::string& dbname,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- ShardingState::get(opCtx)->migrationDestinationManager()->report(result);
+ BSONObjBuilder& result) override {
+ MigrationDestinationManager::get(opCtx)->report(result);
return true;
}
@@ -205,29 +198,29 @@ public:
return AllowedOnSecondary::kNever;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
ActionSet actions;
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
bool run(OperationContext* opCtx,
- const string&,
+ const std::string& dbname,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj));
- auto mdm = ShardingState::get(opCtx)->migrationDestinationManager();
+ auto const mdm = MigrationDestinationManager::get(opCtx);
Status const status = mdm->startCommit(sessionId);
mdm->report(result);
if (!status.isOK()) {
@@ -251,28 +244,27 @@ public:
return AllowedOnSecondary::kNever;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
ActionSet actions;
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
bool run(OperationContext* opCtx,
- const string&,
+ const std::string&,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- auto const mdm = ShardingState::get(opCtx)->migrationDestinationManager();
+ BSONObjBuilder& result) override {
+ auto const mdm = MigrationDestinationManager::get(opCtx);
auto migrationSessionIdStatus(MigrationSessionId::extractFromBSON(cmdObj));
@@ -287,6 +279,7 @@ public:
mdm->abortWithoutSessionIdCheck();
mdm->report(result);
}
+
uassertStatusOK(migrationSessionIdStatus.getStatus());
return true;
}
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index f14815502b3..c42d0c5f111 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -37,7 +37,6 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/chunk_splitter.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/stdx/functional.h"
@@ -111,10 +110,6 @@ public:
std::string getShardName();
- MigrationDestinationManager* migrationDestinationManager() {
- return &_migrationDestManager;
- }
-
/**
* Initializes the sharding state of this server from the shard identity document argument
* and sets secondary or primary state information on the catalog cache loader.
@@ -303,9 +298,6 @@ private:
*/
ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss);
- // Manages the state of the migration recipient shard
- MigrationDestinationManager _migrationDestManager;
-
// Tracks the active move chunk operations running on this shard
ActiveMigrationsRegistry _activeMigrationsRegistry;
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index f5c8e90935d..a0142e2a2fd 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -37,12 +37,12 @@
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/net/op_msg.h"
namespace mongo {
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index a05befab239..940583a992e 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -64,7 +64,6 @@
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/set_shard_version_request.h"
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 033c6e947ea..0e7dfb0b845 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -34,9 +34,7 @@
#include <set>
-#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
-#include "mongo/executor/connection_pool_stats.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -50,112 +48,48 @@
#include "mongo/util/stacktrace.h"
namespace mongo {
-
-using std::unique_ptr;
-using std::map;
-using std::set;
-using std::string;
-using std::stringstream;
-using std::vector;
-
namespace {
class ClientConnections;
/**
- * Class which tracks ClientConnections (the client connection pool) for each incoming
- * connection, allowing stats access.
+ * Class which tracks ClientConnections (the client connection pool) for each incoming connection,
+ * allowing stats access.
*/
class ActiveClientConnections {
public:
void add(const ClientConnections* cc) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
_clientConnections.insert(cc);
}
void remove(const ClientConnections* cc) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
_clientConnections.erase(cc);
}
- void appendInfo(BSONObjBuilder& b);
+ void appendInfo(BSONObjBuilder* b) const;
private:
- stdx::mutex _mutex;
- set<const ClientConnections*> _clientConnections;
+ mutable stdx::mutex _mutex;
+ std::set<const ClientConnections*> _clientConnections;
} activeClientConnections;
/**
- * Command to allow access to the sharded conn pool information in mongos.
- */
-class ShardedPoolStats : public BasicCommand {
-public:
- ShardedPoolStats() : BasicCommand("shardConnPoolStats") {}
- std::string help() const override {
- return "stats about the shard connection pool";
- }
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
- AllowedOnSecondary secondaryAllowed() const override {
- return AllowedOnSecondary::kAlways;
- }
-
- // Same privs as connPoolStats
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
- ActionSet actions;
- actions.addAction(ActionType::connPoolStats);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
-
- virtual bool run(OperationContext* opCtx,
- const string& dbname,
- const mongo::BSONObj& cmdObj,
- mongo::BSONObjBuilder& result) {
- // Connection information
- executor::ConnectionPoolStats stats{};
- shardConnectionPool.appendConnectionStats(&stats);
- stats.appendToBSON(result);
-
- // Thread connection information
- activeClientConnections.appendInfo(result);
-
- return true;
- }
-
-} shardedPoolStatsCmd;
-
-/**
- * holds all the actual db connections for a client to various servers 1 per thread, so
- * doesn't have to be thread safe.
+ * Holds all the actual db connections for a client to various servers 1 per thread, so doesn't have
+ * to be thread safe.
*/
class ClientConnections {
MONGO_DISALLOW_COPYING(ClientConnections);
public:
struct Status {
- Status() : created(0), avail(0) {}
-
- // May be read concurrently, but only written from
- // this thread.
- long long created;
- DBClientBase* avail;
+ // May be read concurrently, but only written from this thread
+ long long created = 0;
+ DBClientBase* avail = nullptr;
};
- // Gets or creates the status object for the host
- Status* _getStatus(const string& addr) {
- scoped_spinlock lock(_lock);
- Status*& temp = _hosts[addr];
- if (!temp) {
- temp = new Status();
- }
-
- return temp;
- }
-
ClientConnections() {
// Start tracking client connections
activeClientConnections.add(this);
@@ -168,11 +102,45 @@ public:
releaseAll(true);
}
+ static ClientConnections* threadInstance() {
+ if (!_perThread) {
+ _perThread = stdx::make_unique<ClientConnections>();
+ }
+ return _perThread.get();
+ }
+
+ DBClientBase* get(const std::string& addr, const std::string& ns) {
+ {
+ // We want to report ns stats
+ scoped_spinlock lock(_lock);
+ if (ns.size() > 0)
+ _seenNS.insert(ns);
+ }
+
+ Status* s = _getStatus(addr);
+
+ std::unique_ptr<DBClientBase> c;
+ if (s->avail) {
+ c.reset(s->avail);
+ s->avail = 0;
+
+ // May throw an exception
+ shardConnectionPool.onHandedOut(c.get());
+ } else {
+ c.reset(shardConnectionPool.get(addr));
+
+ // After, so failed creation doesn't get counted
+ s->created++;
+ }
+
+ return c.release();
+ }
+
void releaseAll(bool fromDestructor = false) {
// Don't need spinlock protection because if not in the destructor, we don't modify
// _hosts, and if in the destructor we are not accessible to external threads.
for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
- const string addr = i->first;
+ const auto addr = i->first;
Status* ss = i->second;
invariant(ss);
@@ -202,34 +170,7 @@ public:
}
}
- DBClientBase* get(const string& addr, const string& ns) {
- {
- // We want to report ns stats
- scoped_spinlock lock(_lock);
- if (ns.size() > 0)
- _seenNS.insert(ns);
- }
-
- Status* s = _getStatus(addr);
-
- unique_ptr<DBClientBase> c;
- if (s->avail) {
- c.reset(s->avail);
- s->avail = 0;
-
- // May throw an exception
- shardConnectionPool.onHandedOut(c.get());
- } else {
- c.reset(shardConnectionPool.get(addr));
-
- // After, so failed creation doesn't get counted
- s->created++;
- }
-
- return c.release();
- }
-
- void done(const string& addr, DBClientBase* conn) {
+ void done(const std::string& addr, DBClientBase* conn) {
Status* s = _hosts[addr];
verify(s);
@@ -263,17 +204,18 @@ public:
return;
}
- // Note: Although we try our best to clear bad connections as much as possible,
- // some of them can still slip through because of how ClientConnections are being
- // used - as thread local variables. This means that threads won't be able to
- // see the s->avail connection of other threads.
-
+ // Note: Although we try our best to clear bad connections as much as possible, some of them
+ // can still slip through because of how ClientConnections are being used - as thread local
+ // variables. This means that threads won't be able to see the s->avail connection of other
+ // threads.
s->avail = conn;
}
- void checkVersions(OperationContext* opCtx, const string& ns) {
- vector<ShardId> all;
- grid.shardRegistry()->getAllShardIds(&all);
+ void checkVersions(OperationContext* opCtx, const std::string& ns) {
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+
+ std::vector<ShardId> all;
+ shardRegistry->getAllShardIds(&all);
// Don't report exceptions here as errors in GetLastError
LastError::Disabled ignoreForGLE(&LastError::get(cc()));
@@ -281,16 +223,16 @@ public:
// Now only check top-level shard connections
for (const ShardId& shardId : all) {
try {
- auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId);
+ auto shardStatus = shardRegistry->getShard(opCtx, shardId);
if (!shardStatus.isOK()) {
invariant(shardStatus == ErrorCodes::ShardNotFound);
continue;
}
const auto shard = shardStatus.getValue();
- string sconnString = shard->getConnString().toString();
- Status* s = _getStatus(sconnString);
+ const auto sconnString = shard->getConnString().toString();
+ Status* s = _getStatus(sconnString);
if (!s->avail) {
s->avail = shardConnectionPool.get(sconnString);
s->created++; // After, so failed creation doesn't get counted
@@ -298,22 +240,41 @@ public:
versionManager.checkShardVersionCB(opCtx, s->avail, ns, false, 1);
} catch (const DBException& ex) {
- warning() << "problem while initially checking shard versions on"
- << " " << shardId << causedBy(ex);
+ warning() << "Problem while initially checking shard versions on"
+ << " " << shardId << causedBy(redact(ex));
- // NOTE: This is only a heuristic, to avoid multiple stale version retries
- // across multiple shards, and does not affect correctness.
+ // NOTE: This is only a heuristic, to avoid multiple stale version retries across
+ // multiple shards, and does not affect correctness.
}
}
}
- void release(const string& addr, DBClientBase* conn) {
+ void release(const std::string& addr, DBClientBase* conn) {
shardConnectionPool.release(addr, conn);
}
+ void forgetNS(const std::string& ns) {
+ scoped_spinlock lock(_lock);
+ _seenNS.erase(ns);
+ }
+
+ /**
+ * Clears the connections kept by this pool (ie, not including the global pool)
+ */
+ void clearPool() {
+ for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
+ if (iter->second->avail != NULL) {
+ delete iter->second->avail;
+ }
+ delete iter->second;
+ }
+
+ _hosts.clear();
+ }
+
/**
- * Appends info about the client connection pool to a BOBuilder
- * Safe to call with activeClientConnections lock
+ * Appends info about the client connection pool to a BSONObjBuilder. Safe to call with
+ * activeClientConnections lock.
*/
void appendInfo(BSONObjBuilder& b) const {
scoped_spinlock lock(_lock);
@@ -329,77 +290,61 @@ public:
hostsArrB.done();
BSONArrayBuilder nsArrB(b.subarrayStart("seenNS"));
- for (set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) {
- nsArrB.append(*i);
+ for (const auto& ns : _seenNS) {
+ nsArrB.append(ns);
}
nsArrB.done();
}
- // Protects only the creation of new entries in the _hosts and _seenNS map
- // from external threads. Reading _hosts / _seenNS in this thread doesn't
- // need protection.
- mutable SpinLock _lock;
- typedef map<string, Status*, DBConnectionPool::serverNameCompare> HostMap;
- HostMap _hosts;
- set<string> _seenNS;
-
+private:
/**
- * Clears the connections kept by this pool (ie, not including the global pool)
+ * Gets or creates the status object for the host.
*/
- void clearPool() {
- for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
- if (iter->second->avail != NULL) {
- delete iter->second->avail;
- }
- delete iter->second;
+ Status* _getStatus(const std::string& addr) {
+ scoped_spinlock lock(_lock);
+ Status*& temp = _hosts[addr];
+ if (!temp) {
+ temp = new Status();
}
- _hosts.clear();
- }
-
- void forgetNS(const string& ns) {
- scoped_spinlock lock(_lock);
- _seenNS.erase(ns);
+ return temp;
}
- // -----
-
static thread_local std::unique_ptr<ClientConnections> _perThread;
- static ClientConnections* threadInstance() {
- if (!_perThread) {
- _perThread = stdx::make_unique<ClientConnections>();
- }
- return _perThread.get();
- }
-};
+ // Protects only the creation of new entries in the _hosts and _seenNS map from external
+ // threads. Reading _hosts/_seenNS in this thread doesn't need protection.
+ mutable SpinLock _lock;
-thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread;
+ using HostMap = std::map<std::string, Status*, DBConnectionPool::serverNameCompare>;
+ HostMap _hosts;
+ std::set<std::string> _seenNS;
+};
-void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
- BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads
+void ActiveClientConnections::appendInfo(BSONObjBuilder* b) const {
+ // Preallocate the buffer because there may be quite a few threads to report
+ BSONArrayBuilder arr(64 * 1024);
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin();
- i != _clientConnections.end();
- ++i) {
+ for (const auto* conn : _clientConnections) {
BSONObjBuilder bb(arr.subobjStart());
- (*i)->appendInfo(bb);
- bb.done();
+ conn->appendInfo(bb);
+ bb.doneFast();
}
}
- b.appendArray("threads", arr.obj());
+ b->appendArray("threads", arr.obj());
}
+thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread;
+
} // namespace
-// The global connection pool
DBConnectionPool shardConnectionPool;
ShardConnection::ShardConnection(const ConnectionString& connectionString,
- const string& ns,
+ const std::string& ns,
std::shared_ptr<ChunkManager> manager)
: _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) {
invariant(_cs.isValid());
@@ -450,7 +395,7 @@ void ShardConnection::_finishInit() {
_setVersion = versionManager.checkShardVersionCB(opCtx, this, false, 1);
} else {
// Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
- verify(!_manager);
+ invariant(!_manager);
_setVersion = false;
}
}
@@ -458,7 +403,7 @@ void ShardConnection::_finishInit() {
void ShardConnection::done() {
if (_conn) {
ClientConnections::threadInstance()->done(_cs.toString(), _conn);
- _conn = 0;
+ _conn = nullptr;
_finishedInit = true;
}
}
@@ -481,7 +426,11 @@ void ShardConnection::kill() {
}
}
-void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const string& ns) {
+void ShardConnection::reportActiveClientConnections(BSONObjBuilder* builder) {
+ activeClientConnections.appendInfo(builder);
+}
+
+void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const std::string& ns) {
ClientConnections::threadInstance()->checkVersions(opCtx, ns);
}
@@ -494,7 +443,7 @@ void ShardConnection::clearPool() {
ClientConnections::threadInstance()->clearPool();
}
-void ShardConnection::forgetNS(const string& ns) {
+void ShardConnection::forgetNS(const std::string& ns) {
ClientConnections::threadInstance()->forgetNS(ns);
}
diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h
index 62afb593f74..bfe3237e434 100644
--- a/src/mongo/s/client/shard_connection.h
+++ b/src/mongo/s/client/shard_connection.h
@@ -28,8 +28,6 @@
#pragma once
-#include <string>
-
#include "mongo/client/connpool.h"
namespace mongo {
@@ -112,6 +110,9 @@ public:
return _conn != NULL;
}
+ /** reports all thread local connections on this instance */
+ static void reportActiveClientConnections(BSONObjBuilder* builder);
+
/** checks all of my thread local connections for the version of this ns */
static void checkMyConnectionVersions(OperationContext* opCtx, const std::string& ns);
@@ -145,6 +146,9 @@ private:
bool _setVersion;
};
+/**
+ * Global sharded connection pool, used by all instances of ShardConnection.
+ */
extern DBConnectionPool shardConnectionPool;
} // namespace mongo
diff --git a/src/mongo/s/client/shard_connection_test.cpp b/src/mongo/s/client/shard_connection_test.cpp
index aa43bf8f740..dc734b82640 100644
--- a/src/mongo/s/client/shard_connection_test.cpp
+++ b/src/mongo/s/client/shard_connection_test.cpp
@@ -30,19 +30,11 @@
#include <cstdint>
#include <vector>
-#include "mongo/base/init.h"
-#include "mongo/client/connpool.h"
-#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_manager_global.h"
-#include "mongo/db/auth/authz_manager_external_state_mock.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/service_context_noop.h"
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/s/client/shard_connection.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/net/socket_exception.h"
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 3cfa6aa678b..aa8279ed2f1 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -87,8 +87,8 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmongos',
'$BUILD_DIR/mongo/db/commands/killcursors_common',
- '$BUILD_DIR/mongo/db/ftdc/ftdc_server',
'$BUILD_DIR/mongo/db/commands/write_commands_common',
+ '$BUILD_DIR/mongo/db/ftdc/ftdc_server',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/pipeline/aggregation',
'$BUILD_DIR/mongo/db/views/views',
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 154108b279a..6c3f604bdd2 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -52,7 +52,6 @@
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/commands/pipeline_s.h"
diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp
index 2f985d2aee6..bb4775f2e30 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.cpp
+++ b/src/mongo/s/commands/cluster_commands_helpers.cpp
@@ -41,9 +41,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/parallel.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/client/version_manager.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/create_database_gen.h"
#include "mongo/s/shard_id.h"
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 836f1dc0081..90c2c2e08c8 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -42,10 +42,10 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/commands/cluster_explain.h"
-#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/timer.h"
namespace mongo {
diff --git a/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp b/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp
index a0a14dd054f..a6ca81416a4 100644
--- a/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_prev_error_cmd.cpp
@@ -28,12 +28,7 @@
#include "mongo/platform/basic.h"
-#include <set>
-#include <string>
-
#include "mongo/db/commands.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/s/client/shard_connection.h"
namespace mongo {
namespace {
@@ -42,8 +37,7 @@ class GetPrevErrorCmd : public ErrmsgCommandDeprecated {
public:
GetPrevErrorCmd() : ErrmsgCommandDeprecated("getPrevError", "getpreverror") {}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -55,19 +49,19 @@ public:
return "get previous error (since last reseterror command)";
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) const {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) const override {
// No auth required
}
- virtual bool errmsgRun(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- std::string& errmsg,
- BSONObjBuilder& result) {
- errmsg += "getpreverror not supported for sharded environments";
- return false;
+ bool errmsgRun(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
+ uasserted(ErrorCodes::CommandNotSupported,
+ "getPrevError is not supported in sharded environments");
}
} cmdGetPrevError;
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index d89a51531ea..c96ea7be671 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -48,7 +48,6 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
-#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/shard_collection_gen.h"
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index ac87faa6ff1..f7ac158045a 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -55,7 +55,6 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
-#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 79cab4816ef..667a2790280 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -41,12 +41,12 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
-#include "mongo/s/commands/chunk_manager_targeter.h"
#include "mongo/s/commands/cluster_explain.h"
-#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/s/write_ops/chunk_manager_targeter.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index 0bd003e8ddf..138c7e8083d 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -25,19 +25,19 @@ env.Library(
env.Library(
target='cluster_write_op',
source=[
- 'write_op.cpp',
- 'batch_write_op.cpp',
'batch_write_exec.cpp',
- '$BUILD_DIR/mongo/s/commands/chunk_manager_targeter.cpp',
- '$BUILD_DIR/mongo/s/commands/cluster_write.cpp',
+ 'batch_write_op.cpp',
+ 'chunk_manager_targeter.cpp',
+ 'cluster_write.cpp',
+ 'write_op.cpp',
],
LIBDEPS=[
- 'batch_write_types',
'$BUILD_DIR/mongo/client/connection_string',
'$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/client/sharding_client',
'$BUILD_DIR/mongo/s/commands/shared_cluster_commands',
'$BUILD_DIR/mongo/s/coreshard',
+ 'batch_write_types',
],
)
diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 8cde8fa627c..04b0d3a1e21 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/commands/chunk_manager_targeter.h"
+#include "mongo/s/write_ops/chunk_manager_targeter.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/query/canonical_query.h"
diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 6de41ed3fe7..6de41ed3fe7 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index 517f71606d3..fe288bfbd56 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/commands/cluster_write.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include <algorithm>
@@ -42,10 +42,10 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/commands/chunk_manager_targeter.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
+#include "mongo/s/write_ops/chunk_manager_targeter.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
diff --git a/src/mongo/s/commands/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index f9face8ff80..f9face8ff80 100644
--- a/src/mongo/s/commands/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h