diff options
author | Andrew Shuvalov <andrew.shuvalov@mongodb.com> | 2021-05-28 18:45:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-28 19:05:18 +0000 |
commit | d9aee7e28be05a49e092b4acd483906eef7e735e (patch) | |
tree | 517569cd1219c84b7a39c75b56165ae790be8973 | |
parent | 2d6c0ba951c9a0d7c327d7df7d3b282d8ef177a7 (diff) | |
download | mongo-d9aee7e28be05a49e092b4acd483906eef7e735e.tar.gz |
SERVER-56563: [RRFaM] Forge noop image oplog entries for chunk migration
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 67 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.h | 3 |
5 files changed, 99 insertions, 4 deletions
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 619aed300f8..92a050ae717 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -53,7 +53,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, const repl::OplogEntry& oplogWithCorrectLinks) { auto opType = oplogEntry.getOpType(); auto ts = oplogEntry.getTimestamp(); - const bool needsRetryImage = oplogEntry.getNeedsRetryImage().is_initialized(); + const bool needsRetryImage = oplogWithCorrectLinks.getNeedsRetryImage().is_initialized(); if (opType == repl::OpTypeEnum::kDelete) { uassert( diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9878bb9fbe3..126692909c6 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -104,6 +104,7 @@ env.Library( 'transaction_coordinator', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/repl/image_collection_entry', '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/idl/server_parameter', ], diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 6a7a3ee75de..fa66fe265f9 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -220,7 +220,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, result.isPrePostImage = true; uassert(40632, - str::stream() << "Can't handle 2 pre/post image oplog in a row. Prevoius oplog " + str::stream() << "Can't handle 2 pre/post image oplog in a row. Previous oplog " << lastResult.oplogTime.getTimestamp().toString() << ", oplog ts: " << oplogEntry.getTimestamp().toString() << ": " << oplogBSON, @@ -263,6 +263,32 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); + boost::optional<repl::RetryImageEnum> needsRetryImage; + if (oplogEntry.getNeedsRetryImage()) { + if (!lastResult.isPrePostImage) { + // The source was unable to find the matching image entry for this oplog entry. In + // this case, we do not have a pre- or post- image oplog link so we should set the + // 'needsRetryImage' field to avoid validation errors on retries of this operation. + needsRetryImage = oplogEntry.getNeedsRetryImage().get(); + } else { + // Downconvert the oplog entry by removing the `needsRetryImage` field and appending a + // pre- or post- image opTime link. + BSONObj newBson = + oplogEntry.toBSON().removeField(repl::OplogEntryBase::kNeedsRetryImageFieldName); + BSONObjBuilder appendImageOpTimeBuilder(std::move(newBson)); + switch (*oplogEntry.getNeedsRetryImage()) { + case repl::RetryImageEnum::kPreImage: + appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPreImageOpTimeFieldName, + lastResult.oplogTime.toBSON()); + break; + case repl::RetryImageEnum::kPostImage: + appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPostImageOpTimeFieldName, + lastResult.oplogTime.toBSON()); + break; + } + oplogEntry = parseOplog(appendImageOpTimeBuilder.obj().getOwned()); + } + } auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); @@ -292,7 +318,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, stmtId, oplogLink, OplogSlot(), - {}); + needsRetryImage); const auto& oplogOpTime = result.oplogTime; uassert(40633, diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 30bf462209c..7c3ac3b2fd3 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -36,6 +36,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" +#include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/session.h" @@ -53,8 +54,51 @@ namespace { PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); +boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection( + OperationContext* opCtx, const repl::OplogEntry& retryableFindAndModifyOplogEntry) { + invariant(retryableFindAndModifyOplogEntry.getNeedsRetryImage()); + + DBDirectClient client(opCtx); + BSONObj imageObj = + client.findOne(NamespaceString::kConfigImagesNamespace.ns(), + BSON("_id" << retryableFindAndModifyOplogEntry.getSessionId()->toBSON()), + nullptr); + if (imageObj.isEmpty()) { + return boost::none; + } + + auto imageEntry = repl::ImageEntry::parse(IDLParserErrorContext("image entry"), imageObj); + if (imageEntry.getTxnNumber() != retryableFindAndModifyOplogEntry.getTxnNumber()) { + // In our snapshot, fetch the current transaction number for a session. If that transaction + // number doesn't match what's found on the image lookup, it implies that the image is not + // the correct version for this oplog entry. We will not forge a noop from it. + return boost::none; + } + + BSONObjBuilder b; + // The opTime, namespace, and wall fields are expected to get overwritten by the + // destination. + b.append("ts", Timestamp::min()); + b.append("t", -1LL); + b.appendDate("wall", Date_t::now()); + b.append("ns", retryableFindAndModifyOplogEntry.getNss().ns()); + b.append("op", OpType_serializer(repl::OpTypeEnum::kNoop)); + b.append("v", repl::OplogEntry::kOplogVersion); + // The 'h' field is used for pv0 which is now deprecated. + b.append("h", 0LL); + b.append("o", imageEntry.getImage()); + b.append(repl::OplogEntryBase::kSessionIdFieldName, imageEntry.get_id().toBSON()); + b.append(repl::OplogEntryBase::kTxnNumberFieldName, imageEntry.getTxnNumber()); + b.append(repl::OplogEntryBase::kStatementIdFieldName, + *retryableFindAndModifyOplogEntry.getStatementId()); + return uassertStatusOK(repl::OplogEntry::parse(b.obj())); +} + boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) { + if (oplog.getNeedsRetryImage()) { + return forgeNoopEntryFromImageCollection(opCtx, oplog); + } auto opTimeToFetch = oplog.getPreImageOpTime(); if (!opTimeToFetch) { @@ -214,6 +258,9 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas { stdx::lock_guard<Latch> _lk(_newOplogMutex); + if (_lastFetchedNewWriteOplogImage) { + return OplogResult(_lastFetchedNewWriteOplogImage.get(), false); + } return OplogResult(_lastFetchedNewWriteOplog, true); } } @@ -335,6 +382,16 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op { stdx::lock_guard<Latch> lk(_newOplogMutex); + if (_lastFetchedNewWriteOplogImage) { + // When `_lastFetchedNewWriteOplogImage` is set, it means we found an oplog entry with + // `needsRetryImage`. At this step, we've already returned the image document, but we + // have yet to return the original oplog entry stored in `_lastFetchedNewWriteOplog`. We + // will unset this value and return such that the next call to `getLastFetchedOplog` + // will return `_lastFetchedNewWriteOplog`. + _lastFetchedNewWriteOplogImage.reset(); + return true; + } + if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); return false; @@ -355,7 +412,6 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op << nextOpTimeToFetch.toBSON(), !newWriteOplogDoc.isEmpty()); - auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplogDoc)); // If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel @@ -367,10 +423,19 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op opCtx->getServiceContext()->getFastClockSource()->now()); } + boost::optional<repl::OplogEntry> forgedNoopImage; + if (newWriteOplogEntry.getNeedsRetryImage()) { + forgedNoopImage = forgeNoopEntryFromImageCollection(opCtx, newWriteOplogEntry); + } + { stdx::lock_guard<Latch> lk(_newOplogMutex); _lastFetchedNewWriteOplog = newWriteOplogEntry; _newWriteOpTimeList.pop_front(); + + if (forgedNoopImage) { + _lastFetchedNewWriteOplogImage = forgedNoopImage; + } } return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index df0d9d80259..eacfa916ae6 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -248,6 +248,9 @@ private: // Used to store the last fetched oplog. This enables calling get multiple times. boost::optional<repl::OplogEntry> _lastFetchedOplog; + // Used to store an image when `_lastFetchedNewWriteOplog` has a `needsRetryImage` field. + boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage; + // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex"); |