/**
* Copyright (C) 2016 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
namespace mongo {
using executor::RemoteCommandRequest;
namespace {
class ShardingStateTest : public ShardingMongodTestFixture {
public:
ShardingState* shardingState() {
return &_shardingState;
}
std::string shardName() const {
return _shardName.toString();
}
protected:
// Used to write to set up local collections before exercising server logic.
std::unique_ptr _dbDirectClient;
void setUp() override {
_shardName = ShardId("a");
serverGlobalParams.clusterRole = ClusterRole::None;
ShardingMongodTestFixture::setUp();
// When sharding initialization is triggered, initialize sharding state as a shard server.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
CatalogCacheLoader::set(getServiceContext(),
stdx::make_unique(
stdx::make_unique()));
_shardingState.setGlobalInitMethodForTest([&](OperationContext* opCtx,
const ConnectionString& configConnStr,
StringData distLockProcessId) {
auto status = initializeGlobalShardingStateForMongodForTest(configConnStr);
if (!status.isOK()) {
return status;
}
// Set the ConnectionString return value on the mock targeter so that later calls to
// the targeter's getConnString() return the appropriate value.
auto configTargeter =
RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
configTargeter->setConnectionStringReturnValue(configConnStr);
configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]);
return Status::OK();
});
_dbDirectClient = stdx::make_unique(operationContext());
}
void tearDown() override {
_dbDirectClient.reset();
// Some test cases modify the readOnly value, but the teardown calls below depend on
// readOnly being false, so we reset the value here rather than in setUp().
storageGlobalParams.readOnly = false;
// ShardingState initialize can modify ReplicaSetMonitor state.
ReplicaSetMonitor::cleanup();
CatalogCacheLoader::clearForTests(getServiceContext());
ShardingMongodTestFixture::tearDown();
}
std::unique_ptr makeDistLockManager(
std::unique_ptr distLockCatalog) override {
return stdx::make_unique(nullptr);
}
std::unique_ptr makeShardingCatalogClient(
std::unique_ptr distLockManager) override {
invariant(distLockManager);
return stdx::make_unique(std::move(distLockManager));
}
std::unique_ptr makeCatalogCache() override {
return stdx::make_unique(CatalogCacheLoader::get(getServiceContext()));
}
private:
ShardingState _shardingState;
ShardId _shardName;
};
TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
Lock::GlobalWrite lk(operationContext());
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
// Must hold a lock to call initializeFromShardIdentity.
Lock::GlobalWrite lk(operationContext());
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
shardingState()->setGlobalInitMethodForTest(
[](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
});
{
auto status =
shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity);
ASSERT_EQ(ErrorCodes::ShutdownInProgress, status);
}
// ShardingState is now in error state, attempting to call it again will still result in error.
shardingState()->setGlobalInitMethodForTest(
[](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
return Status::OK();
});
{
auto status =
shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity);
ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status);
}
ASSERT_FALSE(shardingState()->enabled());
}
TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
// Must hold a lock to call initializeFromShardIdentity.
Lock::GlobalWrite lk(operationContext());
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
[](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
return Status{ErrorCodes::InternalError, "should not reach here"};
});
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
// Must hold a lock to call initializeFromShardIdentity.
Lock::GlobalWrite lk(operationContext());
auto clusterID = OID::gen();
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "b:2,c:3", "config"));
shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
[](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
return Status{ErrorCodes::InternalError, "should not reach here"};
});
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
// The below tests check for compatible startup parameters for --shardsvr, --overrideShardIdentity,
// and queryableBackup (readOnly) mode.
// readOnly and --shardsvr
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndNoOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndInvalidOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
serverGlobalParams.overrideShardIdentity = BSON("_id"
<< "shardIdentity"
<< "configsvrConnectionString"
<< "invalid");
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndValidOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_TRUE(swShardingInitialized.getValue());
}
// readOnly and not --shardsvr
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndNoOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::None;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_FALSE(swShardingInitialized.getValue());
}
TEST_F(
ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndInvalidOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::None;
serverGlobalParams.overrideShardIdentity = BSON("_id"
<< "shardIdentity"
<< "configsvrConnectionString"
<< "invalid");
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndValidOverrideShardIdentity) {
storageGlobalParams.readOnly = true;
serverGlobalParams.clusterRole = ClusterRole::None;
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
}
// not readOnly and --overrideShardIdentity
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndInvalidOverrideShardIdentity) {
storageGlobalParams.readOnly = false;
serverGlobalParams.overrideShardIdentity = BSON("_id"
<< "shardIdentity"
<< "configsvrConnectionString"
<< "invalid");
// Should error regardless of cluster role.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
serverGlobalParams.clusterRole = ClusterRole::None;
swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndValidOverrideShardIdentity) {
storageGlobalParams.readOnly = false;
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
// Should error regardless of cluster role.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
serverGlobalParams.clusterRole = ClusterRole::None;
swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code());
}
// not readOnly and --shardsvr
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndNoShardIdentity) {
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_FALSE(swShardingInitialized.getValue());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndInvalidShardIdentity) {
ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
// Insert the shardIdentity doc to disk before setting the clusterRole, since if the clusterRole
// is ShardServer, the OpObserver for inserts will prevent the insert from occurring, since the
// shardIdentity doc is invalid.
serverGlobalParams.clusterRole = ClusterRole::None;
BSONObj invalidShardIdentity = BSON("_id"
<< "shardIdentity"
<< "configsvrConnectionString"
<< "invalid");
_dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(),
invalidShardIdentity);
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndValidShardIdentity) {
ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
// Insert the shardIdentity doc to disk before setting the clusterRole, since if the clusterRole
// is ShardServer, the OpObserver for inserts will trigger sharding initialization from the
// inserted doc.
serverGlobalParams.clusterRole = ClusterRole::None;
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
_dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(),
validShardIdentity);
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_TRUE(swShardingInitialized.getValue());
}
// not readOnly and not --shardsvr
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndNoShardIdentity) {
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::None;
serverGlobalParams.overrideShardIdentity = BSONObj();
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_FALSE(swShardingInitialized.getValue());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndInvalidShardIdentity) {
_dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(),
BSON("_id"
<< "shardIdentity"
<< "configsvrConnectionString"
<< "invalid"));
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::None;
serverGlobalParams.overrideShardIdentity = BSONObj();
// The shardIdentity doc on disk, even if invalid, is ignored if ClusterRole is None.
// This is to allow fixing the shardIdentity doc by starting without --shardsvr.
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_FALSE(swShardingInitialized.getValue());
}
TEST_F(ShardingStateTest,
InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndValidShardIdentity) {
storageGlobalParams.readOnly = false;
serverGlobalParams.clusterRole = ClusterRole::None;
serverGlobalParams.overrideShardIdentity = BSONObj();
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
_dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(),
validShardIdentity);
// The shardIdentity doc on disk is ignored if ClusterRole is None.
auto swShardingInitialized =
shardingState()->initializeShardingAwarenessIfNeeded(operationContext());
ASSERT_OK(swShardingInitialized);
ASSERT_FALSE(swShardingInitialized.getValue());
}
} // namespace
} // namespace mongo