-rw-r--r--  src/mongo/db/s/config/configsvr_remove_shard_command.cpp             | 106
-rw-r--r--  src/mongo/s/catalog/SConscript                                        |   2
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.cpp                       |   3
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.h                         |  24
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.cpp                  | 149
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.h                    |  11
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.cpp                  |   5
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.h                    |   3
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h                        |  43
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_database_operations.cpp |  32
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp    | 158
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_remove_shard_test.cpp           | 502
-rw-r--r--  src/mongo/s/commands/cluster_remove_shard_cmd.cpp                     | 102
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp                            |  54
-rw-r--r--  src/mongo/s/config_server_test_fixture.h                              |  17
15 files changed, 654 insertions, 557 deletions
diff --git a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
index 5ac9d2dc224..d24516b0a9c 100644
--- a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
+++ b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -65,7 +66,7 @@ public:
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
+ return true;
}
virtual void help(std::stringstream& help) const override {
@@ -73,11 +74,114 @@ public:
"directly. Removes a shard from the cluster.";
}
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forClusterResource(), ActionType::internal)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ }
+ return Status::OK();
+ }
+
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrRemoveShard can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Field '" << cmdObj.firstElement().fieldName()
+ << "' must be of type string",
+ cmdObj.firstElement().type() == BSONType::String);
+ const std::string target = cmdObj.firstElement().str();
+
+ const auto shardStatus = grid.shardRegistry()->getShard(opCtx, ShardId(target));
+ if (!shardStatus.isOK()) {
+ std::string msg(str::stream() << "Could not drop shard '" << target
+ << "' because it does not exist");
+ log() << msg;
+ return appendCommandStatus(result, Status(ErrorCodes::ShardNotFound, msg));
+ }
+ const auto shard = shardStatus.getValue();
+
+ const auto shardingCatalogManager = ShardingCatalogManager::get(opCtx);
+
+ StatusWith<ShardDrainingStatus> removeShardResult =
+ shardingCatalogManager->removeShard(opCtx, shard->getId());
+ if (!removeShardResult.isOK()) {
+ return appendCommandStatus(result, removeShardResult.getStatus());
+ }
+
+ std::vector<std::string> databases;
+ Status status =
+ shardingCatalogManager->getDatabasesForShard(opCtx, shard->getId(), &databases);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
+ // Get BSONObj containing:
+ // 1) note about moving or dropping databases in a shard
+ // 2) list of databases (excluding 'local' database) that need to be moved
+ BSONObj dbInfo;
+ {
+ BSONObjBuilder dbInfoBuilder;
+ dbInfoBuilder.append("note", "you need to drop or movePrimary these databases");
+ BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove"));
+ for (std::vector<std::string>::const_iterator it = databases.begin();
+ it != databases.end();
+ it++) {
+ if (*it != "local") {
+ dbs.append(*it);
+ }
+ }
+ dbs.doneFast();
+ dbInfo = dbInfoBuilder.obj();
+ }
+
+ // TODO: Standardize/Separate how we append to the result object
+ switch (removeShardResult.getValue()) {
+ case ShardDrainingStatus::STARTED:
+ result.append("msg", "draining started successfully");
+ result.append("state", "started");
+ result.append("shard", shard->getId().toString());
+ result.appendElements(dbInfo);
+ break;
+ case ShardDrainingStatus::ONGOING: {
+ std::vector<ChunkType> chunks;
+ Status status = Grid::get(opCtx)->catalogClient()->getChunks(
+ opCtx,
+ BSON(ChunkType::shard(shard->getId().toString())),
+ BSONObj(),
+ boost::none, // return all
+ &chunks,
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
+ result.append("msg", "draining ongoing");
+ result.append("state", "ongoing");
+ {
+ BSONObjBuilder inner;
+ inner.append("chunks", static_cast<long long>(chunks.size()));
+ inner.append("dbs", static_cast<long long>(databases.size()));
+ BSONObj b = inner.obj();
+ result.append("remaining", b);
+ }
+ result.appendElements(dbInfo);
+ break;
+ }
+ case ShardDrainingStatus::COMPLETED:
+ result.append("msg", "removeshard completed successfully");
+ result.append("state", "completed");
+ result.append("shard", shard->getId().toString());
+ }
+
return true;
}
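
The reply built above always carries a "state" field ("started", "ongoing", or "completed"), and draining is asynchronous, so a caller is expected to re-issue removeShard until it reports completion. Below is a minimal, self-contained sketch of that polling pattern in plain C++; pollRemoveShard() is a hypothetical stand-in that merely simulates the state progression rather than issuing the real command.

// Plain C++ sketch of the caller-side removeShard polling loop; not MongoDB
// server or driver code. pollRemoveShard() simulates started -> ongoing ->
// completed instead of sending {removeShard: "<name>"} and parsing the reply.
#include <iostream>
#include <string>

std::string pollRemoveShard(const std::string& shardName) {
    static int calls = 0;
    ++calls;
    if (calls == 1) return "started";  // first call marks the shard as draining
    if (calls < 4) return "ongoing";   // chunks/databases still being drained
    return "completed";                // shard document removed from config.shards
}

int main() {
    std::string state;
    do {
        state = pollRemoveShard("shard1");
        std::cout << "removeShard state: " << state << "\n";
    } while (state != "completed");
}
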
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index a0d2be936ac..e099aa25dc5 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -180,6 +180,7 @@ env.CppUnitTest(
'sharding_catalog_enable_sharding_test.cpp',
'sharding_catalog_merge_chunks_test.cpp',
'sharding_catalog_remove_shard_from_zone_test.cpp',
+ 'sharding_catalog_remove_shard_test.cpp',
'sharding_catalog_shard_collection_test.cpp',
'sharding_catalog_split_chunk_test.cpp',
],
@@ -195,7 +196,6 @@ env.CppUnitTest(
'sharding_catalog_append_db_stats_test.cpp',
'sharding_catalog_drop_coll_test.cpp',
'sharding_catalog_log_change_test.cpp',
- 'sharding_catalog_remove_shard_test.cpp',
'sharding_catalog_test.cpp',
'sharding_catalog_write_retry_test.cpp',
],
diff --git a/src/mongo/s/catalog/sharding_catalog_client.cpp b/src/mongo/s/catalog/sharding_catalog_client.cpp
index 35befd0a6b8..ed17aa3b743 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client.cpp
@@ -40,4 +40,7 @@ const WriteConcernOptions ShardingCatalogClient::kMajorityWriteConcern(
WriteConcernOptions::SyncMode::UNSET,
Seconds(15));
+const WriteConcernOptions ShardingCatalogClient::kLocalWriteConcern(
+ 1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
+
} // namespace mongo
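
For context, kMajorityWriteConcern above is constructed with w:"majority" and a 15-second timeout, while the new kLocalWriteConcern is w:1 with no timeout; the removeShard logic moved into ShardingCatalogManager later in this patch uses the local variant for its config writes. Below is a rough, std-only sketch of what the two settings express; it is not the real WriteConcernOptions class, and the document shape in the output is only the conventional writeConcern form.

// Illustrative stand-in for the two write concern constants; field meanings are
// read off the constructor arguments above.
#include <chrono>
#include <iostream>
#include <string>

struct WriteConcernSketch {
    std::string w;                  // "majority" or a node count such as "1"
    std::chrono::seconds wTimeout;  // 0 means "no timeout"
};

int main() {
    const WriteConcernSketch kMajority{"majority", std::chrono::seconds{15}};
    const WriteConcernSketch kLocal{"1", std::chrono::seconds{0}};
    std::cout << "kMajorityWriteConcern ~ { w: \"" << kMajority.w
              << "\", wtimeout: " << kMajority.wTimeout.count() * 1000 << " }\n";
    std::cout << "kLocalWriteConcern    ~ { w: " << kLocal.w
              << ", wtimeout: " << kLocal.wTimeout.count() * 1000 << " }\n";
}
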
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 2069a90e5d3..98284b27e88 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -71,16 +71,6 @@ struct ConnectionPoolStats;
}
/**
- * Used to indicate to the caller of the removeShard method whether draining of chunks for
- * a particular shard has started, is ongoing, or has been completed.
- */
-enum ShardDrainingStatus {
- STARTED,
- ONGOING,
- COMPLETED,
-};
-
-/**
* Abstracts reads of the sharding catalog metadata.
*
* All implementations of this interface should go directly to the persistent backing store
@@ -101,6 +91,9 @@ public:
// Constant to use for configuration data majority writes
static const WriteConcernOptions kMajorityWriteConcern;
+ // Constant to use for configuration data local writes
+ static const WriteConcernOptions kLocalWriteConcern;
+
virtual ~ShardingCatalogClient() = default;
/**
@@ -116,17 +109,6 @@ public:
virtual void shutDown(OperationContext* opCtx) = 0;
/**
- * Tries to remove a shard. To completely remove a shard from a sharded cluster,
- * the data residing in that shard must be moved to the remaining shards in the
- * cluster by "draining" chunks from that shard.
- *
- * Because of the asynchronous nature of the draining mechanism, this method returns
- * the current draining status. See ShardDrainingStatus enum definition for more details.
- */
- virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* opCtx,
- const ShardId& name) = 0;
-
- /**
* Updates or creates the metadata for a given database.
*/
virtual Status updateDatabase(OperationContext* opCtx,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 4e013100eb2..65e2e69792f 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -265,121 +265,6 @@ Status ShardingCatalogClientImpl::_log(OperationContext* opCtx,
return result;
}
-StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(OperationContext* opCtx,
- const ShardId& shardId) {
- // Check preconditions for removing the shard
- string name = shardId.toString();
- auto countStatus = _runCountCommandOnConfig(
- opCtx,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << name << ShardType::draining(true)));
- if (!countStatus.isOK()) {
- return countStatus.getStatus();
- }
- if (countStatus.getValue() > 0) {
- return Status(ErrorCodes::ConflictingOperationInProgress,
- "Can't have more than one draining shard at a time");
- }
-
- countStatus = _runCountCommandOnConfig(
- opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << NE << name));
- if (!countStatus.isOK()) {
- return countStatus.getStatus();
- }
- if (countStatus.getValue() == 0) {
- return Status(ErrorCodes::IllegalOperation, "Can't remove last shard");
- }
-
- // Figure out if shard is already draining
- countStatus =
- _runCountCommandOnConfig(opCtx,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << name << ShardType::draining(true)));
- if (!countStatus.isOK()) {
- return countStatus.getStatus();
- }
-
- auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();
-
- if (countStatus.getValue() == 0) {
- log() << "going to start draining shard: " << name;
-
- auto updateStatus = updateConfigDocument(opCtx,
- ShardType::ConfigNS,
- BSON(ShardType::name() << name),
- BSON("$set" << BSON(ShardType::draining(true))),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
- if (!updateStatus.isOK()) {
- log() << "error starting removeShard: " << name
- << causedBy(redact(updateStatus.getStatus()));
- return updateStatus.getStatus();
- }
-
- shardRegistry->reload(opCtx);
-
- // Record start in changelog
- logChange(opCtx,
- "removeShard.start",
- "",
- BSON("shard" << name),
- ShardingCatalogClientImpl::kMajorityWriteConcern)
- .ignore();
-
- return ShardDrainingStatus::STARTED;
- }
-
- // Draining has already started, now figure out how many chunks and databases are still on the
- // shard.
- countStatus = _runCountCommandOnConfig(
- opCtx, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name)));
- if (!countStatus.isOK()) {
- return countStatus.getStatus();
- }
- const long long chunkCount = countStatus.getValue();
-
- countStatus = _runCountCommandOnConfig(
- opCtx, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(name)));
- if (!countStatus.isOK()) {
- return countStatus.getStatus();
- }
- const long long databaseCount = countStatus.getValue();
-
- if (chunkCount > 0 || databaseCount > 0) {
- // Still more draining to do
- return ShardDrainingStatus::ONGOING;
- }
-
- // Draining is done, now finish removing the shard.
- log() << "going to remove shard: " << name;
- audit::logRemoveShard(opCtx->getClient(), name);
-
- Status status = removeConfigDocuments(opCtx,
- ShardType::ConfigNS,
- BSON(ShardType::name() << name),
- ShardingCatalogClient::kMajorityWriteConcern);
- if (!status.isOK()) {
- log() << "Error concluding removeShard operation on: " << name
- << "; err: " << status.reason();
- return status;
- }
-
- shardConnectionPool.removeHost(name);
- ReplicaSetMonitor::remove(name);
-
- shardRegistry->reload(opCtx);
-
- // Record finish in changelog
- logChange(opCtx,
- "removeShard",
- "",
- BSON("shard" << name),
- ShardingCatalogClientImpl::kMajorityWriteConcern)
- .ignore();
-
- return ShardDrainingStatus::COMPLETED;
-}
-
StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabase(
OperationContext* opCtx, const std::string& dbName) {
if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) {
@@ -1340,40 +1225,6 @@ Status ShardingCatalogClientImpl::_createCappedConfigCollection(
return result.getValue().writeConcernStatus;
}
-StatusWith<long long> ShardingCatalogClientImpl::_runCountCommandOnConfig(OperationContext* opCtx,
- const NamespaceString& ns,
- BSONObj query) {
- BSONObjBuilder countBuilder;
- countBuilder.append("count", ns.coll());
- countBuilder.append("query", query);
- _appendReadConcern(&countBuilder);
-
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- auto resultStatus =
- configShard->runCommandWithFixedRetryAttempts(opCtx,
- kConfigReadSelector,
- ns.db().toString(),
- countBuilder.done(),
- Shard::kDefaultConfigCommandTimeout,
- Shard::RetryPolicy::kIdempotent);
- if (!resultStatus.isOK()) {
- return resultStatus.getStatus();
- }
- if (!resultStatus.getValue().commandStatus.isOK()) {
- return resultStatus.getValue().commandStatus;
- }
-
- auto responseObj = std::move(resultStatus.getValue().response);
-
- long long result;
- auto status = bsonExtractIntegerField(responseObj, "n", &result);
- if (!status.isOK()) {
- return status;
- }
-
- return result;
-}
-
StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig(
OperationContext* opCtx,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index e184c078fff..45d466aa235 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -86,9 +86,6 @@ public:
const BSONObj& detail,
const WriteConcernOptions& writeConcern) override;
- StatusWith<ShardDrainingStatus> removeShard(OperationContext* opCtx,
- const ShardId& name) override;
-
StatusWith<repl::OpTimeWith<DatabaseType>> getDatabase(OperationContext* opCtx,
const std::string& dbName) override;
@@ -214,14 +211,6 @@ private:
int cappedSize,
const WriteConcernOptions& writeConcern);
- /**
- * Helper method for running a count command against the config server with appropriate
- * error handling.
- */
- StatusWith<long long> _runCountCommandOnConfig(OperationContext* opCtx,
- const NamespaceString& ns,
- BSONObj query);
-
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* opCtx,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 1df6313afec..3dd25980159 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -66,11 +66,6 @@ Status ShardingCatalogClientMock::enableSharding(OperationContext* opCtx,
return {ErrorCodes::InternalError, "Method not implemented"};
}
-StatusWith<ShardDrainingStatus> ShardingCatalogClientMock::removeShard(OperationContext* opCtx,
- const ShardId& name) {
- return ShardDrainingStatus::COMPLETED;
-}
-
Status ShardingCatalogClientMock::updateDatabase(OperationContext* opCtx,
const string& dbName,
const DatabaseType& db) {
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 6189ad67660..91fdc4d48b2 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -47,9 +47,6 @@ public:
Status enableSharding(OperationContext* opCtx, const std::string& dbName);
- StatusWith<ShardDrainingStatus> removeShard(OperationContext* opCtx,
- const ShardId& name) override;
-
Status updateDatabase(OperationContext* opCtx,
const std::string& dbName,
const DatabaseType& db) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index b323469f178..4c78b2fa4ea 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -31,6 +31,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status_with.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/repl/optime_with.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_database.h"
@@ -48,6 +49,16 @@ class ServiceContext;
class UUID;
/**
+ * Used to indicate to the caller of the removeShard method whether draining of chunks for
+ * a particular shard has started, is ongoing, or has been completed.
+ */
+enum ShardDrainingStatus {
+ STARTED,
+ ONGOING,
+ COMPLETED,
+};
+
+/**
* Implements modifications to the sharding catalog metadata.
*
* TODO: Currently the code responsible for writing the sharding catalog metadata is split between
@@ -207,6 +218,15 @@ public:
*/
Status enableSharding(OperationContext* opCtx, const std::string& dbName);
+ /**
+ * Retrieves all databases for a shard.
+ *
+ * Returns a !OK status if an error occurs.
+ */
+ Status getDatabasesForShard(OperationContext* opCtx,
+ const ShardId& shardId,
+ std::vector<std::string>* dbs);
+
//
// Collection Operations
@@ -259,6 +279,16 @@ public:
const ConnectionString& shardConnectionString,
const long long maxSize);
+ /**
+ * Tries to remove a shard. To completely remove a shard from a sharded cluster,
+ * the data residing in that shard must be moved to the remaining shards in the
+ * cluster by "draining" chunks from that shard.
+ *
+ * Because of the asynchronous nature of the draining mechanism, this method returns
+ * the current draining status. See ShardDrainingStatus enum definition for more details.
+ */
+ StatusWith<ShardDrainingStatus> removeShard(OperationContext* opCtx, const ShardId& shardId);
+
//
// Cluster Upgrade Operations
//
@@ -379,6 +409,19 @@ private:
static StatusWith<ShardId> _selectShardForNewDatabase(OperationContext* opCtx,
ShardRegistry* shardRegistry);
+ /**
+ * Helper method for running a count command against the config server with appropriate error
+ * handling.
+ */
+ StatusWith<long long> _runCountCommandOnConfig(OperationContext* opCtx,
+ const NamespaceString& ns,
+ BSONObj query);
+
+ /**
+ * Appends a read committed read concern to the request object.
+ */
+ void _appendReadConcern(BSONObjBuilder* builder);
+
// The owning service context
ServiceContext* const _serviceContext;
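
The ShardDrainingStatus enum now lives next to the manager that produces it. The small, std-only sketch below shows how the _configsvrRemoveShard command shown earlier maps the enum onto the "state" field of its reply; the helper function is illustrative, not server code, though the strings match the command's output.

// Mapping from the draining status enum to the "state" string in the reply.
#include <iostream>
#include <string>

enum ShardDrainingStatus { STARTED, ONGOING, COMPLETED };

std::string toReplyState(ShardDrainingStatus status) {
    switch (status) {
        case STARTED:   return "started";
        case ONGOING:   return "ongoing";
        case COMPLETED: return "completed";
    }
    return "unknown";
}

int main() {
    std::cout << toReplyState(ONGOING) << "\n";  // prints "ongoing"
}
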
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_database_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_database_operations.cpp
index 26ebb518f4b..c7282833631 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_database_operations.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_database_operations.cpp
@@ -32,6 +32,7 @@
#include <pcrecpp.h>
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/type_database.h"
@@ -44,8 +45,12 @@ namespace mongo {
using std::string;
using std::vector;
+namespace {
+
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
+} // namespace
+
Status ShardingCatalogManager::createDatabase(OperationContext* opCtx, const std::string& dbName) {
invariant(nsIsDbOnly(dbName));
@@ -162,6 +167,7 @@ Status ShardingCatalogManager::_checkDbDoesNotExist(OperationContext* opCtx,
queryBuilder.obj(),
BSONObj(),
1);
+
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -195,4 +201,30 @@ Status ShardingCatalogManager::_checkDbDoesNotExist(OperationContext* opCtx,
<< dbName);
}
+Status ShardingCatalogManager::getDatabasesForShard(OperationContext* opCtx,
+ const ShardId& shardId,
+ std::vector<std::string>* dbs) {
+ auto findStatus = Grid::get(opCtx)->catalogClient()->_exhaustiveFindOnConfig(
+ opCtx,
+ kConfigReadSelector,
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(DatabaseType::ConfigNS),
+ BSON(DatabaseType::primary(shardId.toString())),
+ BSONObj(),
+ boost::none); // no limit
+
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ for (const BSONObj& obj : findStatus.getValue().value) {
+ std::string dbName;
+ Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName);
+ if (!status.isOK()) {
+ dbs->clear();
+ return status;
+ }
+
+ dbs->push_back(dbName);
+ }
+
+ return Status::OK();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
index 93560936442..076f3885cb0 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
@@ -42,6 +42,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/db_raii.h"
@@ -59,6 +60,7 @@
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
@@ -698,6 +700,129 @@ StatusWith<std::string> ShardingCatalogManager::addShard(
return shardType.getName();
}
+StatusWith<ShardDrainingStatus> ShardingCatalogManager::removeShard(OperationContext* opCtx,
+ const ShardId& shardId) {
+ // Check preconditions for removing the shard
+ std::string name = shardId.toString();
+ auto countStatus = _runCountCommandOnConfig(
+ opCtx,
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << NE << name << ShardType::draining(true)));
+ if (!countStatus.isOK()) {
+ return countStatus.getStatus();
+ }
+ if (countStatus.getValue() > 0) {
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ "Can't have more than one draining shard at a time");
+ }
+
+ countStatus = _runCountCommandOnConfig(
+ opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << NE << name));
+ if (!countStatus.isOK()) {
+ return countStatus.getStatus();
+ }
+ if (countStatus.getValue() == 0) {
+ return Status(ErrorCodes::IllegalOperation, "Can't remove last shard");
+ }
+
+ // Figure out if shard is already draining
+ countStatus =
+ _runCountCommandOnConfig(opCtx,
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << name << ShardType::draining(true)));
+ if (!countStatus.isOK()) {
+ return countStatus.getStatus();
+ }
+
+ auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();
+
+ if (countStatus.getValue() == 0) {
+ log() << "going to start draining shard: " << name;
+
+ auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
+ opCtx,
+ ShardType::ConfigNS,
+ BSON(ShardType::name() << name),
+ BSON("$set" << BSON(ShardType::draining(true))),
+ false,
+ ShardingCatalogClient::kLocalWriteConcern);
+ if (!updateStatus.isOK()) {
+ log() << "error starting removeShard: " << name
+ << causedBy(redact(updateStatus.getStatus()));
+ return updateStatus.getStatus();
+ }
+
+ shardRegistry->reload(opCtx);
+
+ // Record start in changelog
+ Grid::get(opCtx)
+ ->catalogClient()
+ ->logChange(opCtx,
+ "removeShard.start",
+ "",
+ BSON("shard" << name),
+ ShardingCatalogClient::kLocalWriteConcern)
+ .transitional_ignore();
+
+ return ShardDrainingStatus::STARTED;
+ }
+
+ // Draining has already started, now figure out how many chunks and databases are still on the
+ // shard.
+ countStatus = _runCountCommandOnConfig(
+ opCtx, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name)));
+ if (!countStatus.isOK()) {
+ return countStatus.getStatus();
+ }
+ const long long chunkCount = countStatus.getValue();
+
+ countStatus = _runCountCommandOnConfig(
+ opCtx, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(name)));
+ if (!countStatus.isOK()) {
+ return countStatus.getStatus();
+ }
+ const long long databaseCount = countStatus.getValue();
+
+ if (chunkCount > 0 || databaseCount > 0) {
+ // Still more draining to do
+ LOG(0) << "chunkCount: " << chunkCount;
+ LOG(0) << "databaseCount: " << databaseCount;
+ return ShardDrainingStatus::ONGOING;
+ }
+
+ // Draining is done, now finish removing the shard.
+ log() << "going to remove shard: " << name;
+ audit::logRemoveShard(opCtx->getClient(), name);
+
+ Status status = Grid::get(opCtx)->catalogClient()->removeConfigDocuments(
+ opCtx,
+ ShardType::ConfigNS,
+ BSON(ShardType::name() << name),
+ ShardingCatalogClient::kLocalWriteConcern);
+ if (!status.isOK()) {
+ log() << "Error concluding removeShard operation on: " << name
+ << "; err: " << status.reason();
+ return status;
+ }
+
+ shardConnectionPool.removeHost(name);
+ ReplicaSetMonitor::remove(name);
+
+ shardRegistry->reload(opCtx);
+
+ // Record finish in changelog
+ Grid::get(opCtx)
+ ->catalogClient()
+ ->logChange(opCtx,
+ "removeShard",
+ "",
+ BSON("shard" << name),
+ ShardingCatalogClient::kLocalWriteConcern)
+ .transitional_ignore();
+
+ return ShardDrainingStatus::COMPLETED;
+}
+
void ShardingCatalogManager::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
@@ -768,4 +893,37 @@ StatusWith<ShardId> ShardingCatalogManager::_selectShardForNewDatabase(
return candidateShardId;
}
+StatusWith<long long> ShardingCatalogManager::_runCountCommandOnConfig(OperationContext* opCtx,
+ const NamespaceString& ns,
+ BSONObj query) {
+ BSONObjBuilder countBuilder;
+ countBuilder.append("count", ns.coll());
+ countBuilder.append("query", query);
+
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto resultStatus =
+ configShard->runCommandWithFixedRetryAttempts(opCtx,
+ kConfigReadSelector,
+ ns.db().toString(),
+ countBuilder.done(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+ if (!resultStatus.isOK()) {
+ return resultStatus.getStatus();
+ }
+ if (!resultStatus.getValue().commandStatus.isOK()) {
+ return resultStatus.getValue().commandStatus;
+ }
+
+ auto responseObj = std::move(resultStatus.getValue().response);
+
+ long long result;
+ auto status = bsonExtractIntegerField(responseObj, "n", &result);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return result;
+}
+
} // namespace mongo
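
The branch structure of removeShard() above can be condensed as follows. This is a std-only restatement for readability, with the results of the config.shards, config.chunks, and config.databases counts passed in directly instead of being fetched via _runCountCommandOnConfig(); it is not MongoDB code.

// Condensed decision logic of ShardingCatalogManager::removeShard().
#include <iostream>
#include <stdexcept>

enum class DrainStatus { kStarted, kOngoing, kCompleted };

DrainStatus decideRemoveShard(long long otherDrainingShards,   // shards != name with draining:true
                              long long otherShards,           // shards != name
                              long long thisShardDraining,     // this shard with draining:true
                              long long remainingChunks,       // chunks still owned by the shard
                              long long remainingDatabases) {  // databases with this shard as primary
    if (otherDrainingShards > 0)
        throw std::runtime_error("Can't have more than one draining shard at a time");
    if (otherShards == 0)
        throw std::runtime_error("Can't remove last shard");
    if (thisShardDraining == 0)
        return DrainStatus::kStarted;  // mark draining:true, log removeShard.start
    if (remainingChunks > 0 || remainingDatabases > 0)
        return DrainStatus::kOngoing;
    return DrainStatus::kCompleted;    // delete the shard document, log removeShard
}

int main() {
    // Example: shard already draining, 3 chunks left on it -> ONGOING.
    std::cout << (decideRemoveShard(0, 1, 1, 3, 0) == DrainStatus::kOngoing) << "\n";
}
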
diff --git a/src/mongo/s/catalog/sharding_catalog_remove_shard_test.cpp b/src/mongo/s/catalog/sharding_catalog_remove_shard_test.cpp
index 7b310ebe75d..644631d4459 100644
--- a/src/mongo/s/catalog/sharding_catalog_remove_shard_test.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_remove_shard_test.cpp
@@ -40,12 +40,15 @@
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
-#include "mongo/s/catalog/sharding_catalog_client_impl.h"
-#include "mongo/s/catalog/sharding_catalog_test_fixture.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
+#include "mongo/s/config_server_test_fixture.h"
+#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/future.h"
@@ -71,314 +74,251 @@ BSONObj getReplSecondaryOkMetadata() {
return o.obj();
}
-class RemoveShardTest : public ShardingCatalogTestFixture {
-public:
+class RemoveShardTest : public ConfigServerTestFixture {
+protected:
+ /**
+ * Performs the test setup steps from the parent class, then initializes the config
+ * database and loads the cluster identity.
+ */
void setUp() override {
- ShardingCatalogTestFixture::setUp();
- configTargeter()->setFindHostReturnValue(configHost);
+ ConfigServerTestFixture::setUp();
+
+ // Make sure clusterID is written to the config.version collection.
+ ASSERT_OK(ShardingCatalogManager::get(operationContext())
+ ->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto clusterIdLoader = ClusterIdentityLoader::get(operationContext());
+ ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(),
+ repl::ReadConcernLevel::kLocalReadConcern));
+ _clusterId = clusterIdLoader->getClusterId();
+ }
+
+ /**
+ * Checks whether a particular shard's "draining" field is set to true.
+ */
+ bool isDraining(const std::string& shardName) {
+ auto response = assertGet(shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << shardName),
+ BSONObj(),
+ 1));
+ BSONObj shardBSON = response.docs.front();
+ if (shardBSON.hasField("draining")) {
+ return shardBSON["draining"].Bool();
+ }
+ return false;
}
-protected:
const HostAndPort configHost{"TestHost1"};
+ OID _clusterId;
};
TEST_F(RemoveShardTest, RemoveShardAnotherShardDraining) {
- string shardName = "shardToRemove";
-
- auto future = launchAsync([&] {
- ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress,
- catalogClient()->removeShard(operationContext(), shardName));
- });
-
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 1);
- future.timed_get(kFutureTimeout);
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
+
+ ShardType shard2;
+ shard2.setName("shard2");
+ shard2.setHost("host2:12345");
+ shard2.setMaxSizeMB(100);
+ shard2.setState(ShardType::ShardState::kShardAware);
+
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1, shard2}));
+
+ auto result = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::STARTED, result);
+ ASSERT_TRUE(isDraining(shard1.getName()));
+
+ ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress,
+ ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard2.getName()));
+ ASSERT_FALSE(isDraining(shard2.getName()));
}
TEST_F(RemoveShardTest, RemoveShardCantRemoveLastShard) {
string shardName = "shardToRemove";
- auto future = launchAsync([&] {
- ASSERT_EQUALS(ErrorCodes::IllegalOperation,
- catalogClient()->removeShard(operationContext(), shardName));
- });
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
- // Report that there are no other draining operations ongoing
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 0);
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1}));
- // Now report that there are no other shard left
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName),
- 0);
-
- future.timed_get(kFutureTimeout);
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation,
+ ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_FALSE(isDraining(shard1.getName()));
}
TEST_F(RemoveShardTest, RemoveShardStartDraining) {
- string shardName = "shardToRemove";
- const HostAndPort clientHost{"client1:12345"};
- setRemote(clientHost);
-
- auto future = launchAsync([&] {
- auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
- ASSERT_EQUALS(ShardDrainingStatus::STARTED, result);
-
- });
-
- // Report that there are no other draining operations ongoing
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 0);
-
- // Report that there *are* other shards left
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName),
- 1);
-
- // Report that the shard is not yet marked as draining
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << shardName << ShardType::draining(true)),
- 0);
-
- // Respond to request to update shard entry and mark it as draining.
- onCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(configHost, request.target);
- ASSERT_EQUALS("config", request.dbname);
-
- ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1),
- rpc::TrackingMetadata::removeTrackingData(request.metadata));
-
- const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- const auto updateOp = UpdateOp::parse(opMsgRequest);
- ASSERT_EQUALS(ShardType::ConfigNS, updateOp.getNamespace().ns());
-
- const auto& updates = updateOp.getUpdates();
- ASSERT_EQUALS(1U, updates.size());
-
- const auto& update = updates.front();
- ASSERT(!update.getUpsert());
- ASSERT(!update.getMulti());
- ASSERT_BSONOBJ_EQ(BSON(ShardType::name() << shardName), update.getQ());
- ASSERT_BSONOBJ_EQ(BSON("$set" << BSON(ShardType::draining(true))), update.getU());
-
- BatchedCommandResponse response;
- response.setOk(true);
- response.setNModified(1);
-
- return response.toBSON();
- });
-
- // Respond to request to reload information about existing shards
- onFindCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(configHost, request.target);
- ASSERT_BSONOBJ_EQ(getReplSecondaryOkMetadata(),
- rpc::TrackingMetadata::removeTrackingData(request.metadata));
-
- const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
- auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
-
- ASSERT_EQ(ShardType::ConfigNS, query->ns());
- ASSERT_BSONOBJ_EQ(BSONObj(), query->getFilter());
- ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
- ASSERT_FALSE(query->getLimit().is_initialized());
-
- checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- ShardType remainingShard;
- remainingShard.setHost("host1");
- remainingShard.setName("shard0");
- return vector<BSONObj>{remainingShard.toBSON()};
- });
-
- expectChangeLogCreate(configHost, BSON("ok" << 1));
- expectChangeLogInsert(
- configHost, network()->now(), "removeShard.start", "", BSON("shard" << shardName));
-
- future.timed_get(kFutureTimeout);
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
+
+ ShardType shard2;
+ shard2.setName("shard2");
+ shard2.setHost("host2:12345");
+ shard2.setMaxSizeMB(100);
+ shard2.setState(ShardType::ShardState::kShardAware);
+
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1, shard2}));
+
+ auto result = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::STARTED, result);
+ ASSERT_TRUE(isDraining(shard1.getName()));
}
TEST_F(RemoveShardTest, RemoveShardStillDrainingChunksRemaining) {
- string shardName = "shardToRemove";
- auto future = launchAsync([&] {
- auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
- ASSERT_EQUALS(ShardDrainingStatus::ONGOING, result);
-
- });
-
- // Report that there are no other draining operations ongoing
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 0);
-
- // Report that there *are* other shards left
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName),
- 1);
-
- // Report that the shard is already marked as draining
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << shardName << ShardType::draining(true)),
- 1);
-
- // Report that there are still chunks to drain
- expectCount(
- configHost, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(shardName)), 10);
-
- // Report that there are no more databases to drain
- expectCount(configHost,
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::primary(shardName)),
- 0);
-
- future.timed_get(kFutureTimeout);
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
+
+ ShardType shard2;
+ shard2.setName("shard2");
+ shard2.setHost("host2:12345");
+ shard2.setMaxSizeMB(100);
+ shard2.setState(ShardType::ShardState::kShardAware);
+
+ auto epoch = OID::gen();
+ ChunkType chunk1(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 0), BSON("_id" << 20)),
+ ChunkVersion(1, 1, epoch),
+ shard1.getName());
+ ChunkType chunk2(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 21), BSON("_id" << 50)),
+ ChunkVersion(1, 2, epoch),
+ shard1.getName());
+ ChunkType chunk3(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 51), BSON("_id" << 1000)),
+ ChunkVersion(1, 3, epoch),
+ shard1.getName());
+
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1, shard2}));
+ setupDatabase("testDB", shard1.getName(), true);
+ ASSERT_OK(setupChunks(std::vector<ChunkType>{chunk1, chunk2, chunk3}));
+
+ auto startedResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::STARTED, startedResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
+
+ auto ongoingResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::ONGOING, ongoingResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
}
TEST_F(RemoveShardTest, RemoveShardStillDrainingDatabasesRemaining) {
- string shardName = "shardToRemove";
- auto future = launchAsync([&] {
- auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
- ASSERT_EQUALS(ShardDrainingStatus::ONGOING, result);
-
- });
-
- // Report that there are no other draining operations ongoing
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 0);
-
- // Report that there *are* other shards left
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName),
- 1);
-
- // Report that the shard is already marked as draining
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << shardName << ShardType::draining(true)),
- 1);
-
- // Report that there are no more chunks to drain
- expectCount(
- configHost, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(shardName)), 0);
-
- // Report that there are still more databases to drain
- expectCount(configHost,
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::primary(shardName)),
- 5);
-
- future.timed_get(kFutureTimeout);
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
+
+ ShardType shard2;
+ shard2.setName("shard2");
+ shard2.setHost("host2:12345");
+ shard2.setMaxSizeMB(100);
+ shard2.setState(ShardType::ShardState::kShardAware);
+
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1, shard2}));
+ setupDatabase("testDB", shard1.getName(), false);
+
+ auto startedResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::STARTED, startedResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
+
+ auto ongoingResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::ONGOING, ongoingResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
}
TEST_F(RemoveShardTest, RemoveShardCompletion) {
- string shardName = "shardToRemove";
- const HostAndPort clientHost{"client1:12345"};
- setRemote(clientHost);
-
- auto future = launchAsync([&] {
- auto result = assertGet(catalogClient()->removeShard(operationContext(), shardName));
- ASSERT_EQUALS(ShardDrainingStatus::COMPLETED, result);
-
- });
-
- // Report that there are no other draining operations ongoing
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName << ShardType::draining(true)),
- 0);
-
- // Report that there *are* other shards left
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << NE << shardName),
- 1);
-
- // Report that the shard is already marked as draining
- expectCount(configHost,
- NamespaceString(ShardType::ConfigNS),
- BSON(ShardType::name() << shardName << ShardType::draining(true)),
- 1);
-
- // Report that there are no more chunks to drain
- expectCount(
- configHost, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(shardName)), 0);
-
- // Report that there are no more databases to drain
- expectCount(configHost,
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::primary(shardName)),
- 0);
-
- // Respond to request to remove shard entry.
- onCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(configHost, request.target);
- ASSERT_EQUALS(NamespaceString::kConfigDb, request.dbname);
-
- ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1),
- rpc::TrackingMetadata::removeTrackingData(request.metadata));
-
- const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- const auto deleteOp = DeleteOp::parse(opMsgRequest);
- ASSERT_EQUALS(ShardType::ConfigNS, deleteOp.getNamespace().ns());
-
- const auto& deletes = deleteOp.getDeletes();
- ASSERT_EQUALS(1U, deletes.size());
-
- const auto& deleteEntry = deletes.front();
- ASSERT(deleteEntry.getMulti());
- ASSERT_BSONOBJ_EQ(BSON(ShardType::name() << shardName), deleteEntry.getQ());
-
- BatchedCommandResponse response;
- response.setOk(true);
- response.setNModified(1);
-
- return response.toBSON();
- });
-
- // Respond to request to reload information about existing shards
- onFindCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(configHost, request.target);
- ASSERT_BSONOBJ_EQ(getReplSecondaryOkMetadata(),
- rpc::TrackingMetadata::removeTrackingData(request.metadata));
-
- const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
- auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
-
- ASSERT_EQ(ShardType::ConfigNS, query->ns());
- ASSERT_BSONOBJ_EQ(BSONObj(), query->getFilter());
- ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
- ASSERT_FALSE(query->getLimit().is_initialized());
-
- checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- ShardType remainingShard;
- remainingShard.setHost("host1");
- remainingShard.setName("shard0");
- return vector<BSONObj>{remainingShard.toBSON()};
- });
-
- expectChangeLogCreate(configHost, BSON("ok" << 1));
- expectChangeLogInsert(
- configHost, network()->now(), "removeShard", "", BSON("shard" << shardName));
-
- future.timed_get(kFutureTimeout);
+
+ ShardType shard1;
+ shard1.setName("shard1");
+ shard1.setHost("host1:12345");
+ shard1.setMaxSizeMB(100);
+ shard1.setState(ShardType::ShardState::kShardAware);
+
+ ShardType shard2;
+ shard2.setName("shard2");
+ shard2.setHost("host2:12345");
+ shard2.setMaxSizeMB(100);
+ shard2.setState(ShardType::ShardState::kShardAware);
+
+ auto epoch = OID::gen();
+ ChunkType chunk1(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 0), BSON("_id" << 20)),
+ ChunkVersion(1, 1, epoch),
+ shard1.getName());
+ ChunkType chunk2(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 21), BSON("_id" << 50)),
+ ChunkVersion(1, 2, epoch),
+ shard1.getName());
+ ChunkType chunk3(NamespaceString("testDB.testColl"),
+ ChunkRange(BSON("_id" << 51), BSON("_id" << 1000)),
+ ChunkVersion(1, 3, epoch),
+ shard1.getName());
+
+ std::vector<ChunkType> chunks{chunk1, chunk2, chunk3};
+
+ ASSERT_OK(setupShards(std::vector<ShardType>{shard1, shard2}));
+ setupDatabase("testDB", shard2.getName(), false);
+ ASSERT_OK(setupChunks(std::vector<ChunkType>{chunk1, chunk2, chunk3}));
+
+ auto startedResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::STARTED, startedResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
+
+ auto ongoingResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::ONGOING, ongoingResult);
+ ASSERT_TRUE(isDraining(shard1.getName()));
+
+ // Mock the operation during which the chunks are moved to the other shard.
+ const NamespaceString chunkNS(ChunkType::ConfigNS);
+ for (ChunkType chunk : chunks) {
+ ChunkType updatedChunk = chunk;
+ updatedChunk.setShard(shard2.getName());
+ ASSERT_OK(updateToConfigCollection(
+ operationContext(), chunkNS, chunk.toConfigBSON(), updatedChunk.toConfigBSON(), false));
+ }
+
+ auto completedResult = assertGet(ShardingCatalogManager::get(operationContext())
+ ->removeShard(operationContext(), shard1.getName()));
+ ASSERT_EQUALS(ShardDrainingStatus::COMPLETED, completedResult);
+
+ // Now make sure that the shard no longer exists on config.
+ auto response = assertGet(shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(ShardType::ConfigNS),
+ BSON(ShardType::name() << shard1.getName()),
+ BSONObj(),
+ 1));
+ ASSERT_TRUE(response.docs.empty());
}
} // namespace
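
These tests drive the real ShardingCatalogManager against an in-process config server, and RemoveShardCompletion "migrates" chunks simply by rewriting their shard field through updateToConfigCollection(). The std-only sketch below mirrors that idea; the struct and values are illustrative only, not the real ChunkType.

// Conceptual version of the "mock the migration" step: once no chunk document
// names the draining shard, removeShard() reports COMPLETED.
#include <iostream>
#include <string>
#include <vector>

struct ChunkDoc {
    std::string ns;
    std::string shard;
};

int main() {
    std::vector<ChunkDoc> chunks{{"testDB.testColl", "shard1"},
                                 {"testDB.testColl", "shard1"},
                                 {"testDB.testColl", "shard1"}};
    for (ChunkDoc& chunk : chunks) {
        if (chunk.shard == "shard1") {
            chunk.shard = "shard2";  // equivalent of updateToConfigCollection() on config.chunks
        }
    }
    long long remainingOnShard1 = 0;
    for (const ChunkDoc& chunk : chunks) {
        if (chunk.shard == "shard1") ++remainingOnShard1;
    }
    std::cout << "chunks remaining on shard1: " << remainingOnShard1 << "\n";
}
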
diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
index 79c2df15c47..4e5cea30cd2 100644
--- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
@@ -40,6 +40,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -55,16 +56,15 @@ public:
RemoveShardCmd() : BasicCommand("removeShard", "removeshard") {}
virtual bool slaveOk() const {
- return true;
+ return false;
}
virtual bool adminOnly() const {
return true;
}
-
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
+ return true;
}
virtual void help(std::stringstream& help) const {
@@ -83,91 +83,23 @@ public:
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
+
uassert(ErrorCodes::TypeMismatch,
str::stream() << "Field '" << cmdObj.firstElement().fieldName()
- << "' must be of type String",
+ << "' must be of type string",
cmdObj.firstElement().type() == BSONType::String);
- const string target = cmdObj.firstElement().str();
-
- const auto shardStatus = grid.shardRegistry()->getShard(opCtx, ShardId(target));
- if (!shardStatus.isOK()) {
- string msg(str::stream() << "Could not drop shard '" << target
- << "' because it does not exist");
- log() << msg;
- return appendCommandStatus(result, Status(ErrorCodes::ShardNotFound, msg));
- }
- const auto s = shardStatus.getValue();
-
- auto catalogClient = grid.catalogClient();
- StatusWith<ShardDrainingStatus> removeShardResult =
- catalogClient->removeShard(opCtx, s->getId());
- if (!removeShardResult.isOK()) {
- return appendCommandStatus(result, removeShardResult.getStatus());
- }
-
- vector<string> databases;
- Status status = catalogClient->getDatabasesForShard(opCtx, s->getId(), &databases);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
- // Get BSONObj containing:
- // 1) note about moving or dropping databases in a shard
- // 2) list of databases (excluding 'local' database) that need to be moved
- BSONObj dbInfo;
- {
- BSONObjBuilder dbInfoBuilder;
- dbInfoBuilder.append("note", "you need to drop or movePrimary these databases");
- BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove"));
- for (vector<string>::const_iterator it = databases.begin(); it != databases.end();
- it++) {
- if (*it != "local") {
- dbs.append(*it);
- }
- }
- dbs.doneFast();
- dbInfo = dbInfoBuilder.obj();
- }
-
- // TODO: Standardize/Seperate how we append to the result object
- switch (removeShardResult.getValue()) {
- case ShardDrainingStatus::STARTED:
- result.append("msg", "draining started successfully");
- result.append("state", "started");
- result.append("shard", s->getId().toString());
- result.appendElements(dbInfo);
- break;
- case ShardDrainingStatus::ONGOING: {
- vector<ChunkType> chunks;
- Status status =
- catalogClient->getChunks(opCtx,
- BSON(ChunkType::shard(s->getId().toString())),
- BSONObj(),
- boost::none, // return all
- &chunks,
- nullptr,
- repl::ReadConcernLevel::kMajorityReadConcern);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
- result.append("msg", "draining ongoing");
- result.append("state", "ongoing");
- {
- BSONObjBuilder inner;
- inner.append("chunks", static_cast<long long>(chunks.size()));
- inner.append("dbs", static_cast<long long>(databases.size()));
- BSONObj b = inner.obj();
- result.append("remaining", b);
- }
- result.appendElements(dbInfo);
- break;
- }
- case ShardDrainingStatus::COMPLETED:
- result.append("msg", "removeshard completed successfully");
- result.append("state", "completed");
- result.append("shard", s->getId().toString());
- }
+ const std::string target = cmdObj.firstElement().str();
+
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto cmdResponseStatus = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ Command::appendPassthroughFields(cmdObj, BSON("_configsvrRemoveShard" << target)),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(cmdResponseStatus.commandStatus);
+
+ Command::filterCommandReplyForPassthrough(cmdResponseStatus.response, &result);
return true;
}
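
With this change mongos no longer implements removeShard itself; it rewrites the command name, forwards the command together with its remaining top-level fields to the config server primary, and filters the reply back to the client. Below is a simplified, std-only illustration of that forwarding step; it is not the real Command/BSON helpers, and the printable string values are placeholders.

// Sketch of the passthrough rewrite: {removeShard: "<name>", ...} becomes
// {_configsvrRemoveShard: "<name>", ...} aimed at the config server primary.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using Doc = std::vector<std::pair<std::string, std::string>>;  // field name -> printable value

Doc forwardToConfigsvr(const Doc& clientCmd) {
    Doc forwarded{{"_configsvrRemoveShard", clientCmd.front().second}};
    for (std::size_t i = 1; i < clientCmd.size(); ++i) {
        forwarded.push_back(clientCmd[i]);  // carry along fields such as writeConcern
    }
    return forwarded;
}

int main() {
    const Doc clientCmd{{"removeShard", "\"shard1\""}, {"writeConcern", "{ w: \"majority\" }"}};
    for (const auto& field : forwardToConfigsvr(clientCmd)) {
        std::cout << field.first << ": " << field.second << "\n";
    }
}
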
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 22bc7ac4ccb..2715c61d852 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -211,6 +211,60 @@ Status ConfigServerTestFixture::insertToConfigCollection(OperationContext* opCtx
return status;
}
+Status ConfigServerTestFixture::updateToConfigCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& query,
+ const BSONObj& update,
+ const bool upsert) {
+ auto updateResponse = getConfigShard()->runCommand(opCtx,
+ kReadPref,
+ ns.db().toString(),
+ [&]() {
+ write_ops::Update updateOp(ns);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(update);
+ entry.setUpsert(upsert);
+ return entry;
+ }()});
+ return updateOp.toBSON({});
+ }(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNoRetry);
+
+
+ BatchedCommandResponse batchResponse;
+ auto status = Shard::CommandResponse::processBatchWriteResponse(updateResponse, &batchResponse);
+ return status;
+}
+
+Status ConfigServerTestFixture::deleteToConfigCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& doc,
+ const bool multi) {
+ auto deleteReponse = getConfigShard()->runCommand(opCtx,
+ kReadPref,
+ ns.db().toString(),
+ [&]() {
+ write_ops::Delete deleteOp(ns);
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(doc);
+ entry.setMulti(multi);
+ return entry;
+ }()});
+ return deleteOp.toBSON({});
+ }(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNoRetry);
+
+
+ BatchedCommandResponse batchResponse;
+ auto status = Shard::CommandResponse::processBatchWriteResponse(deleteReponse, &batchResponse);
+ return status;
+}
+
StatusWith<BSONObj> ConfigServerTestFixture::findOneOnConfigCollection(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& filter) {
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 37ac1f5c570..cfbe682aa5e 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -63,6 +63,23 @@ public:
const BSONObj& doc);
/**
+ * Updates a document in the specified namespace on this config server.
+ */
+ Status updateToConfigCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& query,
+ const BSONObj& update,
+ const bool upsert);
+
+ /**
+ * Deletes a document from the specified namespace on this config server.
+ */
+ Status deleteToConfigCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& doc,
+ const bool multi);
+
+ /**
* Reads a single document from a collection living on the config server.
*/
StatusWith<BSONObj> findOneOnConfigCollection(OperationContext* opCtx,