diff options
| Field | Value | Date |
|---|---|---|
| author | Jason Chan <jason.chan@10gen.com> | 2021-05-11 11:20:19 -0400 |
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-14 15:03:33 +0000 |
| commit | fc57a64f959668b04ff7c7f2efdd68ce95a11736 (patch) | |
| tree | 91d37d6bc4cd4f05d4582838119952f588d588c7 | |
| parent | 955cd836df419fc13c453d56e67f7a98981fc1fb (diff) | |
| download | mongo-fc57a64f959668b04ff7c7f2efdd68ce95a11736.tar.gz | |
SERVER-56563 Forge noop image oplog entries for chunk migration
5 files changed, 95 insertions, 6 deletions
diff --git a/jstests/replsets/retryable_writes_initial_sync.js b/jstests/replsets/retryable_writes_initial_sync.js index b6a663dc319..ce9bc2448f2 100644 --- a/jstests/replsets/retryable_writes_initial_sync.js +++ b/jstests/replsets/retryable_writes_initial_sync.js @@ -1,6 +1,8 @@ /** * Tests that retryable findAndModify data stored in the `config.image_collection` side collection * do not get populated by nodes doing oplog application while in initial sync. + * + * @tags: [uses_transactions] */ (function() { 'use strict'; diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 2883d5a8f5e..ade41c4e059 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -55,7 +55,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, const repl::OplogEntry& oplogWithCorrectLinks) { auto opType = oplogEntry.getOpType(); auto ts = oplogEntry.getTimestamp(); - const auto needsRetryImage = oplogEntry.getNeedsRetryImage(); + const auto needsRetryImage = oplogWithCorrectLinks.getNeedsRetryImage(); if (opType == repl::OpTypeEnum::kDelete) { uassert( diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index c35928016d8..82eda55c6e1 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -234,7 +234,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, result.isPrePostImage = true; uassert(40632, - str::stream() << "Can't handle 2 pre/post image oplog in a row. Prevoius oplog " + str::stream() << "Can't handle 2 pre/post image oplog in a row. Previous oplog " << lastResult.oplogTime.getTimestamp().toString() << ", oplog ts: " << oplogEntry.getTimestamp().toString() @@ -263,6 +263,32 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, BSONObj object(result.isPrePostImage ? 
oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); + boost::optional<repl::RetryImageEnum> needsRetryImage; + if (oplogEntry.getNeedsRetryImage()) { + if (!lastResult.isPrePostImage) { + // The source was unable to find the matching image entry for this oplog entry. In + // this case, we do not have a pre- or post- image oplog link so we should set the + // 'needsRetryImage' field to avoid validation errors on retries of this operation. + needsRetryImage = oplogEntry.getNeedsRetryImage().get(); + } else { + // Downconvert the oplog entry by removing the `needsRetryImage` field and appending a + // pre- or post- image opTime link. + BSONObj newBson = + oplogEntry.toBSON().removeField(repl::OplogEntryBase::kNeedsRetryImageFieldName); + BSONObjBuilder appendImageOpTimeBuilder(std::move(newBson)); + switch (*oplogEntry.getNeedsRetryImage()) { + case repl::RetryImageEnum::kPreImage: + appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPreImageOpTimeFieldName, + lastResult.oplogTime.toBSON()); + break; + case repl::RetryImageEnum::kPostImage: + appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPostImageOpTimeFieldName, + lastResult.oplogTime.toBSON()); + break; + } + oplogEntry = parseOplog(appendImageOpTimeBuilder.obj().getOwned()); + } + } auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum); @@ -292,7 +318,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, stmtId, oplogLink, OplogSlot(), - {}); + needsRetryImage); const auto& oplogOpTime = result.oplogTime; uassert(40633, diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 7d451b0b81e..f3148280cb3 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -36,6 +36,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" 
#include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/session.h" @@ -52,8 +53,47 @@ namespace { PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); +boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection(const repl::OplogEntry& entry, + OperationContext* opCtx) { + invariant(entry.getNeedsRetryImage()); + DBDirectClient client(opCtx); + const auto query = BSON("_id" << entry.getSessionId()->toBSON()); + auto imageDoc = client.findOne(NamespaceString::kConfigImagesNamespace.ns(), query); + if (imageDoc.isEmpty()) { + return boost::none; + } + auto imageEntry = repl::ImageEntry::parse( + IDLParserErrorContext("Parse image entry in nextSessionMigrationBatch"), imageDoc); + if (imageEntry.getTxnNumber() != *entry.getTxnNumber()) { + // In our snapshot, fetch the current transaction number for a session. If that transaction + // number doesn't match what's found on the image lookup, it implies that the image is not + // the correct version for this oplog entry. We will not forge a noop from it. + return boost::none; + } + + BSONObjBuilder b; + // The opTime, namespace, and wall fields are expected to get overwritten by the + // destination. + b.append("ts", Timestamp::min()); + b.append("t", -1LL); + b.appendDate("wall", Date_t::now()); + b.append("ns", entry.getNamespace().ns()); + b.append("op", OpType_serializer(repl::OpTypeEnum::kNoop)); + b.append("v", repl::OplogEntry::kOplogVersion); + // The 'h' field is used for pv0 which is now deprecated. 
+ b.append("h", 0LL); + b.append("o", imageEntry.getImage()); + b.append(repl::OplogEntryBase::kSessionIdFieldName, imageEntry.get_id().toBSON()); + b.append(repl::OplogEntryBase::kTxnNumberFieldName, imageEntry.getTxnNumber()); + b.append(repl::OplogEntryBase::kStatementIdFieldName, *entry.getStatementId()); + return uassertStatusOK(repl::OplogEntry::parse(b.obj())); +} + boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) { + if (oplog.getNeedsRetryImage()) { + return forgeNoopEntryFromImageCollection(oplog, opCtx); + } auto opTimeToFetch = oplog.getPreImageOpTime(); if (!opTimeToFetch) { @@ -201,6 +241,9 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + if (_lastFetchedNewWriteOplogImage) { + return OplogResult(_lastFetchedNewWriteOplogImage.get(), false); + } return OplogResult(_lastFetchedNewWriteOplog, true); } } @@ -312,6 +355,14 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + if (_lastFetchedNewWriteOplogImage) { + // This indicates that the last consumed oplog entry was a forged image oplog. We must + // not buffer the next oplog entry until we have consumed the original oplog entry that + // generated the forged image. 
+ _lastFetchedNewWriteOplogImage.reset(); + return true; + } + if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); return false; @@ -330,13 +381,20 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op str::stream() << "Unable to fetch oplog entry with opTime: " << nextOpTimeToFetch.toBSON(), !newWriteOplog.isEmpty()); - + auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog)); + boost::optional<repl::OplogEntry> forgedNoopImage; + if (newWriteOplogEntry.getNeedsRetryImage()) { + forgedNoopImage = forgeNoopEntryFromImageCollection(newWriteOplogEntry, opCtx); + } { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - _lastFetchedNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog)); + _lastFetchedNewWriteOplog = newWriteOplogEntry; _newWriteOpTimeList.pop_front(); - } + if (forgedNoopImage) { + _lastFetchedNewWriteOplogImage = forgedNoopImage; + } + } return true; } diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 148c5c29d60..032f6af3ccf 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -238,6 +238,9 @@ private: // Used to store the last fetched oplog. This enables calling get multiple times. boost::optional<repl::OplogEntry> _lastFetchedOplog; + // Used to store an image when `_lastFetchedNewWriteOplog` has a `needsRetryImage` field. + boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage; + // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification stdx::mutex _newOplogMutex; |