/**
 * Copyright (C) 2016 MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/base/status_with.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/service_context_noop.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager_mock.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/tick_source_mock.h" namespace mongo { namespace { /** * Initializes the grid object with the bare minimum and is not intended to be functional. */ void initGrid(OperationContext* txn, const ConnectionString& configConnString) { // Set up executor pool used for most operations. 
auto fixedNet = stdx::make_unique(); auto mockNetwork = fixedNet.get(); auto fixedExec = makeThreadPoolTestExecutor(std::move(fixedNet)); auto netForPool = stdx::make_unique(); auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool)); std::vector> executorsForPool; executorsForPool.emplace_back(std::move(execForPool)); auto executorPool = stdx::make_unique(); executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec)); executorPool->startup(); auto targeterFactory = stdx::make_unique(); auto targeterFactoryPtr = targeterFactory.get(); auto configTargeter(stdx::make_unique()); configTargeter->setConnectionStringReturnValue(configConnString); targeterFactory->addTargeterToReturn(configConnString, std::move(configTargeter)); ShardFactory::BuilderCallable setBuilder = [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { return stdx::make_unique( shardId, connStr, targeterFactoryPtr->create(connStr)); }; ShardFactory::BuilderCallable masterBuilder = [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { return stdx::make_unique( shardId, connStr, targeterFactoryPtr->create(connStr)); }; ShardFactory::BuildersMap buildersMap{ {ConnectionString::SET, std::move(setBuilder)}, {ConnectionString::MASTER, std::move(masterBuilder)}, }; auto shardFactory = stdx::make_unique(std::move(buildersMap), std::move(targeterFactory)); auto shardRegistry(stdx::make_unique(std::move(shardFactory), configConnString)); grid.init( stdx::make_unique(), stdx::make_unique(), std::move(shardRegistry), stdx::make_unique(txn->getServiceContext()->getPreciseClockSource()), stdx::make_unique(ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes), std::move(executorPool), mockNetwork); } class ShardingStateTest : public mongo::unittest::Test { public: void setUp() override { _service.setFastClockSource(stdx::make_unique()); _service.setPreciseClockSource(stdx::make_unique()); _service.setTickSource(stdx::make_unique()); 
serverGlobalParams.clusterRole = ClusterRole::ShardServer; _client = _service.makeClient("ShardingStateTest"); _opCtx = _client->makeOperationContext(); _shardingState.setGlobalInitMethodForTest([this](const ConnectionString& connStr) { initGrid(_opCtx.get(), connStr); return Status::OK(); }); } void tearDown() override { // ShardingState initialize can modify ReplicaSetMonitor state. ReplicaSetMonitor::cleanup(); // Cleanup only if shard registry was initialized if (grid.shardRegistry()) { grid.getExecutorPool()->shutdownAndJoin(); grid.clearForUnitTests(); } } OperationContext* txn() { return _opCtx.get(); } ShardingState* shardingState() { return &_shardingState; } private: ServiceContextNoop _service; ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; ShardingState _shardingState; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::ShutdownInProgress, "shutting down"}; }); { auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()); ASSERT_EQ(ErrorCodes::ShutdownInProgress, status); } // ShardingState is now in error state, attempting to call 
it again will still result in error. shardingState()->setGlobalInitMethodForTest( [](const ConnectionString& connStr) { return Status::OK(); }); { auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()); ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status); } ASSERT_FALSE(shardingState()->enabled()); } TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); 
shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { auto clusterID = OID::gen(); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( 
ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("b"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID()); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max())); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max())); ShardIdentityType 
shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) { return Status{ErrorCodes::InternalError, "should not reach here"}; }); auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()); ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } } // unnamed namespace } // namespace mongo