/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/client/connection_string.h"
#include "mongo/db/repl/optime.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
class NamespaceString;
struct ReadPreferenceSetting;
class VersionType;
namespace executor {
class TaskExecutor;
} // namespace executor
/**
* Implements the catalog manager for talking to replica set config servers.
*/
class CatalogManagerReplicaSet final : public CatalogManager {
public:
CatalogManagerReplicaSet(std::unique_ptr distLockManager,
std::unique_ptr addShardExecutor);
virtual ~CatalogManagerReplicaSet();
/**
* Safe to call multiple times as long as the calls are externally synchronized to be
* non-overlapping.
*/
Status startup() override;
void shutDown(OperationContext* txn) override;
Status enableSharding(OperationContext* txn, const std::string& dbName) override;
StatusWith addShard(OperationContext* txn,
const std::string* shardProposedName,
const ConnectionString& shardConnectionString,
const long long maxSize) override;
Status updateDatabase(OperationContext* txn,
const std::string& dbName,
const DatabaseType& db) override;
Status updateCollection(OperationContext* txn,
const std::string& collNs,
const CollectionType& coll) override;
Status createDatabase(OperationContext* txn, const std::string& dbName) override;
Status logAction(OperationContext* txn,
const std::string& what,
const std::string& ns,
const BSONObj& detail) override;
Status logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
const BSONObj& detail) override;
StatusWith distLock(
OperationContext* txn,
StringData name,
StringData whyMessage,
stdx::chrono::milliseconds waitFor) override;
Status shardCollection(OperationContext* txn,
const std::string& ns,
const ShardKeyPattern& fieldsAndOrder,
bool unique,
const std::vector& initPoints,
const std::set& initShardsIds) override;
StatusWith removeShard(OperationContext* txn,
const std::string& name) override;
StatusWith> getDatabase(OperationContext* txn,
const std::string& dbName) override;
StatusWith> getCollection(OperationContext* txn,
const std::string& collNs) override;
Status getCollections(OperationContext* txn,
const std::string* dbName,
std::vector* collections,
repl::OpTime* optime) override;
Status dropCollection(OperationContext* txn, const NamespaceString& ns) override;
Status getDatabasesForShard(OperationContext* txn,
const std::string& shardName,
std::vector* dbs) override;
Status getChunks(OperationContext* txn,
const BSONObj& query,
const BSONObj& sort,
boost::optional limit,
std::vector* chunks,
repl::OpTime* opTime) override;
Status getTagsForCollection(OperationContext* txn,
const std::string& collectionNs,
std::vector* tags) override;
StatusWith getTagForChunk(OperationContext* txn,
const std::string& collectionNs,
const ChunkType& chunk) override;
StatusWith>> getAllShards(
OperationContext* txn) override;
bool runUserManagementWriteCommand(OperationContext* txn,
const std::string& commandName,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) override;
bool runUserManagementReadCommand(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result) override;
Status applyChunkOpsDeprecated(OperationContext* txn,
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
const ChunkVersion& lastChunkVersion) override;
StatusWith getGlobalSettings(OperationContext* txn, StringData key) override;
void writeConfigServerDirect(OperationContext* txn,
const BatchedCommandRequest& request,
BatchedCommandResponse* response) override;
Status insertConfigDocument(OperationContext* txn,
const std::string& ns,
const BSONObj& doc) override;
StatusWith updateConfigDocument(OperationContext* txn,
const std::string& ns,
const BSONObj& query,
const BSONObj& update,
bool upsert) override;
Status removeConfigDocuments(OperationContext* txn,
const std::string& ns,
const BSONObj& query) override;
DistLockManager* getDistLockManager() override;
Status initConfigVersion(OperationContext* txn) override;
Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) override;
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
/**
* Runs a read command against the config server with majority read concern.
*/
bool runReadCommandForTest(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder* result);
private:
/**
* Selects an optimal shard on which to place a newly created database from the set of
* available shards. Will return ShardNotFound if shard could not be found.
*/
static StatusWith _selectShardForNewDatabase(OperationContext* txn,
ShardRegistry* shardRegistry);
/**
* Checks that the given database name doesn't already exist in the config.databases
* collection, including under different casing. Optional db can be passed and will
* be set with the database details if the given dbName exists.
*
* Returns OK status if the db does not exist.
* Some known errors include:
* NamespaceExists if it exists with the same casing
* DatabaseDifferCase if it exists under different casing.
*/
Status _checkDbDoesNotExist(OperationContext* txn, const std::string& dbName, DatabaseType* db);
/**
* Generates a unique name to be given to a newly added shard.
*/
StatusWith _generateNewShardName(OperationContext* txn);
/**
* Validates that the specified connection string can serve as a shard server. In particular,
* this function checks that the shard can be contacted, that it is not already member of
* another sharded cluster and etc.
*
* @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be.
* @param connectionString Connection string to be attempted as a shard host.
* @param shardProposedName Optional proposed name for the shard. Can be omitted in which case
* a unique name for the shard will be generated from the shard's connection string. If it
* is not omitted, the value cannot be the empty string.
*
* On success returns a partially initialized ShardType object corresponding to the requested
* shard. It will have the hostName field set and optionally the name, if the name could be
* generated from either the proposed name or the connection string set name. The returned
* shard's name should be checked and if empty, one should be generated using some uniform
* algorithm.
*/
StatusWith _validateHostAsShard(OperationContext* txn,
ShardRegistry* shardRegistry,
const ConnectionString& connectionString,
const std::string* shardProposedName);
/**
* Runs the listDatabases command on the specified host and returns the names of all databases
* it returns excluding those named local and admin, since they serve administrative purpose.
*/
StatusWith> _getDBNamesListFromShard(
OperationContext* txn,
ShardRegistry* shardRegistry,
const ConnectionString& connectionString);
/**
* Creates the specified collection name in the config database.
*/
Status _createCappedConfigCollection(OperationContext* txn,
StringData collName,
int cappedSize);
/**
* Executes the specified batch write command on the current config server's primary and retries
* on the specified set of errors using the default retry policy.
*/
void _runBatchWriteCommand(OperationContext* txn,
const BatchedCommandRequest& request,
BatchedCommandResponse* response,
Shard::RetryPolicy retryPolicy);
/**
* Helper method for running a count command against the config server with appropriate
* error handling.
*/
StatusWith _runCountCommandOnConfig(OperationContext* txn,
const NamespaceString& ns,
BSONObj query);
/**
* Runs a command against a "shard" that is not yet in the cluster and thus not present in the
* ShardRegistry.
*/
StatusWith _runCommandForAddShard(OperationContext* txn,
RemoteCommandTargeter* targeter,
const std::string& dbName,
const BSONObj& cmdObj);
StatusWith>> _exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
const NamespaceString& nss,
const BSONObj& query,
const BSONObj& sort,
boost::optional limit);
/**
* Appends a read committed read concern to the request object.
*/
void _appendReadConcern(BSONObjBuilder* builder);
/**
* Returns the current cluster schema/protocol version.
*/
StatusWith _getConfigVersion(OperationContext* txn);
/**
* Queries the config servers for the database metadata for the given database, using the
* given read preference. Returns NamespaceNotFound if no database metadata is found.
*/
StatusWith> _fetchDatabaseMetadata(
OperationContext* txn, const std::string& dbName, const ReadPreferenceSetting& readPref);
/**
* Best effort method, which logs diagnostic events on the config server. If the config server
* write fails for any reason a warning will be written to the local service log and the method
* will return a failed status.
*
* @param txn Operation context in which the call is running
* @param logCollName Which config collection to write to (excluding the database name)
* @param what E.g. "split", "migrate" (not interpreted)
* @param operationNS To which collection the metadata change is being applied (not interpreted)
* @param detail Additional info about the metadata change (not interpreted)
*/
Status _log(OperationContext* txn,
const StringData& logCollName,
const std::string& what,
const std::string& operationNS,
const BSONObj& detail);
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
// (M) Must hold _mutex for access.
// (R) Read only, can only be written during initialization.
// (S) Self-synchronizing; access in any way from any context.
//
stdx::mutex _mutex;
// Distributed lock manager singleton.
std::unique_ptr _distLockManager; // (R)
// Executor specifically used for sending commands to servers that are in the process of being
// added as shards. Does not have any connection hook set on it, thus it can be used to talk
// to servers that are not yet in the ShardRegistry.
std::unique_ptr _executorForAddShard; // (R)
// True if shutDown() has been called. False, otherwise.
bool _inShutdown = false; // (M)
// True if startup() has been called.
bool _started = false; // (M)
// Whether the logAction call should attempt to create the actionlog collection
AtomicInt32 _actionLogCollectionCreated{0}; // (S)
// Whether the logChange call should attempt to create the changelog collection
AtomicInt32 _changeLogCollectionCreated{0}; // (S)
};
} // namespace mongo