/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/health_log.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/dbcheck.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/optime.h"
#include "mongo/util/background.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
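// Per-batch limits: the number of documents and the total bytes hashed in a single batch.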
constexpr uint64_t kBatchDocs = 5'000;
constexpr uint64_t kBatchBytes = 20'000'000;
/**
* All the information needed to run dbCheck on a single collection.
*/
struct DbCheckCollectionInfo {
NamespaceString nss;
BSONKey start;
BSONKey end;
int64_t maxCount;
int64_t maxSize;
int64_t maxRate;
};
/**
* A run of dbCheck consists of a series of collections.
*/
using DbCheckRun = std::vector<DbCheckCollectionInfo>;
/**
* Check if dbCheck can run on the given namespace.
*/
bool canRunDbCheckOn(const NamespaceString& nss) {
if (nss.isLocal()) {
return false;
}
// TODO: SERVER-30826.
const std::set<StringData> replicatedSystemCollections{"system.backup_users",
"system.js",
"system.new_users",
"system.roles",
"system.users",
"system.version",
"system.views"};
if (nss.isSystem()) {
if (replicatedSystemCollections.count(nss.coll()) == 0) {
return false;
}
}
return true;
}
std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx,
const std::string& dbName,
const DbCheckSingleInvocation& invocation) {
NamespaceString nss(dbName, invocation.getColl());
AutoGetCollectionForRead agc(opCtx, nss);
uassert(ErrorCodes::NamespaceNotFound,
"Collection " + invocation.getColl() + " not found",
agc.getCollection());
uassert(40619,
"Cannot run dbCheck on " + nss.toString() + " because it is not replicated",
canRunDbCheckOn(nss));
auto start = invocation.getMinKey();
auto end = invocation.getMaxKey();
auto maxCount = invocation.getMaxCount();
auto maxSize = invocation.getMaxSize();
auto maxRate = invocation.getMaxCountPerSecond();
auto info = DbCheckCollectionInfo{nss, start, end, maxCount, maxSize, maxRate};
auto result = stdx::make_unique<DbCheckRun>();
result->push_back(info);
return result;
}
std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx,
const std::string& dbName,
const DbCheckAllInvocation& invocation) {
uassert(
ErrorCodes::InvalidNamespace, "Cannot run dbCheck on local database", dbName != "local");
// Read the list of collections in a database-level lock.
AutoGetDb agd(opCtx, StringData(dbName), MODE_S);
auto db = agd.getDb();
auto result = stdx::make_unique<DbCheckRun>();
uassert(ErrorCodes::NamespaceNotFound, "Database " + dbName + " not found", agd.getDb());
int64_t max = std::numeric_limits<int64_t>::max();
auto rate = invocation.getMaxCountPerSecond();
for (Collection* coll : *db) {
DbCheckCollectionInfo info{coll->ns(), BSONKey::min(), BSONKey::max(), max, max, rate};
result->push_back(info);
}
return result;
}
/**
* Factory function for producing DbCheckRun's from command objects.
*/
std::unique_ptr<DbCheckRun> getRun(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& obj) {
BSONObjBuilder builder;
// Get rid of generic command fields.
for (const auto& elem : obj) {
if (!CommandHelpers::isGenericArgument(elem.fieldNameStringData())) {
builder.append(elem);
}
}
BSONObj toParse = builder.obj();
// If the dbCheck argument is a string, this is the per-collection form.
if (toParse["dbCheck"].type() == BSONType::String) {
return singleCollectionRun(
opCtx, dbName, DbCheckSingleInvocation::parse(IDLParserErrorContext(""), toParse));
} else {
// Otherwise, it's the database-wide form.
return fullDatabaseRun(
opCtx, dbName, DbCheckAllInvocation::parse(IDLParserErrorContext(""), toParse));
}
}
/**
* The BackgroundJob in which dbCheck actually executes on the primary.
*/
class DbCheckJob : public BackgroundJob {
public:
DbCheckJob(const StringData& dbName, std::unique_ptr<DbCheckRun> run)
: BackgroundJob(true), _done(false), _dbName(dbName.toString()), _run(std::move(run)) {}
protected:
virtual std::string name() const override {
return "dbCheck";
}
virtual void run() override {
// Every dbCheck runs in its own client.
Client::initThread(name());
for (const auto& coll : *_run) {
try {
_doCollection(coll);
} catch (const DBException& e) {
auto logEntry = dbCheckErrorHealthLogEntry(
coll.nss, "dbCheck failed", OplogEntriesEnum::Batch, e.toStatus());
HealthLog::get(Client::getCurrent()->getServiceContext()).log(*logEntry);
return;
}
if (_done) {
log() << "dbCheck terminated due to stepdown";
return;
}
}
}
private:
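// Check one collection: report its metadata, then hash its documents in batches,
// respecting the configured count, size, and rate limits.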
void _doCollection(const DbCheckCollectionInfo& info) {
// If we can't find the collection, abort the check.
if (!_getCollectionMetadata(info)) {
return;
}
if (_done) {
return;
}
// Parameters for the hasher.
auto start = info.start;
bool reachedEnd = false;
// Make sure the totals over all of our batches don't exceed the provided limits.
int64_t totalBytesSeen = 0;
int64_t totalDocsSeen = 0;
// Limit the rate of the check.
using Clock = stdx::chrono::system_clock;
using TimePoint = stdx::chrono::time_point<Clock>;
TimePoint lastStart = Clock::now();
int64_t docsInCurrentInterval = 0;
do {
using namespace std::literals::chrono_literals;
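// Start a new one-second rate-limiting interval once the previous one has elapsed.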
if (Clock::now() - lastStart > 1s) {
lastStart = Clock::now();
docsInCurrentInterval = 0;
}
auto result = _runBatch(info, start, kBatchDocs, kBatchBytes);
if (_done) {
return;
}
std::unique_ptr<HealthLogEntry> entry;
if (!result.isOK()) {
entry = dbCheckErrorHealthLogEntry(
info.nss, "dbCheck batch failed", OplogEntriesEnum::Batch, result.getStatus());
HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
return;
} else {
auto stats = result.getValue();
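// On the primary the expected and found hashes are the same value by construction.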
entry = dbCheckBatchEntry(info.nss,
stats.nDocs,
stats.nBytes,
stats.md5,
stats.md5,
start,
stats.lastKey,
stats.time);
HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
}
auto stats = result.getValue();
start = stats.lastKey;
// Update our running totals.
totalDocsSeen += stats.nDocs;
totalBytesSeen += stats.nBytes;
docsInCurrentInterval += stats.nDocs;
// Check if we've exceeded any limits.
bool reachedLast = stats.lastKey >= info.end;
bool tooManyDocs = totalDocsSeen >= info.maxCount;
bool tooManyBytes = totalBytesSeen >= info.maxSize;
reachedEnd = reachedLast || tooManyDocs || tooManyBytes;
if (docsInCurrentInterval > info.maxRate && info.maxRate > 0) {
// If an extremely low max rate has been set (substantially smaller than the batch
// size) we might want to sleep for multiple seconds between batches.
int64_t timesExceeded = docsInCurrentInterval / info.maxRate;
stdx::this_thread::sleep_for(timesExceeded * 1s - (Clock::now() - lastStart));
}
} while (!reachedEnd);
}
/**
* For organizing the results of batches.
*/
struct BatchStats {
int64_t nDocs;
int64_t nBytes;
BSONKey lastKey;
std::string md5;
repl::OpTime time;
};
// Set if the job cannot proceed.
bool _done;
std::string _dbName;
std::unique_ptr<DbCheckRun> _run;
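// Report the collection's metadata (UUID, neighboring UUIDs in the catalog, indexes,
// and options) over the oplog and to the health log. Returns false if the collection
// or its UUID cannot be found; sets _done and returns true on stepdown.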
bool _getCollectionMetadata(const DbCheckCollectionInfo& info) {
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
// While we get the prev/next UUID information, we need a database-level lock, plus a global
// IX lock (see SERVER-28544).
AutoGetDbForDbCheck agd(opCtx, info.nss);
if (_stepdownHasOccurred(opCtx, info.nss)) {
_done = true;
return true;
}
auto db = agd.getDb();
if (!db) {
return false;
}
auto collection = db->getCollection(opCtx, info.nss);
if (!collection) {
return false;
}
auto uuid = collection->uuid();
// Check if UUID exists.
if (!uuid) {
return false;
}
auto prev = UUIDCatalog::get(opCtx).prev(_dbName, *uuid);
auto next = UUIDCatalog::get(opCtx).next(_dbName, *uuid);
// Find and report collection metadata.
auto indices = collectionIndexInfo(opCtx, collection);
auto options = collectionOptions(opCtx, collection);
DbCheckOplogCollection entry;
entry.setNss(collection->ns());
entry.setUuid(*collection->uuid());
if (prev) {
entry.setPrev(*prev);
}
if (next) {
entry.setNext(*next);
}
entry.setType(OplogEntriesEnum::Collection);
entry.setIndexes(indices);
entry.setOptions(options);
// Send information on this collection over the oplog for the secondary to check.
auto optime = _logOp(opCtx, collection->ns(), collection->uuid(), entry.toBSON());
DbCheckCollectionInformation collectionInfo;
collectionInfo.collectionName = collection->ns().coll().toString();
collectionInfo.prev = entry.getPrev();
collectionInfo.next = entry.getNext();
collectionInfo.indexes = entry.getIndexes();
collectionInfo.options = entry.getOptions();
auto hle = dbCheckCollectionEntry(
collection->ns(), *collection->uuid(), collectionInfo, collectionInfo, optime);
HealthLog::get(opCtx).log(*hle);
return true;
}
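// Hash one batch of documents beginning at `first`, record the result in the oplog,
// and return statistics describing the batch.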
StatusWith<BatchStats> _runBatch(const DbCheckCollectionInfo& info,
const BSONKey& first,
int64_t batchDocs,
int64_t batchBytes) {
// New OperationContext for each batch.
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
DbCheckOplogBatch batch;
// Find the relevant collection.
AutoGetCollectionForDbCheck agc(opCtx, info.nss, OplogEntriesEnum::Batch);
if (_stepdownHasOccurred(opCtx, info.nss)) {
_done = true;
return Status(ErrorCodes::PrimarySteppedDown, "dbCheck terminated due to stepdown");
}
auto collection = agc.getCollection();
if (!collection) {
return {ErrorCodes::NamespaceNotFound, "dbCheck collection no longer exists"};
}
boost::optional<DbCheckHasher> hasher;
try {
hasher.emplace(opCtx,
collection,
first,
info.end,
std::min(batchDocs, info.maxCount),
std::min(batchBytes, info.maxSize));
} catch (const DBException& e) {
return e.toStatus();
}
Status status = hasher->hashAll();
if (!status.isOK()) {
return status;
}
std::string md5 = hasher->total();
batch.setType(OplogEntriesEnum::Batch);
batch.setNss(info.nss);
batch.setMd5(md5);
batch.setMinKey(first);
batch.setMaxKey(BSONKey(hasher->lastKey()));
BatchStats result;
// Send information on this batch over the oplog.
result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON());
result.nDocs = hasher->docsSeen();
result.nBytes = hasher->bytesSeen();
result.lastKey = hasher->lastKey();
result.md5 = md5;
return result;
}
/**
* Return `true` if this node can no longer write to the given namespace, i.e. the
* operation has been interrupted or the node has stepped down from primary.
*/
bool _stepdownHasOccurred(OperationContext* opCtx, const NamespaceString& nss) {
Status status = opCtx->checkForInterruptNoAssert();
if (!status.isOK()) {
return true;
}
auto coord = repl::ReplicationCoordinator::get(opCtx);
if (!coord->canAcceptWritesFor(opCtx, nss)) {
return true;
}
return false;
}
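// Write a 'c' (command) oplog entry carrying dbCheck information, retrying on write
// conflicts, and return the entry's optime.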
repl::OpTime _logOp(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const BSONObj& obj) {
return writeConflictRetry(
opCtx, "dbCheck oplog entry", NamespaceString::kRsOplogNamespace.ns(), [&] {
auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
const auto wallClockTime = clockSource->now();
WriteUnitOfWork uow(opCtx);
repl::OpTime result = repl::logOp(opCtx,
"c",
nss,
uuid,
obj,
nullptr,
false,
wallClockTime,
{},
kUninitializedStmtId,
{});
uow.commit();
return result;
});
}
};
/**
* The command, as run on the primary.
*/
class DbCheckCmd : public BasicCommand {
public:
DbCheckCmd() : BasicCommand("dbCheck") {}
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
virtual bool adminOnly() const {
return false;
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
std::string help() const override {
return "Validate replica set consistency.\n"
"Invoke with { dbCheck: ,\n"
" minKey: ,\n"
" maxKey: ,\n"
" maxCount: ,\n"
" maxSize: ,\n"
" maxCountPerSecond: } "
"to check a collection.\n"
"Invoke with {dbCheck: 1} to check all collections in the database.";
}
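// Example invocations from the mongo shell (illustrative only; dbCheck is registered
// only when test commands are enabled, e.g. --setParameter enableTestCommands=1):
//   db.runCommand({dbCheck: "coll", maxCountPerSecond: 1000})  // one collection
//   db.runCommand({dbCheck: 1})                                // whole database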
virtual Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) const {
// For now, just use `find` permissions.
const NamespaceString nss(parseNs(dbname, cmdObj));
// First, check that we can read this collection.
Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false);
if (!status.isOK()) {
return status;
}
// Then check that we can read the health log.
return AuthorizationSession::get(client)->checkAuthForFind(HealthLog::nss, false);
}
virtual bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
auto job = getRun(opCtx, dbname, cmdObj);
try {
(new DbCheckJob(dbname, std::move(job)))->go();
} catch (const DBException& e) {
result.append("ok", false);
result.append("err", e.toString());
return false;
}
result.append("ok", true);
return true;
}
};
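// Register the command only when test commands are enabled.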
MONGO_INITIALIZER(RegisterDbCheckCmd)(InitializerContext* context) {
if (Command::testCommandsEnabled) {
new DbCheckCmd();
}
return Status::OK();
}
} // namespace
} // namespace mongo