diff options
author | Louis Williams <louis.williams@mongodb.com> | 2021-12-16 17:54:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-17 16:00:11 +0000 |
commit | da10567348cf7883e2ec40a441e38efc17c23385 (patch) | |
tree | 3824041cfc93a241551b4ce32d41239ab2903816 /src | |
parent | 1027d63b8bd49844e9aa91c25c126703bdb9ae0b (diff) | |
download | mongo-da10567348cf7883e2ec40a441e38efc17c23385.tar.gz |
SERVER-61754 dbCheck does not hold strong locks during batches
The dbCheck command accepts an optional (default true) parameter,
'snapshotRead'. When true, dbCheck uses point-in-time reads to check each
batch and does not block writes. When set to false, dbCheck reverts to
blocking writes.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/dbcheck.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/concurrency/deferred_writer.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/concurrency/deferred_writer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/dbcheck.cpp | 116 | ||||
-rw-r--r-- | src/mongo/db/repl/dbcheck.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/dbcheck.idl | 9 |
6 files changed, 156 insertions, 102 deletions
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp index e2ddda90c87..4f4f9b2de6f 100644 --- a/src/mongo/db/commands/dbcheck.cpp +++ b/src/mongo/db/commands/dbcheck.cpp @@ -68,6 +68,7 @@ struct DbCheckCollectionInfo { int64_t maxDocsPerBatch; int64_t maxBytesPerBatch; int64_t maxBatchTimeMillis; + bool snapshotRead; }; /** @@ -105,7 +106,8 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx, maxRate, maxDocsPerBatch, maxBytesPerBatch, - maxBatchTimeMillis}; + maxBatchTimeMillis, + invocation.getSnapshotRead()}; auto result = std::make_unique<DbCheckRun>(); result->push_back(info); return result; @@ -138,7 +140,8 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx, rate, maxDocsPerBatch, maxBytesPerBatch, - maxBatchTimeMillis}; + maxBatchTimeMillis, + invocation.getSnapshotRead()}; result->push_back(info); return true; }; @@ -253,25 +256,37 @@ private: return; } - std::unique_ptr<HealthLogEntry> entry; - if (!result.isOK()) { - auto code = result.getStatus().code(); + bool retryable = false; + std::unique_ptr<HealthLogEntry> entry; + + const auto code = result.getStatus().code(); if (code == ErrorCodes::LockTimeout) { + retryable = true; entry = dbCheckWarningHealthLogEntry( info.nss, "retrying dbCheck batch after timeout due to lock unavailability", OplogEntriesEnum::Batch, result.getStatus()); - HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry); - continue; - } - if (code == ErrorCodes::NamespaceNotFound) { + } else if (code == ErrorCodes::SnapshotUnavailable) { + retryable = true; + entry = dbCheckWarningHealthLogEntry( + info.nss, + "retrying dbCheck batch after conflict with pending catalog operation", + OplogEntriesEnum::Batch, + result.getStatus()); + } else if (code == ErrorCodes::NamespaceNotFound) { entry = dbCheckWarningHealthLogEntry( info.nss, "abandoning dbCheck batch because collection no longer exists", OplogEntriesEnum::Batch, result.getStatus()); + } else 
if (code == ErrorCodes::IndexNotFound) { + entry = dbCheckWarningHealthLogEntry( + info.nss, + "skipping dbCheck on collection because it is missing an _id index", + OplogEntriesEnum::Batch, + result.getStatus()); } else if (ErrorCodes::isA<ErrorCategory::NotPrimaryError>(code)) { entry = dbCheckWarningHealthLogEntry( info.nss, @@ -285,21 +300,23 @@ private: result.getStatus()); } HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry); + if (retryable) { + continue; + } return; - } else { - auto stats = result.getValue(); - entry = dbCheckBatchEntry(info.nss, - stats.nDocs, - stats.nBytes, - stats.md5, - stats.md5, - start, - stats.lastKey, - stats.time); - HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry); } auto stats = result.getValue(); + auto entry = dbCheckBatchEntry(info.nss, + stats.nDocs, + stats.nBytes, + stats.md5, + stats.md5, + start, + stats.lastKey, + stats.readTimestamp, + stats.time); + HealthLog::get(Client::getCurrent()->getServiceContext()).log(*entry); start = stats.lastKey; @@ -333,6 +350,7 @@ private: BSONKey lastKey; std::string md5; repl::OpTime time; + boost::optional<Timestamp> readTimestamp; }; // Set if the job cannot proceed. @@ -348,6 +366,17 @@ private: auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); auto opCtx = uniqueOpCtx.get(); + auto lockMode = MODE_S; + if (info.snapshotRead) { + // Each batch will read at the latest no-overlap point, which is the all_durable + // timestamp on primaries. We assume that the history window on secondaries is always + // longer than the time it takes between starting and replicating a batch on the + // primary. Otherwise, the readTimestamp will not be available on a secondary by the + // time it processes the oplog entry. 
+ lockMode = MODE_IS; + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap); + } + BatchStats result; auto timeoutMs = Milliseconds(gDbCheckCollectionTryLockTimeoutMillis.load()); const auto initialBackoffMs = @@ -355,14 +384,14 @@ private: auto backoffMs = initialBackoffMs; for (int attempt = 1;; attempt++) { try { - // Try to acquire collection lock in S mode with increasing timeout and bounded - // exponential backoff. + // Try to acquire collection lock with increasing timeout and bounded exponential + // backoff. auto const lockDeadline = Date_t::now() + timeoutMs; timeoutMs *= 2; AutoGetCollection agc(opCtx, info.nss, - MODE_S, + lockMode, AutoGetCollectionViewMode::kViewsForbidden, lockDeadline); @@ -376,15 +405,18 @@ private: CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, info.nss); if (!collection) { const auto msg = "Collection under dbCheck no longer exists"; - auto entry = dbCheckHealthLogEntry(info.nss, - SeverityEnum::Info, - "dbCheck failed", - OplogEntriesEnum::Batch, - BSON("success" << false << "error" << msg)); - HealthLog::get(opCtx).log(*entry); return {ErrorCodes::NamespaceNotFound, msg}; } + auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); + auto minVisible = collection->getMinimumVisibleSnapshot(); + if (readTimestamp && minVisible && + *readTimestamp < *collection->getMinimumVisibleSnapshot()) { + return {ErrorCodes::SnapshotUnavailable, + str::stream() << "Unable to read from collection " << info.nss + << " due to pending catalog changes"}; + } + boost::optional<DbCheckHasher> hasher; try { hasher.emplace(opCtx, @@ -412,9 +444,11 @@ private: batch.setMd5(md5); batch.setMinKey(first); batch.setMaxKey(BSONKey(hasher->lastKey())); + batch.setReadTimestamp(readTimestamp); // Send information on this batch over the oplog. 
result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON()); + result.readTimestamp = readTimestamp; result.nDocs = hasher->docsSeen(); result.nBytes = hasher->bytesSeen(); @@ -526,10 +560,11 @@ public: " maxKey: <last key, inclusive>,\n" " maxCount: <max number of docs>,\n" " maxSize: <max size of docs>,\n" - " maxCountPerSecond: <max rate in docs/sec> } " - " maxDocsPerBatch: <max number of docs/batch> } " - " maxBytesPerBatch: <try to keep a batch within max bytes/batch> } " - " maxBatchTimeMillis: <max time processing a batch in milliseconds> } " + " maxCountPerSecond: <max rate in docs/sec>\n" + " maxDocsPerBatch: <max number of docs/batch>\n" + " maxBytesPerBatch: <try to keep a batch within max bytes/batch>\n" + " maxBatchTimeMillis: <max time processing a batch in milliseconds>\n" + " readTimestamp: <bool, read at a timestamp without strong locks> }\n" "to check a collection.\n" "Invoke with {dbCheck: 1} to check all collections in the database."; } diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp index be9facf84ec..9c6abe5ff47 100644 --- a/src/mongo/db/concurrency/deferred_writer.cpp +++ b/src/mongo/db/concurrency/deferred_writer.cpp @@ -48,7 +48,6 @@ auto kLogInterval = stdx::chrono::minutes(1); void DeferredWriter::_logFailure(const Status& status) { if (TimePoint::clock::now() - _lastLogged > kLogInterval) { LOGV2(20516, - "Unable to write to collection {namespace}: {error}", "Unable to write to collection", "namespace"_attr = _nss.toString(), "error"_attr = status); @@ -100,14 +99,13 @@ StatusWith<std::unique_ptr<AutoGetCollection>> DeferredWriter::_getCollection( return std::move(agc); } -void DeferredWriter::_worker(InsertStatement stmt) try { +Status DeferredWriter::_worker(InsertStatement stmt) noexcept try { auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); OperationContext* opCtx = uniqueOpCtx.get(); auto result = _getCollection(opCtx); if (!result.isOK()) { - 
_logFailure(result.getStatus()); - return; + return result.getStatus(); } auto agc = std::move(result.getValue()); @@ -128,13 +126,9 @@ void DeferredWriter::_worker(InsertStatement stmt) try { stdx::lock_guard<Latch> lock(_mutex); _numBytes -= stmt.doc.objsize(); - - // If a write to a deferred collection fails, periodically tell the log. - if (!status.isOK()) { - _logFailure(status); - } + return status; } catch (const DBException& e) { - _logFailure(e.toStatus()); + return e.toStatus(); } DeferredWriter::DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize) @@ -155,7 +149,12 @@ void DeferredWriter::startup(std::string workerName) { options.threadNamePrefix = workerName; options.minThreads = 0; options.maxThreads = 1; - options.onCreateThread = [](const std::string& name) { Client::initThread(name); }; + options.onCreateThread = [](const std::string& name) { + Client::initThread(name); + + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationKillableByStepdown(lk); + }; _pool = std::make_unique<ThreadPool>(options); _pool->startup(); } @@ -190,7 +189,10 @@ bool DeferredWriter::insertDocument(BSONObj obj) { _pool->schedule([this, obj](auto status) { fassert(40588, status); - _worker(InsertStatement(obj.getOwned())); + auto workerStatus = _worker(InsertStatement(obj.getOwned())); + if (!workerStatus.isOK()) { + _logFailure(workerStatus); + } }); return true; } diff --git a/src/mongo/db/concurrency/deferred_writer.h b/src/mongo/db/concurrency/deferred_writer.h index 0ac8238fa8d..1f45fbb8d15 100644 --- a/src/mongo/db/concurrency/deferred_writer.h +++ b/src/mongo/db/concurrency/deferred_writer.h @@ -136,7 +136,7 @@ private: /** * The method that the worker thread will run. */ - void _worker(InsertStatement stmt); + Status _worker(InsertStatement stmt) noexcept; /** * The options for the collection, in case we need to create it. 
diff --git a/src/mongo/db/repl/dbcheck.cpp b/src/mongo/db/repl/dbcheck.cpp index 87cebb7b4bf..028df1c3469 100644 --- a/src/mongo/db/repl/dbcheck.cpp +++ b/src/mongo/db/repl/dbcheck.cpp @@ -141,12 +141,14 @@ std::unique_ptr<HealthLogEntry> dbCheckHealthLogEntry(const NamespaceString& nss std::unique_ptr<HealthLogEntry> dbCheckErrorHealthLogEntry(const NamespaceString& nss, const std::string& msg, OplogEntriesEnum operation, - const Status& err) { - return dbCheckHealthLogEntry(nss, - SeverityEnum::Error, - msg, - operation, - BSON("success" << false << "error" << err.toString())); + const Status& err, + const BSONObj& context) { + return dbCheckHealthLogEntry( + nss, + SeverityEnum::Error, + msg, + operation, + BSON("success" << false << "error" << err.toString() << "context" << context)); } std::unique_ptr<HealthLogEntry> dbCheckWarningHealthLogEntry(const NamespaceString& nss, @@ -171,13 +173,22 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry( const std::string& foundHash, const BSONKey& minKey, const BSONKey& maxKey, + const boost::optional<Timestamp>& readTimestamp, const repl::OpTime& optime, const boost::optional<CollectionOptions>& options) { auto hashes = expectedFound(expectedHash, foundHash); - auto data = BSON("success" << true << "count" << count << "bytes" << bytes << "md5" - << hashes.second << "minKey" << minKey.elem() << "maxKey" - << maxKey.elem() << "optime" << optime); + BSONObjBuilder builder; + builder.append("success", true); + builder.append("count", count); + builder.append("bytes", bytes); + builder.append("md5", hashes.second); + builder.appendAs(minKey.elem(), "minKey"); + builder.appendAs(maxKey.elem(), "maxKey"); + if (readTimestamp) { + builder.append("readTimestamp", *readTimestamp); + } + builder.append("optime", optime.toBSON()); const auto hashesMatch = hashes.first; const auto severity = [&] { @@ -196,7 +207,7 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry( std::string msg = "dbCheck batch " + (hashesMatch ? 
std::string("consistent") : std::string("inconsistent")); - return dbCheckHealthLogEntry(nss, severity, msg, OplogEntriesEnum::Batch, data); + return dbCheckHealthLogEntry(nss, severity, msg, OplogEntriesEnum::Batch, builder.obj()); } DbCheckHasher::DbCheckHasher(OperationContext* opCtx, @@ -332,62 +343,57 @@ namespace { Status dbCheckBatchOnSecondary(OperationContext* opCtx, const repl::OpTime& optime, const DbCheckOplogBatch& entry) { - AutoGetCollection coll(opCtx, entry.getNss(), MODE_S); - const auto& collection = coll.getCollection(); - - if (!collection) { - const auto msg = "Collection under dbCheck no longer exists"; - auto logEntry = dbCheckHealthLogEntry(entry.getNss(), - SeverityEnum::Info, - "dbCheck failed", - OplogEntriesEnum::Batch, - BSON("success" << false << "info" << msg)); - HealthLog::get(opCtx).log(*logEntry); - return Status::OK(); - } - const auto msg = "replication consistency check"; // Set up the hasher, - Status status = Status::OK(); boost::optional<DbCheckHasher> hasher; try { + auto lockMode = MODE_S; + if (entry.getReadTimestamp()) { + lockMode = MODE_IS; + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + entry.getReadTimestamp()); + } + + AutoGetCollection coll(opCtx, entry.getNss(), lockMode); + const auto& collection = coll.getCollection(); + + if (!collection) { + const auto msg = "Collection under dbCheck no longer exists"; + auto logEntry = dbCheckHealthLogEntry(entry.getNss(), + SeverityEnum::Info, + "dbCheck failed", + OplogEntriesEnum::Batch, + BSON("success" << false << "info" << msg)); + HealthLog::get(opCtx).log(*logEntry); + return Status::OK(); + } + hasher.emplace(opCtx, collection, entry.getMinKey(), entry.getMaxKey()); + uassertStatusOK(hasher->hashAll(opCtx)); + + std::string expected = entry.getMd5().toString(); + std::string found = hasher->total(); + + auto logEntry = dbCheckBatchEntry(entry.getNss(), + hasher->docsSeen(), + hasher->bytesSeen(), + expected, + found, + 
entry.getMinKey(), + hasher->lastKey(), + entry.getReadTimestamp(), + optime, + collection->getCollectionOptions()); + + HealthLog::get(opCtx).log(*logEntry); } catch (const DBException& exception) { + // In case of an error, report it to the health log, auto logEntry = dbCheckErrorHealthLogEntry( - entry.getNss(), msg, OplogEntriesEnum::Batch, exception.toStatus()); + entry.getNss(), msg, OplogEntriesEnum::Batch, exception.toStatus(), entry.toBSON()); HealthLog::get(opCtx).log(*logEntry); return Status::OK(); } - - // run the hasher. - if (status.isOK()) { - status = hasher->hashAll(opCtx); - } - - // In case of an error, report it to the health log, - if (!status.isOK()) { - auto logEntry = - dbCheckErrorHealthLogEntry(entry.getNss(), msg, OplogEntriesEnum::Batch, status); - HealthLog::get(opCtx).log(*logEntry); - return Status::OK(); - } - - std::string expected = entry.getMd5().toString(); - std::string found = hasher->total(); - - auto logEntry = dbCheckBatchEntry(entry.getNss(), - hasher->docsSeen(), - hasher->bytesSeen(), - expected, - found, - entry.getMinKey(), - hasher->lastKey(), - optime, - collection->getCollectionOptions()); - - HealthLog::get(opCtx).log(*logEntry); - return Status::OK(); } diff --git a/src/mongo/db/repl/dbcheck.h b/src/mongo/db/repl/dbcheck.h index 150580e7aa1..73c9bef29f1 100644 --- a/src/mongo/db/repl/dbcheck.h +++ b/src/mongo/db/repl/dbcheck.h @@ -63,7 +63,8 @@ std::unique_ptr<HealthLogEntry> dbCheckHealthLogEntry(const NamespaceString& nss std::unique_ptr<HealthLogEntry> dbCheckErrorHealthLogEntry(const NamespaceString& nss, const std::string& msg, OplogEntriesEnum operation, - const Status& err); + const Status& err, + const BSONObj& context = BSONObj()); std::unique_ptr<HealthLogEntry> dbCheckWarningHealthLogEntry(const NamespaceString& nss, const std::string& msg, @@ -80,6 +81,7 @@ std::unique_ptr<HealthLogEntry> dbCheckBatchEntry( const std::string& foundHash, const BSONKey& minKey, const BSONKey& maxKey, + const 
boost::optional<Timestamp>& timestamp, const repl::OpTime& optime, const boost::optional<CollectionOptions>& options = boost::none); diff --git a/src/mongo/db/repl/dbcheck.idl b/src/mongo/db/repl/dbcheck.idl index 9a16e3456e0..c1e7fedfacc 100644 --- a/src/mongo/db/repl/dbcheck.idl +++ b/src/mongo/db/repl/dbcheck.idl @@ -130,6 +130,9 @@ structs: validator: gte: 10 lte: 20000 + snapshotRead: + type: safeBool + default: true DbCheckAllInvocation: description: "Command object for database-wide form of dbCheck invocation" @@ -158,6 +161,9 @@ structs: validator: gte: 10 lte: 20000 + snapshotRead: + type: safeBool + default: true DbCheckOplogBatch: description: "Oplog entry for a dbCheck batch" @@ -180,6 +186,9 @@ structs: maxRate: type: safeInt64 optional: true + readTimestamp: + type: timestamp + optional: true DbCheckOplogCollection: description: "Oplog entry for dbCheck collection metadata" |