summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLouis Williams <louis.williams@mongodb.com>2021-12-16 17:54:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-17 16:00:11 +0000
commitda10567348cf7883e2ec40a441e38efc17c23385 (patch)
tree3824041cfc93a241551b4ce32d41239ab2903816 /src
parent1027d63b8bd49844e9aa91c25c126703bdb9ae0b (diff)
downloadmongo-da10567348cf7883e2ec40a441e38efc17c23385.tar.gz
SERVER-61754 dbCheck does not hold strong locks during batches
The dbCheck command accepts an optional parameter, 'snapshotRead' (default true), that causes dbCheck to use point-in-time reads to check each batch. When true, dbCheck does not block writes. When set to false, dbCheck reverts to blocking writes.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/dbcheck.cpp101
-rw-r--r--src/mongo/db/concurrency/deferred_writer.cpp26
-rw-r--r--src/mongo/db/concurrency/deferred_writer.h2
-rw-r--r--src/mongo/db/repl/dbcheck.cpp116
-rw-r--r--src/mongo/db/repl/dbcheck.h4
-rw-r--r--src/mongo/db/repl/dbcheck.idl9
6 files changed, 156 insertions, 102 deletions
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp
index e2ddda90c87..4f4f9b2de6f 100644
--- a/src/mongo/db/commands/dbcheck.cpp
+++ b/src/mongo/db/commands/dbcheck.cpp
@@ -68,6 +68,7 @@ struct DbCheckCollectionInfo {
int64_t maxDocsPerBatch;
int64_t maxBytesPerBatch;
int64_t maxBatchTimeMillis;
+ bool snapshotRead;
};
/**
@@ -105,7 +106,8 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx,
maxRate,
maxDocsPerBatch,
maxBytesPerBatch,
- maxBatchTimeMillis};
+ maxBatchTimeMillis,
+ invocation.getSnapshotRead()};
auto result = std::make_unique<DbCheckRun>();
result->push_back(info);
return result;
@@ -138,7 +140,8 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx,
rate,
maxDocsPerBatch,
maxBytesPerBatch,
- maxBatchTimeMillis};
+ maxBatchTimeMillis,
+ invocation.getSnapshotRead()};
result->push_back(info);
return true;
};
@@ -253,25 +256,37 @@ private:
return;
}
- std::unique_ptr<HealthLogEntry> entry;
-
if (!result.isOK()) {
- auto code = result.getStatus().code();
+ bool retryable = false;
+ std::unique_ptr<HealthLogEntry> entry;
+
+ const auto code = result.getStatus().code();
if (code == ErrorCodes::LockTimeout) {
+ retryable = true;
entry = dbCheckWarningHealthLogEntry(
info.nss,
"retrying dbCheck batch after timeout due to lock unavailability",
OplogEntriesEnum::Batch,
result.getStatus());
- HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
- continue;
- }
- if (code == ErrorCodes::NamespaceNotFound) {
+ } else if (code == ErrorCodes::SnapshotUnavailable) {
+ retryable = true;
+ entry = dbCheckWarningHealthLogEntry(
+ info.nss,
+ "retrying dbCheck batch after conflict with pending catalog operation",
+ OplogEntriesEnum::Batch,
+ result.getStatus());
+ } else if (code == ErrorCodes::NamespaceNotFound) {
entry = dbCheckWarningHealthLogEntry(
info.nss,
"abandoning dbCheck batch because collection no longer exists",
OplogEntriesEnum::Batch,
result.getStatus());
+ } else if (code == ErrorCodes::IndexNotFound) {
+ entry = dbCheckWarningHealthLogEntry(
+ info.nss,
+ "skipping dbCheck on collection because it is missing an _id index",
+ OplogEntriesEnum::Batch,
+ result.getStatus());
} else if (ErrorCodes::isA<ErrorCategory::NotPrimaryError>(code)) {
entry = dbCheckWarningHealthLogEntry(
info.nss,
@@ -285,21 +300,23 @@ private:
result.getStatus());
}
HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
+ if (retryable) {
+ continue;
+ }
return;
- } else {
- auto stats = result.getValue();
- entry = dbCheckBatchEntry(info.nss,
- stats.nDocs,
- stats.nBytes,
- stats.md5,
- stats.md5,
- start,
- stats.lastKey,
- stats.time);
- HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
}
auto stats = result.getValue();
+ auto entry = dbCheckBatchEntry(info.nss,
+ stats.nDocs,
+ stats.nBytes,
+ stats.md5,
+ stats.md5,
+ start,
+ stats.lastKey,
+ stats.readTimestamp,
+ stats.time);
+ HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry);
start = stats.lastKey;
@@ -333,6 +350,7 @@ private:
BSONKey lastKey;
std::string md5;
repl::OpTime time;
+ boost::optional<Timestamp> readTimestamp;
};
// Set if the job cannot proceed.
@@ -348,6 +366,17 @@ private:
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
+ auto lockMode = MODE_S;
+ if (info.snapshotRead) {
+ // Each batch will read at the latest no-overlap point, which is the all_durable
+ // timestamp on primaries. We assume that the history window on secondaries is always
+ // longer than the time it takes between starting and replicating a batch on the
+ // primary. Otherwise, the readTimestamp will not be available on a secondary by the
+ // time it processes the oplog entry.
+ lockMode = MODE_IS;
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap);
+ }
+
BatchStats result;
auto timeoutMs = Milliseconds(gDbCheckCollectionTryLockTimeoutMillis.load());
const auto initialBackoffMs =
@@ -355,14 +384,14 @@ private:
auto backoffMs = initialBackoffMs;
for (int attempt = 1;; attempt++) {
try {
- // Try to acquire collection lock in S mode with increasing timeout and bounded
- // exponential backoff.
+ // Try to acquire collection lock with increasing timeout and bounded exponential
+ // backoff.
auto const lockDeadline = Date_t::now() + timeoutMs;
timeoutMs *= 2;
AutoGetCollection agc(opCtx,
info.nss,
- MODE_S,
+ lockMode,
AutoGetCollectionViewMode::kViewsForbidden,
lockDeadline);
@@ -376,15 +405,18 @@ private:
CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, info.nss);
if (!collection) {
const auto msg = "Collection under dbCheck no longer exists";
- auto entry = dbCheckHealthLogEntry(info.nss,
- SeverityEnum::Info,
- "dbCheck failed",
- OplogEntriesEnum::Batch,
- BSON("success" << false << "error" << msg));
- HealthLog::get(opCtx).log(*entry);
return {ErrorCodes::NamespaceNotFound, msg};
}
+ auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
+ auto minVisible = collection->getMinimumVisibleSnapshot();
+ if (readTimestamp && minVisible &&
+ *readTimestamp < *collection->getMinimumVisibleSnapshot()) {
+ return {ErrorCodes::SnapshotUnavailable,
+ str::stream() << "Unable to read from collection " << info.nss
+ << " due to pending catalog changes"};
+ }
+
boost::optional<DbCheckHasher> hasher;
try {
hasher.emplace(opCtx,
@@ -412,9 +444,11 @@ private:
batch.setMd5(md5);
batch.setMinKey(first);
batch.setMaxKey(BSONKey(hasher->lastKey()));
+ batch.setReadTimestamp(readTimestamp);
// Send information on this batch over the oplog.
result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON());
+ result.readTimestamp = readTimestamp;
result.nDocs = hasher->docsSeen();
result.nBytes = hasher->bytesSeen();
@@ -526,10 +560,11 @@ public:
" maxKey: <last key, inclusive>,\n"
" maxCount: <max number of docs>,\n"
" maxSize: <max size of docs>,\n"
- " maxCountPerSecond: <max rate in docs/sec> } "
- " maxDocsPerBatch: <max number of docs/batch> } "
- " maxBytesPerBatch: <try to keep a batch within max bytes/batch> } "
- " maxBatchTimeMillis: <max time processing a batch in milliseconds> } "
+ " maxCountPerSecond: <max rate in docs/sec>\n"
+ " maxDocsPerBatch: <max number of docs/batch>\n"
+ " maxBytesPerBatch: <try to keep a batch within max bytes/batch>\n"
+ " maxBatchTimeMillis: <max time processing a batch in milliseconds>\n"
+ " snapshotRead: <bool, read at a timestamp without strong locks> }\n"
"to check a collection.\n"
"Invoke with {dbCheck: 1} to check all collections in the database.";
}
diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp
index be9facf84ec..9c6abe5ff47 100644
--- a/src/mongo/db/concurrency/deferred_writer.cpp
+++ b/src/mongo/db/concurrency/deferred_writer.cpp
@@ -48,7 +48,6 @@ auto kLogInterval = stdx::chrono::minutes(1);
void DeferredWriter::_logFailure(const Status& status) {
if (TimePoint::clock::now() - _lastLogged > kLogInterval) {
LOGV2(20516,
- "Unable to write to collection {namespace}: {error}",
"Unable to write to collection",
"namespace"_attr = _nss.toString(),
"error"_attr = status);
@@ -100,14 +99,13 @@ StatusWith<std::unique_ptr<AutoGetCollection>> DeferredWriter::_getCollection(
return std::move(agc);
}
-void DeferredWriter::_worker(InsertStatement stmt) try {
+Status DeferredWriter::_worker(InsertStatement stmt) noexcept try {
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
OperationContext* opCtx = uniqueOpCtx.get();
auto result = _getCollection(opCtx);
if (!result.isOK()) {
- _logFailure(result.getStatus());
- return;
+ return result.getStatus();
}
auto agc = std::move(result.getValue());
@@ -128,13 +126,9 @@ void DeferredWriter::_worker(InsertStatement stmt) try {
stdx::lock_guard<Latch> lock(_mutex);
_numBytes -= stmt.doc.objsize();
-
- // If a write to a deferred collection fails, periodically tell the log.
- if (!status.isOK()) {
- _logFailure(status);
- }
+ return status;
} catch (const DBException& e) {
- _logFailure(e.toStatus());
+ return e.toStatus();
}
DeferredWriter::DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize)
@@ -155,7 +149,12 @@ void DeferredWriter::startup(std::string workerName) {
options.threadNamePrefix = workerName;
options.minThreads = 0;
options.maxThreads = 1;
- options.onCreateThread = [](const std::string& name) { Client::initThread(name); };
+ options.onCreateThread = [](const std::string& name) {
+ Client::initThread(name);
+
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillableByStepdown(lk);
+ };
_pool = std::make_unique<ThreadPool>(options);
_pool->startup();
}
@@ -190,7 +189,10 @@ bool DeferredWriter::insertDocument(BSONObj obj) {
_pool->schedule([this, obj](auto status) {
fassert(40588, status);
- _worker(InsertStatement(obj.getOwned()));
+ auto workerStatus = _worker(InsertStatement(obj.getOwned()));
+ if (!workerStatus.isOK()) {
+ _logFailure(workerStatus);
+ }
});
return true;
}
diff --git a/src/mongo/db/concurrency/deferred_writer.h b/src/mongo/db/concurrency/deferred_writer.h
index 0ac8238fa8d..1f45fbb8d15 100644
--- a/src/mongo/db/concurrency/deferred_writer.h
+++ b/src/mongo/db/concurrency/deferred_writer.h
@@ -136,7 +136,7 @@ private:
/**
* The method that the worker thread will run.
*/
- void _worker(InsertStatement stmt);
+ Status _worker(InsertStatement stmt) noexcept;
/**
* The options for the collection, in case we need to create it.
diff --git a/src/mongo/db/repl/dbcheck.cpp b/src/mongo/db/repl/dbcheck.cpp
index 87cebb7b4bf..028df1c3469 100644
--- a/src/mongo/db/repl/dbcheck.cpp
+++ b/src/mongo/db/repl/dbcheck.cpp
@@ -141,12 +141,14 @@ std::unique_ptr<HealthLogEntry> dbCheckHealthLogEntry(const NamespaceString& nss
std::unique_ptr<HealthLogEntry> dbCheckErrorHealthLogEntry(const NamespaceString& nss,
const std::string& msg,
OplogEntriesEnum operation,
- const Status& err) {
- return dbCheckHealthLogEntry(nss,
- SeverityEnum::Error,
- msg,
- operation,
- BSON("success" << false << "error" << err.toString()));
+ const Status& err,
+ const BSONObj& context) {
+ return dbCheckHealthLogEntry(
+ nss,
+ SeverityEnum::Error,
+ msg,
+ operation,
+ BSON("success" << false << "error" << err.toString() << "context" << context));
}
std::unique_ptr<HealthLogEntry> dbCheckWarningHealthLogEntry(const NamespaceString& nss,
@@ -171,13 +173,22 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry(
const std::string& foundHash,
const BSONKey& minKey,
const BSONKey& maxKey,
+ const boost::optional<Timestamp>& readTimestamp,
const repl::OpTime& optime,
const boost::optional<CollectionOptions>& options) {
auto hashes = expectedFound(expectedHash, foundHash);
- auto data = BSON("success" << true << "count" << count << "bytes" << bytes << "md5"
- << hashes.second << "minKey" << minKey.elem() << "maxKey"
- << maxKey.elem() << "optime" << optime);
+ BSONObjBuilder builder;
+ builder.append("success", true);
+ builder.append("count", count);
+ builder.append("bytes", bytes);
+ builder.append("md5", hashes.second);
+ builder.appendAs(minKey.elem(), "minKey");
+ builder.appendAs(maxKey.elem(), "maxKey");
+ if (readTimestamp) {
+ builder.append("readTimestamp", *readTimestamp);
+ }
+ builder.append("optime", optime.toBSON());
const auto hashesMatch = hashes.first;
const auto severity = [&] {
@@ -196,7 +207,7 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry(
std::string msg =
"dbCheck batch " + (hashesMatch ? std::string("consistent") : std::string("inconsistent"));
- return dbCheckHealthLogEntry(nss, severity, msg, OplogEntriesEnum::Batch, data);
+ return dbCheckHealthLogEntry(nss, severity, msg, OplogEntriesEnum::Batch, builder.obj());
}
DbCheckHasher::DbCheckHasher(OperationContext* opCtx,
@@ -332,62 +343,57 @@ namespace {
Status dbCheckBatchOnSecondary(OperationContext* opCtx,
const repl::OpTime& optime,
const DbCheckOplogBatch& entry) {
- AutoGetCollection coll(opCtx, entry.getNss(), MODE_S);
- const auto& collection = coll.getCollection();
-
- if (!collection) {
- const auto msg = "Collection under dbCheck no longer exists";
- auto logEntry = dbCheckHealthLogEntry(entry.getNss(),
- SeverityEnum::Info,
- "dbCheck failed",
- OplogEntriesEnum::Batch,
- BSON("success" << false << "info" << msg));
- HealthLog::get(opCtx).log(*logEntry);
- return Status::OK();
- }
-
const auto msg = "replication consistency check";
// Set up the hasher,
- Status status = Status::OK();
boost::optional<DbCheckHasher> hasher;
try {
+ auto lockMode = MODE_S;
+ if (entry.getReadTimestamp()) {
+ lockMode = MODE_IS;
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ entry.getReadTimestamp());
+ }
+
+ AutoGetCollection coll(opCtx, entry.getNss(), lockMode);
+ const auto& collection = coll.getCollection();
+
+ if (!collection) {
+ const auto msg = "Collection under dbCheck no longer exists";
+ auto logEntry = dbCheckHealthLogEntry(entry.getNss(),
+ SeverityEnum::Info,
+ "dbCheck failed",
+ OplogEntriesEnum::Batch,
+ BSON("success" << false << "info" << msg));
+ HealthLog::get(opCtx).log(*logEntry);
+ return Status::OK();
+ }
+
hasher.emplace(opCtx, collection, entry.getMinKey(), entry.getMaxKey());
+ uassertStatusOK(hasher->hashAll(opCtx));
+
+ std::string expected = entry.getMd5().toString();
+ std::string found = hasher->total();
+
+ auto logEntry = dbCheckBatchEntry(entry.getNss(),
+ hasher->docsSeen(),
+ hasher->bytesSeen(),
+ expected,
+ found,
+ entry.getMinKey(),
+ hasher->lastKey(),
+ entry.getReadTimestamp(),
+ optime,
+ collection->getCollectionOptions());
+
+ HealthLog::get(opCtx).log(*logEntry);
} catch (const DBException& exception) {
+ // In case of an error, report it to the health log,
auto logEntry = dbCheckErrorHealthLogEntry(
- entry.getNss(), msg, OplogEntriesEnum::Batch, exception.toStatus());
+ entry.getNss(), msg, OplogEntriesEnum::Batch, exception.toStatus(), entry.toBSON());
HealthLog::get(opCtx).log(*logEntry);
return Status::OK();
}
-
- // run the hasher.
- if (status.isOK()) {
- status = hasher->hashAll(opCtx);
- }
-
- // In case of an error, report it to the health log,
- if (!status.isOK()) {
- auto logEntry =
- dbCheckErrorHealthLogEntry(entry.getNss(), msg, OplogEntriesEnum::Batch, status);
- HealthLog::get(opCtx).log(*logEntry);
- return Status::OK();
- }
-
- std::string expected = entry.getMd5().toString();
- std::string found = hasher->total();
-
- auto logEntry = dbCheckBatchEntry(entry.getNss(),
- hasher->docsSeen(),
- hasher->bytesSeen(),
- expected,
- found,
- entry.getMinKey(),
- hasher->lastKey(),
- optime,
- collection->getCollectionOptions());
-
- HealthLog::get(opCtx).log(*logEntry);
-
return Status::OK();
}
diff --git a/src/mongo/db/repl/dbcheck.h b/src/mongo/db/repl/dbcheck.h
index 150580e7aa1..73c9bef29f1 100644
--- a/src/mongo/db/repl/dbcheck.h
+++ b/src/mongo/db/repl/dbcheck.h
@@ -63,7 +63,8 @@ std::unique_ptr<HealthLogEntry> dbCheckHealthLogEntry(const NamespaceString& nss
std::unique_ptr<HealthLogEntry> dbCheckErrorHealthLogEntry(const NamespaceString& nss,
const std::string& msg,
OplogEntriesEnum operation,
- const Status& err);
+ const Status& err,
+ const BSONObj& context = BSONObj());
std::unique_ptr<HealthLogEntry> dbCheckWarningHealthLogEntry(const NamespaceString& nss,
const std::string& msg,
@@ -80,6 +81,7 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry(
const std::string& foundHash,
const BSONKey& minKey,
const BSONKey& maxKey,
+ const boost::optional<Timestamp>& timestamp,
const repl::OpTime& optime,
const boost::optional<CollectionOptions>& options = boost::none);
diff --git a/src/mongo/db/repl/dbcheck.idl b/src/mongo/db/repl/dbcheck.idl
index 9a16e3456e0..c1e7fedfacc 100644
--- a/src/mongo/db/repl/dbcheck.idl
+++ b/src/mongo/db/repl/dbcheck.idl
@@ -130,6 +130,9 @@ structs:
validator:
gte: 10
lte: 20000
+ snapshotRead:
+ type: safeBool
+ default: true
DbCheckAllInvocation:
description: "Command object for database-wide form of dbCheck invocation"
@@ -158,6 +161,9 @@ structs:
validator:
gte: 10
lte: 20000
+ snapshotRead:
+ type: safeBool
+ default: true
DbCheckOplogBatch:
description: "Oplog entry for a dbCheck batch"
@@ -180,6 +186,9 @@ structs:
maxRate:
type: safeInt64
optional: true
+ readTimestamp:
+ type: timestamp
+ optional: true
DbCheckOplogCollection:
description: "Oplog entry for dbCheck collection metadata"