Diffstat (limited to 'src')
-rw-r--r--  src/mongo/base/error_codes.err                                               |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp            |  16
-rw-r--r--  src/mongo/s/catalog/replset/SConscript                                       |   2
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp                 | 106
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_client_impl.h                   |   7
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp  | 322
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp                | 210
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h                  |  20
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_upgrade_test.cpp                | 605
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.h                                |   6
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.cpp                         |   4
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.h                           |   2
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h                               |   6
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.cpp                        |   4
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.h                          |   2
-rw-r--r--  src/mongo/s/client/shard.h                                                   |  11
-rw-r--r--  src/mongo/s/client/shard_local.cpp                                           |  18
-rw-r--r--  src/mongo/s/client/shard_local.h                                             |   5
-rw-r--r--  src/mongo/s/client/shard_local_test.cpp                                      |  53
-rw-r--r--  src/mongo/s/client/shard_remote.cpp                                          |   7
-rw-r--r--  src/mongo/s/client/shard_remote.h                                            |   5
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp                                   |  79
-rw-r--r--  src/mongo/s/config_server_test_fixture.h                                     |  12
-rw-r--r--  src/mongo/s/server.cpp                                                       |  80
24 files changed, 753 insertions, 831 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index dbbe5d6aae2..191ce652127 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -130,7 +130,7 @@ error_code("LockNotFound", 128)
error_code("LockStateChangeFailed", 129)
error_code("SymbolNotFound", 130)
error_code("RLPInitializationFailed", 131)
-error_code("ConfigServersInconsistent", 132)
+error_code("OBSOLETE_ConfigServersInconsistent", 132)
error_code("FailedToSatisfyReadPreference", 133)
error_code("ReadConcernMajorityNotAvailableYet", 134)
error_code("StaleTerm", 135)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 496f26cc96a..f2227103413 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -69,6 +69,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/s/balancer/balancer.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/functional.h"
@@ -470,6 +471,21 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
}
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ status = Grid::get(txn)->catalogManager()->initializeConfigDatabaseIfNeeded(txn);
+ if (!status.isOK()) {
+ if (status == ErrorCodes::ShutdownInProgress ||
+ status == ErrorCodes::InterruptedAtShutdown) {
+ // Don't fassert if we're mid-shutdown; let the shutdown happen gracefully.
+ return;
+ }
+ fassertFailedWithStatus(40184,
+ Status(status.code(),
+ str::stream()
+ << "Failed to initialize config database on config "
+ "server's first transition to primary"
+ << causedBy(status)));
+ }
+
// If this is a config server node becoming a primary, start the balancer
auto balancer = Balancer::get(txn);
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index ec9157368fe..b741d44dc4c 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -109,6 +109,7 @@ env.CppUnitTest(
source=[
'sharding_catalog_add_shard_to_zone_test.cpp',
'sharding_catalog_remove_shard_from_zone_test.cpp',
+ 'sharding_catalog_config_initialization_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/replmocks',
@@ -126,7 +127,6 @@ env.CppUnitTest(
'sharding_catalog_remove_shard_test.cpp',
'sharding_catalog_shard_collection_test.cpp',
'sharding_catalog_test.cpp',
- 'sharding_catalog_upgrade_test.cpp',
'sharding_catalog_write_retry_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index ae8801ebbe9..53464281c65 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -54,12 +54,10 @@
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
-#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
@@ -101,7 +99,6 @@ const char kWriteConcernField[] = "writeConcern";
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
TagSet{});
-const int kMaxConfigVersionInitRetry = 3;
const int kMaxReadRetry = 3;
const int kMaxWriteRetry = 3;
@@ -1186,7 +1183,7 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn,
const ChunkVersion& lastChunkVersion) {
BSONObj cmd =
BSON("applyOps" << updateOps << "preCondition" << preCondition << kWriteConcernField
- << kMajorityWriteConcern.toBSON());
+ << ShardingCatalogClient::kMajorityWriteConcern.toBSON());
auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
txn,
@@ -1555,107 +1552,6 @@ StatusWith<long long> ShardingCatalogClientImpl::_runCountCommandOnConfig(Operat
return result;
}
-Status ShardingCatalogClientImpl::initConfigVersion(OperationContext* txn) {
- for (int x = 0; x < kMaxConfigVersionInitRetry; x++) {
- auto versionStatus = _getConfigVersion(txn);
- if (!versionStatus.isOK()) {
- return versionStatus.getStatus();
- }
-
- auto versionInfo = versionStatus.getValue();
- if (versionInfo.getMinCompatibleVersion() > CURRENT_CONFIG_VERSION) {
- return {ErrorCodes::IncompatibleShardingConfigVersion,
- str::stream() << "current version v" << CURRENT_CONFIG_VERSION
- << " is older than the cluster min compatible v"
- << versionInfo.getMinCompatibleVersion()};
- }
-
- if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) {
- VersionType newVersion;
- newVersion.setClusterId(OID::gen());
- newVersion.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
- newVersion.setCurrentVersion(CURRENT_CONFIG_VERSION);
-
- BSONObj versionObj(newVersion.toBSON());
- auto upsertStatus = updateConfigDocument(txn,
- VersionType::ConfigNS,
- versionObj,
- versionObj,
- true,
- ShardingCatalogClient::kMajorityWriteConcern);
-
- if ((upsertStatus.isOK() && !upsertStatus.getValue()) ||
- upsertStatus == ErrorCodes::DuplicateKey) {
- // Do the check again as someone inserted a new config version document
- // and the upsert neither inserted nor updated a config version document.
- // Note: you can get duplicate key errors on upsert because of SERVER-14322.
- continue;
- }
-
- return upsertStatus.getStatus();
- }
-
- if (versionInfo.getCurrentVersion() == UpgradeHistory_UnreportedVersion) {
- return {ErrorCodes::IncompatibleShardingConfigVersion,
- "Assuming config data is old since the version document cannot be found in the "
- "config server and it contains databases aside 'local' and 'admin'. "
- "Please upgrade if this is the case. Otherwise, make sure that the config "
- "server is clean."};
- }
-
- if (versionInfo.getCurrentVersion() < CURRENT_CONFIG_VERSION) {
- return {ErrorCodes::IncompatibleShardingConfigVersion,
- str::stream() << "need to upgrade current cluster version to v"
- << CURRENT_CONFIG_VERSION
- << "; currently at v"
- << versionInfo.getCurrentVersion()};
- }
-
- return Status::OK();
- }
-
- return {ErrorCodes::IncompatibleShardingConfigVersion,
- str::stream() << "unable to create new config version document after "
- << kMaxConfigVersionInitRetry
- << " retries"};
-}
-
-StatusWith<VersionType> ShardingCatalogClientImpl::_getConfigVersion(OperationContext* txn) {
- auto findStatus = _exhaustiveFindOnConfig(txn,
- kConfigReadSelector,
- NamespaceString(VersionType::ConfigNS),
- BSONObj(),
- BSONObj(),
- boost::none /* no limit */);
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
- }
-
- auto queryResults = findStatus.getValue().value;
-
- if (queryResults.size() > 1) {
- return {ErrorCodes::RemoteValidationError,
- str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
- }
-
- if (queryResults.empty()) {
- VersionType versionInfo;
- versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
- versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
- return versionInfo;
- }
-
- BSONObj versionDoc = queryResults.front();
- auto versionTypeResult = VersionType::fromBSON(versionDoc);
- if (!versionTypeResult.isOK()) {
- return Status(ErrorCodes::UnsupportedFormat,
- str::stream() << "invalid config version document: " << versionDoc
- << versionTypeResult.getStatus().toString());
- }
-
- return versionTypeResult.getValue();
-}
-
StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index 8607e31c824..9d9d060a8e8 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -174,8 +174,6 @@ public:
DistLockManager* getDistLockManager() override;
- Status initConfigVersion(OperationContext* txn) override;
-
Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) override;
@@ -245,11 +243,6 @@ private:
void _appendReadConcern(BSONObjBuilder* builder);
/**
- * Returns the current cluster schema/protocol version.
- */
- StatusWith<VersionType> _getConfigVersion(OperationContext* txn);
-
- /**
* Queries the config servers for the database metadata for the given database, using the
* given read preference. Returns NamespaceNotFound if no database metadata is found.
*/
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp
new file mode 100644
index 00000000000..ca4f04bda77
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp
@@ -0,0 +1,322 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <string>
+#include <vector>
+
+#include "mongo/bson/json.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/config_server_version.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_config_version.h"
+#include "mongo/s/catalog/type_lockpings.h"
+#include "mongo/s/catalog/type_locks.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+using std::string;
+using std::vector;
+using unittest::assertGet;
+
+/**
+ * Takes two arrays of BSON objects and asserts that they contain the same documents
+ */
+void assertBSONObjsSame(const std::vector<BSONObj>& expectedBSON,
+ const std::vector<BSONObj>& foundBSON) {
+ ASSERT_EQUALS(expectedBSON.size(), foundBSON.size());
+
+ for (const auto& expectedObj : expectedBSON) {
+ bool wasFound = false;
+ for (const auto& foundObj : foundBSON) {
+ if (expectedObj.woCompare(foundObj) == 0) {
+ wasFound = true;
+ break;
+ }
+ }
+ ASSERT_TRUE(wasFound);
+ }
+}
+
+using ConfigInitializationTest = ConfigServerTestFixture;
+
+TEST_F(ConfigInitializationTest, UpgradeNotNeeded) {
+ VersionType version;
+ version.setClusterId(OID::gen());
+ version.setCurrentVersion(CURRENT_CONFIG_VERSION);
+ version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
+ ASSERT_OK(insertToConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), version.toBSON()));
+
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto versionDoc = assertGet(findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+
+ VersionType foundVersion = assertGet(VersionType::fromBSON(versionDoc));
+
+ ASSERT_EQUALS(version.getClusterId(), foundVersion.getClusterId());
+ ASSERT_EQUALS(version.getCurrentVersion(), foundVersion.getCurrentVersion());
+ ASSERT_EQUALS(version.getMinCompatibleVersion(), foundVersion.getMinCompatibleVersion());
+}
+
+TEST_F(ConfigInitializationTest, InitIncompatibleVersion) {
+ VersionType version;
+ version.setClusterId(OID::gen());
+ version.setCurrentVersion(MIN_COMPATIBLE_CONFIG_VERSION - 1);
+ version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION - 2);
+ ASSERT_OK(insertToConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), version.toBSON()));
+
+ ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion,
+ catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto versionDoc = assertGet(findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+
+ VersionType foundVersion = assertGet(VersionType::fromBSON(versionDoc));
+
+ ASSERT_EQUALS(version.getClusterId(), foundVersion.getClusterId());
+ ASSERT_EQUALS(version.getCurrentVersion(), foundVersion.getCurrentVersion());
+ ASSERT_EQUALS(version.getMinCompatibleVersion(), foundVersion.getMinCompatibleVersion());
+}
+
+TEST_F(ConfigInitializationTest, InitClusterMultipleVersionDocs) {
+ VersionType version;
+ version.setClusterId(OID::gen());
+ version.setCurrentVersion(MIN_COMPATIBLE_CONFIG_VERSION - 2);
+ version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION - 3);
+ ASSERT_OK(insertToConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), version.toBSON()));
+
+ ASSERT_OK(insertToConfigCollection(operationContext(),
+ NamespaceString(VersionType::ConfigNS),
+ BSON("_id"
+ << "a second document")));
+
+ ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion,
+ catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+}
+
+TEST_F(ConfigInitializationTest, InitInvalidConfigVersionDoc) {
+ BSONObj versionDoc(fromjson(R"({
+ _id: 1,
+ minCompatibleVersion: "should be numeric",
+ currentVersion: 7,
+ clusterId: ObjectId("55919cc6dbe86ce7ac056427")
+ })"));
+ ASSERT_OK(insertToConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), versionDoc));
+
+ ASSERT_EQ(ErrorCodes::UnsupportedFormat,
+ catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+}
+
+
+TEST_F(ConfigInitializationTest, InitNoVersionDocEmptyConfig) {
+ // Make sure there is no existing document
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument,
+ findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto versionDoc = assertGet(findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+
+ VersionType foundVersion = assertGet(VersionType::fromBSON(versionDoc));
+
+ ASSERT_TRUE(foundVersion.getClusterId().isSet());
+ ASSERT_EQUALS(CURRENT_CONFIG_VERSION, foundVersion.getCurrentVersion());
+ ASSERT_EQUALS(MIN_COMPATIBLE_CONFIG_VERSION, foundVersion.getMinCompatibleVersion());
+}
+
+TEST_F(ConfigInitializationTest, InitVersionTooHigh) {
+ VersionType version;
+ version.setClusterId(OID::gen());
+ version.setCurrentVersion(10000);
+ version.setMinCompatibleVersion(10000);
+ ASSERT_OK(insertToConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), version.toBSON()));
+
+ ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion,
+ catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+}
+
+TEST_F(ConfigInitializationTest, OnlyRunsOnce) {
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto versionDoc = assertGet(findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+
+ VersionType foundVersion = assertGet(VersionType::fromBSON(versionDoc));
+
+ ASSERT_TRUE(foundVersion.getClusterId().isSet());
+ ASSERT_EQUALS(CURRENT_CONFIG_VERSION, foundVersion.getCurrentVersion());
+ ASSERT_EQUALS(MIN_COMPATIBLE_CONFIG_VERSION, foundVersion.getMinCompatibleVersion());
+
+ // Now drop all databases and re-run initializeConfigDatabaseIfNeeded()
+ _dropAllDBs(operationContext());
+
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ // Even though there was no version document, initializeConfigDatabaseIfNeeded() returned
+ // without making one because it had already run once successfully, so it didn't check again.
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument,
+ findOneOnConfigCollection(
+ operationContext(), NamespaceString(VersionType::ConfigNS), BSONObj()));
+}
+
+TEST_F(ConfigInitializationTest, BuildsNecessaryIndexes) {
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto expectedChunksIndexes = std::vector<BSONObj>{
+ BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.chunks"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("ns" << 1 << "min" << 1) << "name"
+ << "ns_1_min_1"
+ << "ns"
+ << "config.chunks"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("ns" << 1 << "shard" << 1 << "min" << 1)
+ << "name"
+ << "ns_1_shard_1_min_1"
+ << "ns"
+ << "config.chunks"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("ns" << 1 << "lastmod" << 1) << "name"
+ << "ns_1_lastmod_1"
+ << "ns"
+ << "config.chunks")};
+ auto expectedLockpingsIndexes =
+ std::vector<BSONObj>{BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.lockpings"),
+ BSON("v" << 1 << "key" << BSON("ping" << 1) << "name"
+ << "ping_1"
+ << "ns"
+ << "config.lockpings")};
+ auto expectedLocksIndexes = std::vector<BSONObj>{
+ BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.locks"),
+ BSON("v" << 1 << "key" << BSON("ts" << 1) << "name"
+ << "ts_1"
+ << "ns"
+ << "config.locks"),
+ BSON("v" << 1 << "key" << BSON("state" << 1 << "process" << 1) << "name"
+ << "state_1_process_1"
+ << "ns"
+ << "config.locks")};
+ auto expectedShardsIndexes = std::vector<BSONObj>{
+ BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.shards"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("host" << 1) << "name"
+ << "host_1"
+ << "ns"
+ << "config.shards")};
+ auto expectedTagsIndexes = std::vector<BSONObj>{
+ BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.tags"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("ns" << 1 << "min" << 1) << "name"
+ << "ns_1_min_1"
+ << "ns"
+ << "config.tags"),
+ BSON("v" << 1 << "key" << BSON("ns" << 1 << "tag" << 1) << "name"
+ << "ns_1_tag_1"
+ << "ns"
+ << "config.tags")};
+
+ auto foundChunksIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(ChunkType::ConfigNS)));
+ assertBSONObjsSame(expectedChunksIndexes, foundChunksIndexes);
+
+ auto foundLockpingsIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(LockpingsType::ConfigNS)));
+ assertBSONObjsSame(expectedLockpingsIndexes, foundLockpingsIndexes);
+
+ auto foundLocksIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(LocksType::ConfigNS)));
+ assertBSONObjsSame(expectedLocksIndexes, foundLocksIndexes);
+
+ auto foundShardsIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(ShardType::ConfigNS)));
+ assertBSONObjsSame(expectedShardsIndexes, foundShardsIndexes);
+
+ auto foundTagsIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(TagsType::ConfigNS)));
+ assertBSONObjsSame(expectedTagsIndexes, foundTagsIndexes);
+}
+
+TEST_F(ConfigInitializationTest, CompatibleIndexAlreadyExists) {
+ getConfigShard()->createIndexOnConfig(
+ operationContext(), NamespaceString(ShardType::ConfigNS), BSON("host" << 1), true);
+
+ ASSERT_OK(catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+
+ auto expectedShardsIndexes = std::vector<BSONObj>{
+ BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_"
+ << "ns"
+ << "config.shards"),
+ BSON("v" << 1 << "unique" << true << "key" << BSON("host" << 1) << "name"
+ << "host_1"
+ << "ns"
+ << "config.shards")};
+
+
+ auto foundShardsIndexes =
+ assertGet(getIndexes(operationContext(), NamespaceString(ShardType::ConfigNS)));
+ assertBSONObjsSame(expectedShardsIndexes, foundShardsIndexes);
+}
+
+TEST_F(ConfigInitializationTest, IncompatibleIndexAlreadyExists) {
+ // Make the index non-unique even though it's supposed to be unique, and make sure
+ // initialization fails.
+ getConfigShard()->createIndexOnConfig(
+ operationContext(), NamespaceString(ShardType::ConfigNS), BSON("host" << 1), false);
+
+ ASSERT_EQUALS(ErrorCodes::IndexOptionsConflict,
+ catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
+}
+
+} // unnamed namespace
+} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index b7269cc4739..39e0a6bae52 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -43,14 +43,20 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_lockpings.h"
+#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard.h"
@@ -719,4 +725,208 @@ void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolS
_executorForAddShard->appendConnectionStats(stats);
}
+Status ShardingCatalogManagerImpl::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_configInitialized) {
+ return Status::OK();
+ }
+ }
+
+ Status status = _initConfigVersion(txn);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = _initConfigIndexes(txn);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _configInitialized = true;
+
+ return Status::OK();
+}
+
+StatusWith<VersionType> ShardingCatalogManagerImpl::_getConfigVersion(OperationContext* txn) {
+ auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ kConfigReadSelector,
+ // Use local read concern as we're likely to follow this up with a write.
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(VersionType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none /* no limit */);
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ auto queryResults = findStatus.getValue().docs;
+
+ if (queryResults.size() > 1) {
+ return {ErrorCodes::IncompatibleShardingConfigVersion,
+ str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
+ }
+
+ if (queryResults.empty()) {
+ VersionType versionInfo;
+ versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
+ versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
+ return versionInfo;
+ }
+
+ BSONObj versionDoc = queryResults.front();
+ auto versionTypeResult = VersionType::fromBSON(versionDoc);
+ if (!versionTypeResult.isOK()) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ str::stream() << "invalid config version document: " << versionDoc
+ << versionTypeResult.getStatus().toString());
+ }
+
+ return versionTypeResult.getValue();
+}
+
+Status ShardingCatalogManagerImpl::_initConfigVersion(OperationContext* txn) {
+ auto versionStatus = _getConfigVersion(txn);
+ if (!versionStatus.isOK()) {
+ return versionStatus.getStatus();
+ }
+
+ auto versionInfo = versionStatus.getValue();
+ if (versionInfo.getMinCompatibleVersion() > CURRENT_CONFIG_VERSION) {
+ return {ErrorCodes::IncompatibleShardingConfigVersion,
+ str::stream() << "current version v" << CURRENT_CONFIG_VERSION
+ << " is older than the cluster min compatible v"
+ << versionInfo.getMinCompatibleVersion()};
+ }
+
+ if (versionInfo.getCurrentVersion() == UpgradeHistory_EmptyVersion) {
+ VersionType newVersion;
+ newVersion.setClusterId(OID::gen());
+ newVersion.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
+ newVersion.setCurrentVersion(CURRENT_CONFIG_VERSION);
+
+ BSONObj versionObj(newVersion.toBSON());
+ auto insertStatus = _catalogClient->insertConfigDocument(
+ txn, VersionType::ConfigNS, versionObj, kNoWaitWriteConcern);
+
+ return insertStatus;
+ }
+
+ if (versionInfo.getCurrentVersion() == UpgradeHistory_UnreportedVersion) {
+ return {ErrorCodes::IncompatibleShardingConfigVersion,
+ "Assuming config data is old since the version document cannot be found in the "
+ "config server and it contains databases besides 'local' and 'admin'. "
+ "Please upgrade if this is the case. Otherwise, make sure that the config "
+ "server is clean."};
+ }
+
+ if (versionInfo.getCurrentVersion() < CURRENT_CONFIG_VERSION) {
+ return {ErrorCodes::IncompatibleShardingConfigVersion,
+ str::stream() << "need to upgrade current cluster version to v"
+ << CURRENT_CONFIG_VERSION
+ << "; currently at v"
+ << versionInfo.getCurrentVersion()};
+ }
+
+ return Status::OK();
+}
+
+Status ShardingCatalogManagerImpl::_initConfigIndexes(OperationContext* txn) {
+ const bool unique = true;
+ auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+
+ Status result =
+ configShard->createIndexOnConfig(txn,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << 1 << ChunkType::min() << 1),
+ unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create ns_1_min_1 index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(
+ txn,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1),
+ unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create ns_1_shard_1_min_1 index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(
+ txn,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1),
+ unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create ns_1_lastmod_1 index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(
+ txn, NamespaceString(ShardType::ConfigNS), BSON(ShardType::host() << 1), unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create host_1 index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(
+ txn, NamespaceString(LocksType::ConfigNS), BSON(LocksType::lockID() << 1), !unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create lock id index on config db"
+ << causedBy(result));
+ }
+
+ result =
+ configShard->createIndexOnConfig(txn,
+ NamespaceString(LocksType::ConfigNS),
+ BSON(LocksType::state() << 1 << LocksType::process() << 1),
+ !unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create state and process id index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(
+ txn, NamespaceString(LockpingsType::ConfigNS), BSON(LockpingsType::ping() << 1), !unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create lockping ping time index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(txn,
+ NamespaceString(TagsType::ConfigNS),
+ BSON(TagsType::ns() << 1 << TagsType::min() << 1),
+ unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create ns_1_min_1 index on config db"
+ << causedBy(result));
+ }
+
+ result = configShard->createIndexOnConfig(txn,
+ NamespaceString(TagsType::ConfigNS),
+ BSON(TagsType::ns() << 1 << TagsType::tag() << 1),
+ !unique);
+ if (!result.isOK()) {
+ return Status(result.code(),
+ str::stream() << "couldn't create ns_1_tag_1 index on config db"
+ << causedBy(result));
+ }
+
+ return Status::OK();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 3fb1f7c627b..bfacef70a57 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -38,6 +38,7 @@ namespace mongo {
class DatabaseType;
class ShardingCatalogClient;
+class VersionType;
namespace executor {
class TaskExecutor;
@@ -75,6 +76,8 @@ public:
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
+ Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
+
private:
/**
* Generates a unique name to be given to a newly added shard.
@@ -118,7 +121,21 @@ private:
const std::string& dbName,
const BSONObj& cmdObj);
+ /**
+ * Returns the current cluster schema/protocol version.
+ */
+ StatusWith<VersionType> _getConfigVersion(OperationContext* txn);
+ /**
+ * Performs the necessary checks for version compatibility and creates a new config.version
+ * document if the current cluster config is empty.
+ */
+ Status _initConfigVersion(OperationContext* txn);
+
+ /**
+ * Builds all the expected indexes on the config server.
+ */
+ Status _initConfigIndexes(OperationContext* txn);
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -145,6 +162,9 @@ private:
// True if startup() has been called.
bool _started = false; // (M)
+
+ // True if initializeConfigDatabaseIfNeeded() has been called and returned successfully.
+ bool _configInitialized = false; // (M)
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_upgrade_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_upgrade_test.cpp
deleted file mode 100644
index 8dc55bd5086..00000000000
--- a/src/mongo/s/catalog/replset/sharding_catalog_upgrade_test.cpp
+++ /dev/null
@@ -1,605 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <string>
-#include <vector>
-
-#include "mongo/bson/json.h"
-#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/rpc/metadata/repl_set_metadata.h"
-#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/catalog/config_server_version.h"
-#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
-#include "mongo/s/catalog/replset/sharding_catalog_test_fixture.h"
-#include "mongo/s/catalog/type_config_version.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/s/write_ops/batched_update_request.h"
-
-namespace mongo {
-namespace {
-
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-using std::string;
-using std::vector;
-
-const BSONObj kReplSecondaryOkMetadata{[] {
- BSONObjBuilder o;
- o.appendElements(rpc::ServerSelectionMetadata(true, boost::none).toBSON());
- o.append(rpc::kReplSetMetadataFieldName, 1);
- return o.obj();
-}()};
-
-TEST_F(ShardingCatalogTestFixture, UpgradeNotNeeded) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([this](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
-
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- const auto& findCmd = request.cmdObj;
- ASSERT_EQ("version", findCmd["find"].str());
- checkReadConcern(findCmd, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- BSONObj versionDoc(BSON("_id" << 1 << "minCompatibleVersion"
- << MIN_COMPATIBLE_CONFIG_VERSION
- << "currentVersion"
- << CURRENT_CONFIG_VERSION
- << "clusterId"
- << OID::gen()));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitTargetError) {
- configTargeter()->setFindHostReturnValue({ErrorCodes::InternalError, "Bad test network"});
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::InternalError, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitIncompatibleVersion) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) {
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 2,
- currentVersion: 3,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitClusterMultiVersion) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::RemoteValidationError, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) {
-
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 2,
- currentVersion: 3,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- BSONObj versionDoc2(fromjson(R"({
- _id: 2,
- minCompatibleVersion: 3,
- currentVersion: 4,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc, versionDoc2};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitInvalidConfigVersionDoc) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::UnsupportedFormat, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) {
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: "should be numeric",
- currentVersion: 7,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitNoVersionDocEmptyConfig) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
-
- BatchedUpdateRequest actualBatchedUpdate;
- std::string errmsg;
- ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
- ASSERT_EQUALS(VersionType::ConfigNS, actualBatchedUpdate.getNS().ns());
-
- auto updates = actualBatchedUpdate.getUpdates();
- ASSERT_EQUALS(1U, updates.size());
- auto update = updates.front();
-
- ASSERT_EQUALS(update->getQuery(), update->getUpdateExpr());
- ASSERT_TRUE(update->getUpsert());
- ASSERT_FALSE(update->getMulti());
-
- auto versionDocRes = VersionType::fromBSON(update->getUpdateExpr());
- ASSERT_OK(versionDocRes.getStatus());
- const VersionType& versionDoc = versionDocRes.getValue();
-
- ASSERT_EQ(MIN_COMPATIBLE_CONFIG_VERSION, versionDoc.getMinCompatibleVersion());
- ASSERT_EQ(CURRENT_CONFIG_VERSION, versionDoc.getCurrentVersion());
- ASSERT_TRUE(versionDoc.isClusterIdSet());
- ASSERT_FALSE(versionDoc.isExcludingMongoVersionsSet());
- ASSERT_FALSE(versionDoc.isUpgradeIdSet());
- ASSERT_FALSE(versionDoc.isUpgradeStateSet());
-
- BatchedCommandResponse response;
- response.setOk(true);
- response.setN(1);
- response.setNModified(1);
-
- return response.toBSON();
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitConfigWriteError) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::ExceededTimeLimit, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0,
- writeErrors: [{
- index: 0,
- code: 50,
- errmsg: "exceeded time limit"
- }]
- })");
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionTooOld) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) {
- BatchedCommandResponse response;
- response.setOk(true);
- response.setNModified(1);
-
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 2000000000,
- currentVersion: 2000000000,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionDuplicateKeyNoOpAfterRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0,
- writeErrors: [{
- index: 0,
- code: 11000,
- errmsg: "E11000 duplicate key error index: config.v.$_id_ dup key: { : 1.0 }"
- }]
- })");
- });
-
- // Retry starts here
-
- onFindCommand([this](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
-
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- const auto& findCmd = request.cmdObj;
- ASSERT_EQ("version", findCmd["find"].str());
- checkReadConcern(findCmd, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 6,
- currentVersion: 7,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionDuplicateKeyNoConfigVersionAfterRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0,
- writeErrors: [{
- index: 0,
- code: 11000,
- errmsg: "E11000 duplicate key error index: config.v.$_id_ dup key: { : 1.0 }"
- }]
- })");
- });
-
- // Retry starts here
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
-
- BatchedUpdateRequest actualBatchedUpdate;
- std::string errmsg;
- ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
- ASSERT_EQUALS(VersionType::ConfigNS, actualBatchedUpdate.getNS().ns());
-
- auto updates = actualBatchedUpdate.getUpdates();
- ASSERT_EQUALS(1U, updates.size());
- auto update = updates.front();
-
- ASSERT_EQUALS(update->getQuery(), update->getUpdateExpr());
- ASSERT_TRUE(update->getUpsert());
- ASSERT_FALSE(update->getMulti());
-
- auto versionDocRes = VersionType::fromBSON(update->getUpdateExpr());
- ASSERT_OK(versionDocRes.getStatus());
- const VersionType& versionDoc = versionDocRes.getValue();
-
- ASSERT_EQ(MIN_COMPATIBLE_CONFIG_VERSION, versionDoc.getMinCompatibleVersion());
- ASSERT_EQ(CURRENT_CONFIG_VERSION, versionDoc.getCurrentVersion());
- ASSERT_TRUE(versionDoc.isClusterIdSet());
- ASSERT_FALSE(versionDoc.isExcludingMongoVersionsSet());
- ASSERT_FALSE(versionDoc.isUpgradeIdSet());
- ASSERT_FALSE(versionDoc.isUpgradeStateSet());
-
- BatchedCommandResponse response;
- response.setOk(true);
- response.setN(1);
- response.setNModified(1);
-
- return response.toBSON();
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionDuplicateKeyTooNewAfterRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0,
- writeErrors: [{
- index: 0,
- code: 11000,
- errmsg: "E11000 duplicate key error index: config.v.$_id_ dup key: { : 1.0 }"
- }]
- })");
- });
-
- // Retry starts here
-
- onFindCommand([this](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
-
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- const auto& findCmd = request.cmdObj;
- ASSERT_EQ("version", findCmd["find"].str());
- checkReadConcern(findCmd, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 2000000000,
- currentVersion: 2000000000,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionDuplicateKeyMaxRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future = launchAsync([this] {
- auto status = catalogClient()->initConfigVersion(operationContext());
- ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- const int maxRetry = 3;
- for (int x = 0; x < maxRetry; x++) {
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0,
- writeErrors: [{
- index: 0,
- code: 11000,
- errmsg: "E11000 duplicate key error index: config.v.$_id_ dup key: { : 1 }"
- }]
- })");
- });
- }
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionUpsertNoMatchNoOpAfterRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0
- })");
- });
-
- // Retry starts here
-
- onFindCommand([this](const RemoteCommandRequest& request) {
- ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
-
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- const auto& findCmd = request.cmdObj;
- ASSERT_EQ("version", findCmd["find"].str());
- checkReadConcern(findCmd, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
-
- BSONObj versionDoc(fromjson(R"({
- _id: 1,
- minCompatibleVersion: 6,
- currentVersion: 7,
- clusterId: ObjectId("55919cc6dbe86ce7ac056427")
- })"));
-
- return vector<BSONObj>{versionDoc};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingCatalogTestFixture, InitVersionUpsertNoMatchNoConfigVersionAfterRetry) {
- configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
-
- auto future =
- launchAsync([this] { ASSERT_OK(catalogClient()->initConfigVersion(operationContext())); });
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQ(string("update"), request.cmdObj.firstElementFieldName());
-
- return fromjson(R"({
- ok: 1,
- nModified: 0,
- n: 0
- })");
- });
-
- // Retry starts here
-
- onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
-
- onCommand([](const RemoteCommandRequest& request) {
- ASSERT_EQ(HostAndPort("config:123"), request.target);
- ASSERT_EQ("config", request.dbname);
-
- ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
-
- BatchedUpdateRequest actualBatchedUpdate;
- std::string errmsg;
- ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
- ASSERT_EQUALS(VersionType::ConfigNS, actualBatchedUpdate.getNS().ns());
-
- auto updates = actualBatchedUpdate.getUpdates();
- ASSERT_EQUALS(1U, updates.size());
- auto update = updates.front();
-
- ASSERT_EQUALS(update->getQuery(), update->getUpdateExpr());
- ASSERT_TRUE(update->getUpsert());
- ASSERT_FALSE(update->getMulti());
-
- auto versionDocRes = VersionType::fromBSON(update->getUpdateExpr());
- ASSERT_OK(versionDocRes.getStatus());
- const VersionType& versionDoc = versionDocRes.getValue();
-
- ASSERT_EQ(MIN_COMPATIBLE_CONFIG_VERSION, versionDoc.getMinCompatibleVersion());
- ASSERT_EQ(CURRENT_CONFIG_VERSION, versionDoc.getCurrentVersion());
- ASSERT_TRUE(versionDoc.isClusterIdSet());
- ASSERT_FALSE(versionDoc.isExcludingMongoVersionsSet());
- ASSERT_FALSE(versionDoc.isUpgradeIdSet());
- ASSERT_FALSE(versionDoc.isUpgradeStateSet());
-
- BatchedCommandResponse response;
- response.setOk(true);
- response.setN(1);
- response.setNModified(1);
-
- return response.toBSON();
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-} // unnamed namespace
-} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 69ad148860a..bbf59af157a 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -416,12 +416,6 @@ public:
const WriteConcernOptions& writeConcern) = 0;
/**
- * Performs the necessary checks for version compatibility and creates a new version document
- * if the current cluster config is empty.
- */
- virtual Status initConfigVersion(OperationContext* txn) = 0;
-
- /**
* Appends the information about the config and admin databases in the config server
* with the format for listDatabase.
*/
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 2481a5336e2..1ba10b51582 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -220,10 +220,6 @@ DistLockManager* ShardingCatalogClientMock::getDistLockManager() {
return _mockDistLockMgr.get();
}
-Status ShardingCatalogClientMock::initConfigVersion(OperationContext* txn) {
- return {ErrorCodes::InternalError, "Method not implemented"};
-}
-
Status ShardingCatalogClientMock::appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) {
return Status::OK();
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 60b09546631..125c6b36ca7 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -159,8 +159,6 @@ public:
StringData whyMessage,
Milliseconds waitFor) override;
- Status initConfigVersion(OperationContext* txn) override;
-
Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 637f8b3f955..c830e03a362 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -112,6 +112,12 @@ public:
*/
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
+ /**
+ * Initializes the collections that live in the config server. Mostly this involves building
+ * necessary indexes and populating the config.version document.
+ */
+ virtual Status initializeConfigDatabaseIfNeeded(OperationContext* txn) = 0;
+
protected:
ShardingCatalogManager() = default;
};
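For context, the interface above is meant to be invoked on every config server transition to primary; once it has succeeded, a repeat call is a cheap no-op because the implementation sets _configInitialized. Below is a minimal sketch of the call pattern, condensed from the draining-state hook added to replication_coordinator_external_state_impl.cpp earlier in this diff; the wrapper function name is hypothetical and the error handling is abbreviated for readability.

// Sketch only: condensed restatement of the call site added in this commit.
void onBecomePrimaryOnConfigServer(OperationContext* txn) {
    Status status = Grid::get(txn)->catalogManager()->initializeConfigDatabaseIfNeeded(txn);
    if (!status.isOK()) {
        if (status == ErrorCodes::ShutdownInProgress ||
            status == ErrorCodes::InterruptedAtShutdown) {
            return;  // Mid-shutdown; skip the fassert and let shutdown proceed.
        }
        // Failure to initialize the config database on the first transition is fatal.
        fassertFailedWithStatus(40184, status);
    }
    // A second call after a success returns OK immediately without re-reading
    // config.version or rebuilding indexes (see the OnlyRunsOnce test in this commit).
}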
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index bb12440da97..7ba2b08926f 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -70,4 +70,8 @@ Status ShardingCatalogManagerMock::removeShardFromZone(OperationContext* txn,
void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
+Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index 4be21103b8e..49057fc1bde 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -58,6 +58,8 @@ public:
const std::string& zoneName) override;
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
+
+ Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
};
} // namespace mongo
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index d88d3fb67da..d2619f68b08 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -159,6 +159,17 @@ public:
const BSONObj& query,
const BSONObj& sort,
const boost::optional<long long> limit);
+ /**
+ * Builds an index on a config server collection.
+ * Creates the collection if it doesn't yet exist. Does not error if the index already exists,
+ * so long as the options are the same.
+ * NOTE: Currently only supported for ShardLocal.
+ */
+ virtual Status createIndexOnConfig(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& keys,
+ bool unique) = 0;
+
protected:
Shard(const ShardId& id);
diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp
index 86ba97eb148..dcf6db33205 100644
--- a/src/mongo/s/client/shard_local.cpp
+++ b/src/mongo/s/client/shard_local.cpp
@@ -178,5 +178,23 @@ StatusWith<Shard::QueryResponse> ShardLocal::_exhaustiveFindOnConfig(
}
}
+Status ShardLocal::createIndexOnConfig(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& keys,
+ bool unique) {
+ invariant(ns.db() == "config" || ns.db() == "admin");
+
+ try {
+ DBDirectClient client(txn);
+ IndexSpec index;
+ index.addKeys(keys);
+ index.unique(unique);
+ client.createIndex(ns.toString(), index);
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+
+ return Status::OK();
+}
} // namespace mongo
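For context, the DBDirectClient::createIndex call above is roughly equivalent to issuing a createIndexes command against the local node. A sketch of the command document it would build for the test case below (the index name is illustrative; the driver derives it from the key pattern):

    // Approximate equivalent of createIndexOnConfig(txn, NamespaceString("config.foo"),
    //                                               BSON("a" << 1 << "b" << 1), true):
    BSONObj cmd = BSON("createIndexes" << "foo"
                                       << "indexes"
                                       << BSON_ARRAY(BSON("key" << BSON("a" << 1 << "b" << 1)
                                                          << "name" << "a_1_b_1"
                                                          << "unique" << true)));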
diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h
index 764e814f25a..b39aba621d0 100644
--- a/src/mongo/s/client/shard_local.h
+++ b/src/mongo/s/client/shard_local.h
@@ -56,6 +56,11 @@ public:
bool isRetriableError(ErrorCodes::Error code, RetryPolicy options) final;
+ Status createIndexOnConfig(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& keys,
+ bool unique) override;
+
private:
StatusWith<Shard::CommandResponse> _runCommand(OperationContext* txn,
const ReadPreferenceSetting& unused,
diff --git a/src/mongo/s/client/shard_local_test.cpp b/src/mongo/s/client/shard_local_test.cpp
index b334b7a95bb..b79fef28600 100644
--- a/src/mongo/s/client/shard_local_test.cpp
+++ b/src/mongo/s/client/shard_local_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_and_modify_request.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
@@ -67,6 +68,11 @@ protected:
BSONObj sort,
boost::optional<long long> limit);
+ /**
+ * Returns the index definitions that exist for the given collection.
+ */
+ StatusWith<std::vector<BSONObj>> getIndexes(NamespaceString nss);
+
private:
void setUp() override;
void tearDown() override;
@@ -103,6 +109,26 @@ StatusWith<Shard::CommandResponse> ShardLocalTest::runFindAndModifyRunCommand(Na
Shard::RetryPolicy::kNoRetry);
}
+StatusWith<std::vector<BSONObj>> ShardLocalTest::getIndexes(NamespaceString nss) {
+ auto response = _shardLocal->runCommand(_txn.get(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ nss.db().toString(),
+ BSON("listIndexes" << nss.coll().toString()),
+ Shard::RetryPolicy::kIdempotent);
+ if (!response.isOK()) {
+ return response.getStatus();
+ }
+ if (!response.getValue().commandStatus.isOK()) {
+ return response.getValue().commandStatus;
+ }
+
+ auto cursorResponse = CursorResponse::parseFromBSON(response.getValue().response);
+ if (!cursorResponse.isOK()) {
+ return cursorResponse.getStatus();
+ }
+ return cursorResponse.getValue().getBatch();
+}
+
/**
* Takes a FindAndModify command's BSON response and parses it for the returned "value" field.
*/
@@ -210,5 +236,32 @@ TEST_F(ShardLocalTest, FindNoMatchingDocumentsEmpty) {
ASSERT_EQUALS(size, docs.size());
}
+TEST_F(ShardLocalTest, CreateIndex) {
+ NamespaceString nss("config.foo");
+
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getIndexes(nss).getStatus());
+
+ Status status =
+ _shardLocal->createIndexOnConfig(_txn.get(), nss, BSON("a" << 1 << "b" << 1), true);
+ // Creating the index should implicitly create the collection.
+ ASSERT_OK(status);
+
+ auto indexes = unittest::assertGet(getIndexes(nss));
+ // There should be the index we just added as well as the _id index.
+ ASSERT_EQ(2U, indexes.size());
+
+ // Making an identical index should be a no-op.
+ status = _shardLocal->createIndexOnConfig(_txn.get(), nss, BSON("a" << 1 << "b" << 1), true);
+ ASSERT_OK(status);
+ indexes = unittest::assertGet(getIndexes(nss));
+ ASSERT_EQ(2U, indexes.size());
+
+ // Trying to make the same index as non-unique should fail.
+ status = _shardLocal->createIndexOnConfig(_txn.get(), nss, BSON("a" << 1 << "b" << 1), false);
+ ASSERT_EQUALS(ErrorCodes::IndexOptionsConflict, status);
+ indexes = unittest::assertGet(getIndexes(nss));
+ ASSERT_EQ(2U, indexes.size());
+}
+
} // namespace
} // namespace mongo
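As background for the size assertions in CreateIndex: listIndexes yields one spec document per index, so after the first successful createIndexOnConfig call the batch returned by getIndexes(nss) should contain the implicitly created _id index plus the new compound index. Roughly (exact fields and index versions vary by server version):

    // { v: <int>, key: { _id: 1 }, name: "_id_", ns: "config.foo" }
    // { v: <int>, key: { a: 1, b: 1 }, name: "a_1_b_1", unique: true, ns: "config.foo" }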
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index d0f23563803..c28067b3261 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -369,4 +369,11 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
return response;
}
+Status ShardRemote::createIndexOnConfig(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& keys,
+ bool unique) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index 5f19a2c4301..339f0ae1b31 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -70,6 +70,11 @@ public:
bool isRetriableError(ErrorCodes::Error code, RetryPolicy options) final;
+ Status createIndexOnConfig(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& keys,
+ bool unique) override;
+
private:
/**
* Returns the metadata that should be used when running commands against this shard with
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 76c73854ddf..c04d0df2a06 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_settings.h"
@@ -299,9 +300,33 @@ Status ConfigServerTestFixture::insertToConfigCollection(OperationContext* txn,
auto config = getConfigShard();
invariant(config);
- auto insertStatus = config->runCommand(
+ auto insertResponse = config->runCommand(
txn, kReadPref, ns.db().toString(), request.toBSON(), Shard::RetryPolicy::kNoRetry);
- return insertStatus.getStatus();
+
+ BatchedCommandResponse batchResponse;
+ auto status = Shard::CommandResponse::processBatchWriteResponse(insertResponse, &batchResponse);
+ return status;
+}
+
+StatusWith<BSONObj> ConfigServerTestFixture::findOneOnConfigCollection(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& filter) {
+ auto config = getConfigShard();
+ invariant(config);
+
+ auto findStatus = config->exhaustiveFindOnConfig(
+ txn, kReadPref, repl::ReadConcernLevel::kMajorityReadConcern, ns, filter, BSONObj(), 1);
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ auto findResult = findStatus.getValue();
+ if (findResult.docs.empty()) {
+ return Status(ErrorCodes::NoMatchingDocument, "No document found");
+ }
+
+ invariant(findResult.docs.size() == 1);
+ return findResult.docs.front().getOwned();
}
Status ConfigServerTestFixture::setupShards(const std::vector<ShardType>& shards) {
@@ -318,29 +343,41 @@ Status ConfigServerTestFixture::setupShards(const std::vector<ShardType>& shards
StatusWith<ShardType> ConfigServerTestFixture::getShardDoc(OperationContext* txn,
const std::string& shardId) {
- auto config = getConfigShard();
- invariant(config);
-
- NamespaceString ns(ShardType::ConfigNS);
- auto findStatus = config->exhaustiveFindOnConfig(txn,
- kReadPref,
- repl::ReadConcernLevel::kMajorityReadConcern,
- ns,
- BSON(ShardType::name(shardId)),
- BSONObj(),
- boost::none);
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
+ auto doc = findOneOnConfigCollection(
+ txn, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name(shardId)));
+ if (!doc.isOK()) {
+ if (doc.getStatus() == ErrorCodes::NoMatchingDocument) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "shard " << shardId << " does not exist"};
+ }
+ return doc.getStatus();
}
- auto findResult = findStatus.getValue();
- if (findResult.docs.empty()) {
- return {ErrorCodes::ShardNotFound,
- str::stream() << "shard " << shardId << " does not exist"};
+ return ShardType::fromBSON(doc.getValue());
+}
+
+StatusWith<std::vector<BSONObj>> ConfigServerTestFixture::getIndexes(OperationContext* txn,
+ const NamespaceString& ns) {
+ auto configShard = getConfigShard();
+
+ auto response = configShard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ ns.db().toString(),
+ BSON("listIndexes" << ns.coll().toString()),
+ Shard::RetryPolicy::kIdempotent);
+ if (!response.isOK()) {
+ return response.getStatus();
+ }
+ if (!response.getValue().commandStatus.isOK()) {
+ return response.getValue().commandStatus;
}
- invariant(findResult.docs.size() == 1);
- return ShardType::fromBSON(findResult.docs.front());
+ auto cursorResponse = CursorResponse::parseFromBSON(response.getValue().response);
+ if (!cursorResponse.isOK()) {
+ return cursorResponse.getStatus();
+ }
+ return cursorResponse.getValue().getBatch();
}
+
} // namespace mongo
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 9ad3c2f0de2..7deabf1b323 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -112,6 +112,13 @@ public:
const BSONObj& doc);
/**
+ * Reads a single document matching the given filter from a collection on the config server.
+ */
+ StatusWith<BSONObj> findOneOnConfigCollection(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& filter);
+
+ /**
* Blocking methods, which receive one message from the network and respond using the
* responses returned from the input function. This is syntactic sugar for simple,
* single request + response or find tests.
@@ -135,6 +142,11 @@ public:
*/
StatusWith<ShardType> getShardDoc(OperationContext* txn, const std::string& shardId);
+ /**
+ * Returns the index definitions that exist on the given collection.
+ */
+ StatusWith<std::vector<BSONObj>> getIndexes(OperationContext* txn, const NamespaceString& ns);
+
void setUp() override;
void tearDown() override;
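A brief usage sketch for the two new fixture helpers declared above; the namespaces, and the assumption that the fixture's OperationContext is available as "txn", are illustrative.

    // Inside a ConfigServerTestFixture-based test body:
    auto versionDoc = findOneOnConfigCollection(txn, NamespaceString("config.version"), BSONObj());
    ASSERT_OK(versionDoc.getStatus());  // fails with NoMatchingDocument if the doc was never written

    auto chunkIndexes = getIndexes(txn, NamespaceString(ChunkType::ConfigNS));
    ASSERT_OK(chunkIndexes.getStatus());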
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index f0dee91919e..af01e1d2c0e 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -66,17 +66,11 @@
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_lockpings.h"
-#include "mongo/s/catalog/type_locks.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
#include "mongo/s/client/sharding_connection_hook_for_mongos.h"
-#include "mongo/s/cluster_write.h"
#include "mongo/s/commands/request.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
@@ -222,72 +216,6 @@ public:
using namespace mongo;
-static void reloadSettings(OperationContext* txn) {
- Grid::get(txn)->getBalancerConfiguration()->refreshAndCheck(txn);
-
- // Create the config data indexes
- const bool unique = true;
-
- Status result = clusterCreateIndex(
- txn, ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::min() << 1), unique);
- if (!result.isOK()) {
- warning() << "couldn't create ns_1_min_1 index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(
- txn,
- ChunkType::ConfigNS,
- BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1),
- unique);
- if (!result.isOK()) {
- warning() << "couldn't create ns_1_shard_1_min_1 index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(txn,
- ChunkType::ConfigNS,
- BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1),
- unique);
- if (!result.isOK()) {
- warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(txn, ShardType::ConfigNS, BSON(ShardType::host() << 1), unique);
- if (!result.isOK()) {
- warning() << "couldn't create host_1 index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(txn, LocksType::ConfigNS, BSON(LocksType::lockID() << 1), !unique);
- if (!result.isOK()) {
- warning() << "couldn't create lock id index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(txn,
- LocksType::ConfigNS,
- BSON(LocksType::state() << 1 << LocksType::process() << 1),
- !unique);
- if (!result.isOK()) {
- warning() << "couldn't create state and process id index on config db" << causedBy(result);
- }
-
- result =
- clusterCreateIndex(txn, LockpingsType::ConfigNS, BSON(LockpingsType::ping() << 1), !unique);
- if (!result.isOK()) {
- warning() << "couldn't create lockping ping time index on config db" << causedBy(result);
- }
-
- result = clusterCreateIndex(
- txn, TagsType::ConfigNS, BSON(TagsType::ns() << 1 << TagsType::min() << 1), unique);
- if (!result.isOK()) {
- warning() << "could not create index ns_1_min_1: " << causedBy(result);
- }
-
- result = clusterCreateIndex(
- txn, TagsType::ConfigNS, BSON(TagsType::ns() << 1 << TagsType::tag() << 1), !unique);
- if (!result.isOK()) {
- warning() << "could not create index ns_1_tag_1: " << causedBy(result);
- }
-}
-
static Status initializeSharding(OperationContext* txn) {
auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryImpl>();
auto targeterFactoryPtr = targeterFactory.get();
@@ -329,12 +257,6 @@ static Status initializeSharding(OperationContext* txn) {
return status;
}
- auto catalogClient = grid.catalogClient(txn);
- status = catalogClient->initConfigVersion(txn);
- if (!status.isOK()) {
- return status;
- }
-
return Status::OK();
}
@@ -385,7 +307,7 @@ static ExitCode runMongosServer() {
return EXIT_SHARDING_ERROR;
}
- reloadSettings(opCtx.get());
+ Grid::get(opCtx.get())->getBalancerConfiguration()->refreshAndCheck(opCtx.get());
}
#if !defined(_WIN32)