/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/s/config_server_test_fixture.h"
#include
#include
#include "mongo/base/status_with.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/sharding_catalog_manager_impl.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_local.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/tick_source_mock.h"
namespace mongo {
using executor::NetworkInterfaceMock;
using executor::NetworkTestEnv;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using unittest::assertGet;
using std::string;
using std::vector;
using unittest::assertGet;
namespace {
ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly);
} // namespace
ConfigServerTestFixture::ConfigServerTestFixture() = default;
ConfigServerTestFixture::~ConfigServerTestFixture() = default;
void ConfigServerTestFixture::setUp() {
ShardingMongodTestFixture::setUp();
// TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it
// actually needs to bypass the op observer.
replicationCoordinator()->alwaysAllowWrites(true);
// Initialize sharding components as a config server.
serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
uassertStatusOK(initializeGlobalShardingStateForMongodForTest(ConnectionString::forLocal()));
}
std::unique_ptr ConfigServerTestFixture::makeDistLockCatalog(
ShardRegistry* shardRegistry) {
invariant(shardRegistry);
return stdx::make_unique(shardRegistry);
}
std::unique_ptr ConfigServerTestFixture::makeDistLockManager(
std::unique_ptr distLockCatalog) {
invariant(distLockCatalog);
return stdx::make_unique(
getServiceContext(),
"distLockProcessId",
std::move(distLockCatalog),
ReplSetDistLockManager::kDistLockPingInterval,
ReplSetDistLockManager::kDistLockExpirationTime);
}
std::unique_ptr ConfigServerTestFixture::makeShardingCatalogClient(
std::unique_ptr distLockManager) {
invariant(distLockManager);
return stdx::make_unique(std::move(distLockManager));
}
std::unique_ptr ConfigServerTestFixture::makeShardingCatalogManager(
ShardingCatalogClient* catalogClient) {
invariant(catalogClient);
// The catalog manager requires a special executor used for operations during addShard.
auto specialNet(stdx::make_unique());
_mockNetworkForAddShard = specialNet.get();
auto specialExec = makeThreadPoolTestExecutor(std::move(specialNet));
_executorForAddShard = specialExec.get();
_addShardNetworkTestEnv =
stdx::make_unique(specialExec.get(), _mockNetworkForAddShard);
return stdx::make_unique(std::move(specialExec));
}
std::unique_ptr ConfigServerTestFixture::makeCatalogCacheLoader() {
return stdx::make_unique();
}
std::unique_ptr ConfigServerTestFixture::makeCatalogCache(
std::unique_ptr catalogCacheLoader) {
invariant(catalogCacheLoader);
return stdx::make_unique(std::move(catalogCacheLoader));
}
std::unique_ptr ConfigServerTestFixture::makeBalancerConfiguration() {
return stdx::make_unique();
}
std::unique_ptr ConfigServerTestFixture::makeClusterCursorManager() {
return stdx::make_unique(getServiceContext()->getPreciseClockSource());
}
executor::NetworkInterfaceMock* ConfigServerTestFixture::networkForAddShard() const {
invariant(_mockNetworkForAddShard);
return _mockNetworkForAddShard;
}
executor::TaskExecutor* ConfigServerTestFixture::executorForAddShard() const {
invariant(_executorForAddShard);
return _executorForAddShard;
}
void ConfigServerTestFixture::onCommandForAddShard(NetworkTestEnv::OnCommandFunction func) {
_addShardNetworkTestEnv->onCommand(func);
}
std::shared_ptr ConfigServerTestFixture::getConfigShard() const {
return shardRegistry()->getConfigShard();
}
Status ConfigServerTestFixture::insertToConfigCollection(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& doc) {
auto insert(stdx::make_unique());
insert->addToDocuments(doc);
BatchedCommandRequest request(insert.release());
request.setNS(ns);
auto config = getConfigShard();
invariant(config);
auto insertResponse = config->runCommand(opCtx,
kReadPref,
ns.db().toString(),
request.toBSON(),
Shard::kDefaultConfigCommandTimeout,
Shard::RetryPolicy::kNoRetry);
BatchedCommandResponse batchResponse;
auto status = Shard::CommandResponse::processBatchWriteResponse(insertResponse, &batchResponse);
return status;
}
StatusWith ConfigServerTestFixture::findOneOnConfigCollection(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& filter) {
auto config = getConfigShard();
invariant(config);
auto findStatus = config->exhaustiveFindOnConfig(
opCtx, kReadPref, repl::ReadConcernLevel::kMajorityReadConcern, ns, filter, BSONObj(), 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
auto findResult = findStatus.getValue();
if (findResult.docs.empty()) {
return Status(ErrorCodes::NoMatchingDocument, "No document found");
}
invariant(findResult.docs.size() == 1);
return findResult.docs.front().getOwned();
}
Status ConfigServerTestFixture::setupShards(const std::vector& shards) {
const NamespaceString shardNS(ShardType::ConfigNS);
for (const auto& shard : shards) {
auto insertStatus = insertToConfigCollection(operationContext(), shardNS, shard.toBSON());
if (!insertStatus.isOK()) {
return insertStatus;
}
}
return Status::OK();
}
StatusWith ConfigServerTestFixture::getShardDoc(OperationContext* opCtx,
const std::string& shardId) {
auto doc = findOneOnConfigCollection(
opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name(shardId)));
if (!doc.isOK()) {
if (doc.getStatus() == ErrorCodes::NoMatchingDocument) {
return {ErrorCodes::ShardNotFound,
str::stream() << "shard " << shardId << " does not exist"};
}
return doc.getStatus();
}
return ShardType::fromBSON(doc.getValue());
}
Status ConfigServerTestFixture::setupChunks(const std::vector& chunks) {
const NamespaceString chunkNS(ChunkType::ConfigNS);
for (const auto& chunk : chunks) {
auto insertStatus =
insertToConfigCollection(operationContext(), chunkNS, chunk.toConfigBSON());
if (!insertStatus.isOK())
return insertStatus;
}
return Status::OK();
}
StatusWith ConfigServerTestFixture::getChunkDoc(OperationContext* opCtx,
const BSONObj& minKey) {
auto doc = findOneOnConfigCollection(
opCtx, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::min() << minKey));
if (!doc.isOK())
return doc.getStatus();
return ChunkType::fromConfigBSON(doc.getValue());
}
void ConfigServerTestFixture::setupDatabase(const std::string& dbName,
const ShardId primaryShard,
const bool sharded) {
DatabaseType db;
db.setName(dbName);
db.setPrimary(primaryShard);
db.setSharded(sharded);
ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
DatabaseType::ConfigNS,
db.toBSON(),
ShardingCatalogClient::kMajorityWriteConcern));
}
StatusWith> ConfigServerTestFixture::getIndexes(OperationContext* opCtx,
const NamespaceString& ns) {
auto configShard = getConfigShard();
auto response = configShard->runCommand(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
ns.db().toString(),
BSON("listIndexes" << ns.coll().toString()),
Shard::kDefaultConfigCommandTimeout,
Shard::RetryPolicy::kIdempotent);
if (!response.isOK()) {
return response.getStatus();
}
if (!response.getValue().commandStatus.isOK()) {
return response.getValue().commandStatus;
}
auto cursorResponse = CursorResponse::parseFromBSON(response.getValue().response);
if (!cursorResponse.isOK()) {
return cursorResponse.getStatus();
}
return cursorResponse.getValue().getBatch();
}
std::vector ConfigServerTestFixture::getKeys(OperationContext* opCtx) {
auto config = getConfigShard();
auto findStatus =
config->exhaustiveFindOnConfig(opCtx,
kReadPref,
repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(KeysCollectionDocument::ConfigNS),
BSONObj(),
BSON("expiresAt" << 1),
boost::none);
ASSERT_OK(findStatus.getStatus());
std::vector keys;
const auto& docs = findStatus.getValue().docs;
for (const auto& doc : docs) {
auto keyStatus = KeysCollectionDocument::fromBSON(doc);
ASSERT_OK(keyStatus.getStatus());
keys.push_back(keyStatus.getValue());
}
return keys;
}
void ConfigServerTestFixture::expectSetShardVersion(
const HostAndPort& expectedHost,
const ShardType& expectedShard,
const NamespaceString& expectedNs,
boost::optional expectedChunkVersion) {
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(expectedHost, request.target);
ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(),
rpc::TrackingMetadata::removeTrackingData(request.metadata));
SetShardVersionRequest ssv =
assertGet(SetShardVersionRequest::parseFromBSON(request.cmdObj));
ASSERT(!ssv.isInit());
ASSERT(ssv.isAuthoritative());
ASSERT_EQ(expectedShard.getHost(), ssv.getShardConnectionString().toString());
ASSERT_EQ(expectedNs.toString(), ssv.getNS().ns());
if (expectedChunkVersion) {
ASSERT_EQ(*expectedChunkVersion, ssv.getNSVersion());
}
return BSON("ok" << true);
});
}
} // namespace mongo