/** * Copyright (C) 2016 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/shard_server_op_observer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/db/storage/storage_options.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { namespace { using executor::RemoteCommandRequest; const std::string kShardName("a"); class ShardingStateTest : public ShardingMongodTestFixture { protected: // Used to write to set up local collections before exercising server logic. std::unique_ptr _dbDirectClient; void setUp() override { serverGlobalParams.clusterRole = ClusterRole::None; ShardingMongodTestFixture::setUp(); // When sharding initialization is triggered, initialize sharding state as a shard server. serverGlobalParams.clusterRole = ClusterRole::ShardServer; CatalogCacheLoader::set(getServiceContext(), stdx::make_unique( stdx::make_unique())); _shardingState.setGlobalInitMethodForTest([&](OperationContext* opCtx, const ConnectionString& configConnStr, StringData distLockProcessId) { auto status = initializeGlobalShardingStateForMongodForTest(configConnStr); if (!status.isOK()) { return status; } // Set the ConnectionString return value on the mock targeter so that later calls to the // targeter's getConnString() return the appropriate value auto configTargeter = RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); configTargeter->setConnectionStringReturnValue(configConnStr); configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]); return Status::OK(); }); _dbDirectClient = stdx::make_unique(operationContext()); } void tearDown() override { _dbDirectClient.reset(); // Restore the defaults before calling tearDown storageGlobalParams.readOnly = false; serverGlobalParams.overrideShardIdentity = BSONObj(); CatalogCacheLoader::clearForTests(getServiceContext()); ShardingMongodTestFixture::tearDown(); } std::unique_ptr makeDistLockManager( std::unique_ptr distLockCatalog) override { return stdx::make_unique(nullptr); } std::unique_ptr makeShardingCatalogClient( std::unique_ptr distLockManager) override { invariant(distLockManager); return stdx::make_unique(std::move(distLockManager)); } ShardingState* shardingState() { return &_shardingState; } private: ShardingState _shardingState; }; /** * This class emulates the server being started as a standalone node for the scope for which it is * used */ class ScopedSetStandaloneMode { public: ScopedSetStandaloneMode(ServiceContext* serviceContext) : _serviceContext(serviceContext) { serverGlobalParams.clusterRole = ClusterRole::None; _serviceContext->setOpObserver(stdx::make_unique()); } ~ScopedSetStandaloneMode() { serverGlobalParams.clusterRole = ClusterRole::ShardServer; auto makeOpObserver = [&] { auto opObserver = stdx::make_unique(); opObserver->addObserver(stdx::make_unique()); opObserver->addObserver(stdx::make_unique()); opObserver->addObserver(stdx::make_unique()); return opObserver; }; _serviceContext->setOpObserver(makeOpObserver()); } private: ServiceContext* const _serviceContext; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { return Status{ErrorCodes::ShutdownInProgress, "shutting down"}; }); { auto status = shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity); ASSERT_EQ(ErrorCodes::ShutdownInProgress, status); } // ShardingState is now in error state, attempting to call it again will still result in error. shardingState()->setGlobalInitMethodForTest( [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { return Status::OK(); }); { auto status = shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity); ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status); } ASSERT_FALSE(shardingState()->enabled()); } TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } // The tests below check for different combinations of the compatible startup parameters for // --shardsvr, --overrideShardIdentity, and queryableBackup (readOnly) mode // readOnly and --shardsvr TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndNoOverrideShardIdentity) { storageGlobalParams.readOnly = true; auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << ShardIdentity::kShardNameFieldName << kShardName << ShardIdentity::kClusterIdFieldName << OID::gen() << ShardIdentity::kConfigsvrConnectionStringFieldName << "invalid"); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndValidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::ShardServer; ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_TRUE(swShardingInitialized.getValue()); } // readOnly and not --shardsvr TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndNoOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_FALSE(swShardingInitialized.getValue()); } TEST_F( ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid"); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndValidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); } // not readOnly and --overrideShardIdentity TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndInvalidOverrideShardIdentity) { serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid"); // Should error regardless of cluster role. serverGlobalParams.clusterRole = ClusterRole::ShardServer; auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); serverGlobalParams.clusterRole = ClusterRole::None; swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndValidOverrideShardIdentity) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); // Should error regardless of cluster role. serverGlobalParams.clusterRole = ClusterRole::ShardServer; auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); serverGlobalParams.clusterRole = ClusterRole::None; swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); } // not readOnly and --shardsvr TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndNoShardIdentity) { auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_FALSE(swShardingInitialized.getValue()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndInvalidShardIdentity) { // Insert the shardIdentity doc to disk while pretending that we are in "standalone" mode, // otherwise OpObserver for inserts will prevent the insert from occurring because the // shardIdentity doc is invalid { ScopedSetStandaloneMode standalone(getServiceContext()); BSONObj invalidShardIdentity = BSON("_id" << "shardIdentity" << ShardIdentity::kShardNameFieldName << kShardName << ShardIdentity::kClusterIdFieldName << OID::gen() << ShardIdentity::kConfigsvrConnectionStringFieldName << "invalid"); _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), invalidShardIdentity); } auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndValidShardIdentity) { // Insert the shardIdentity doc to disk while pretending that we are in "standalone" mode, // otherwise OpObserver for inserts will prevent the insert from occurring because the // shardIdentity doc is invalid { ScopedSetStandaloneMode standalone(getServiceContext()); BSONObj validShardIdentity = [&] { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); return shardIdentity.toShardIdentityDocument(); }(); _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), validShardIdentity); } auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_TRUE(swShardingInitialized.getValue()); } // not readOnly and not --shardsvr TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndNoShardIdentity) { ScopedSetStandaloneMode standalone(getServiceContext()); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_FALSE(swShardingInitialized.getValue()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndInvalidShardIdentity) { ScopedSetStandaloneMode standalone(getServiceContext()); _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid")); // The shardIdentity doc on disk, even if invalid, is ignored if the ClusterRole is None. This // is to allow fixing the shardIdentity doc by starting without --shardsvr. auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_FALSE(swShardingInitialized.getValue()); } TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndValidShardIdentity) { ScopedSetStandaloneMode standalone(getServiceContext()); BSONObj validShardIdentity = [&] { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnectionString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); return shardIdentity.toShardIdentityDocument(); }(); _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), validShardIdentity); // The shardIdentity doc on disk is ignored if ClusterRole is None. auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); ASSERT_FALSE(swShardingInitialized.getValue()); } } // namespace } // namespace mongo