summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Jason Chan <jason.chan@10gen.com> 2021-05-11 11:20:19 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-14 15:03:33 +0000
commit: fc57a64f959668b04ff7c7f2efdd68ce95a11736 (patch)
tree: 91d37d6bc4cd4f05d4582838119952f588d588c7
parent: 955cd836df419fc13c453d56e67f7a98981fc1fb (diff)
download: mongo-fc57a64f959668b04ff7c7f2efdd68ce95a11736.tar.gz
SERVER-56563 Forge noop image oplog entries for chunk migration
-rw-r--r-- jstests/replsets/retryable_writes_initial_sync.js | 2
-rw-r--r-- src/mongo/db/ops/write_ops_retryability.cpp | 2
-rw-r--r-- src/mongo/db/s/session_catalog_migration_destination.cpp | 30
-rw-r--r-- src/mongo/db/s/session_catalog_migration_source.cpp | 64
-rw-r--r-- src/mongo/db/s/session_catalog_migration_source.h | 3
5 files changed, 95 insertions, 6 deletions
diff --git a/jstests/replsets/retryable_writes_initial_sync.js b/jstests/replsets/retryable_writes_initial_sync.js
index b6a663dc319..ce9bc2448f2 100644
--- a/jstests/replsets/retryable_writes_initial_sync.js
+++ b/jstests/replsets/retryable_writes_initial_sync.js
@@ -1,6 +1,8 @@
/**
* Tests that retryable findAndModify data stored in the `config.image_collection` side collection
* do not get populated by nodes doing oplog application while in initial sync.
+ *
+ * @tags: [uses_transactions]
*/
(function() {
'use strict';
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 2883d5a8f5e..ade41c4e059 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -55,7 +55,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
const repl::OplogEntry& oplogWithCorrectLinks) {
auto opType = oplogEntry.getOpType();
auto ts = oplogEntry.getTimestamp();
- const auto needsRetryImage = oplogEntry.getNeedsRetryImage();
+ const auto needsRetryImage = oplogWithCorrectLinks.getNeedsRetryImage();
if (opType == repl::OpTypeEnum::kDelete) {
uassert(
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index c35928016d8..82eda55c6e1 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -234,7 +234,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
result.isPrePostImage = true;
uassert(40632,
- str::stream() << "Can't handle 2 pre/post image oplog in a row. Prevoius oplog "
+ str::stream() << "Can't handle 2 pre/post image oplog in a row. Previous oplog "
<< lastResult.oplogTime.getTimestamp().toString()
<< ", oplog ts: "
<< oplogEntry.getTimestamp().toString()
@@ -263,6 +263,32 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
+ boost::optional<repl::RetryImageEnum> needsRetryImage;
+ if (oplogEntry.getNeedsRetryImage()) {
+ if (!lastResult.isPrePostImage) {
+ // The source was unable to find the matching image entry for this oplog entry. In
+ // this case, we do not have a pre- or post- image oplog link so we should set the
+ // 'needsRetryImage' field to avoid validation errors on retries of this operation.
+ needsRetryImage = oplogEntry.getNeedsRetryImage().get();
+ } else {
+ // Downconvert the oplog entry by removing the `needsRetryImage` field and appending a
+ // pre- or post- image opTime link.
+ BSONObj newBson =
+ oplogEntry.toBSON().removeField(repl::OplogEntryBase::kNeedsRetryImageFieldName);
+ BSONObjBuilder appendImageOpTimeBuilder(std::move(newBson));
+ switch (*oplogEntry.getNeedsRetryImage()) {
+ case repl::RetryImageEnum::kPreImage:
+ appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPreImageOpTimeFieldName,
+ lastResult.oplogTime.toBSON());
+ break;
+ case repl::RetryImageEnum::kPostImage:
+ appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPostImageOpTimeFieldName,
+ lastResult.oplogTime.toBSON());
+ break;
+ }
+ oplogEntry = parseOplog(appendImageOpTimeBuilder.obj().getOwned());
+ }
+ }
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum);
@@ -292,7 +318,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
stmtId,
oplogLink,
OplogSlot(),
- {});
+ needsRetryImage);
const auto& oplogOpTime = result.oplogTime;
uassert(40633,
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 7d451b0b81e..f3148280cb3 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/session.h"
@@ -52,8 +53,47 @@ namespace {
PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
+/**
+ * Forges a no-op oplog entry whose 'o' field is the image stored in the images collection
+ * ('NamespaceString::kConfigImagesNamespace') for the session that 'entry' belongs to.
+ *
+ * 'entry' must have 'needsRetryImage' set and must carry a session id, a transaction number and a
+ * statement id. All four are enforced with invariants because the forged entry is built from them.
+ *
+ * Returns boost::none when no image document exists for the session, or when the stored image was
+ * written for a different transaction number than the one on 'entry'. Otherwise returns the parsed
+ * forged entry; uasserts if the forged document does not parse as an oplog entry.
+ */
+boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection(const repl::OplogEntry& entry,
+                                                                    OperationContext* opCtx) {
+    invariant(entry.getNeedsRetryImage());
+
+    // These optionals are dereferenced below. Dereferencing an unset boost::optional is undefined
+    // behavior, so fail loudly here if the entry is missing any of its session information.
+    const auto& sessionId = entry.getSessionId();
+    const auto& txnNumber = entry.getTxnNumber();
+    const auto& statementId = entry.getStatementId();
+    invariant(sessionId);
+    invariant(txnNumber);
+    invariant(statementId);
+
+    DBDirectClient client(opCtx);
+    const auto query = BSON("_id" << sessionId->toBSON());
+    auto imageDoc = client.findOne(NamespaceString::kConfigImagesNamespace.ns(), query);
+    if (imageDoc.isEmpty()) {
+        return boost::none;
+    }
+
+    auto imageEntry = repl::ImageEntry::parse(
+        IDLParserErrorContext("Parse image entry in nextSessionMigrationBatch"), imageDoc);
+    if (imageEntry.getTxnNumber() != *txnNumber) {
+        // The image is looked up by session id alone. If its transaction number differs from the
+        // one on 'entry', the stored image was written for a different operation on the same
+        // session and is not the image for this oplog entry. We will not forge a noop from it.
+        return boost::none;
+    }
+
+    BSONObjBuilder b;
+    // The opTime, namespace, and wall fields are expected to get overwritten by the
+    // destination.
+    b.append("ts", Timestamp::min());
+    b.append("t", -1LL);
+    b.appendDate("wall", Date_t::now());
+    b.append("ns", entry.getNamespace().ns());
+    b.append("op", OpType_serializer(repl::OpTypeEnum::kNoop));
+    b.append("v", repl::OplogEntry::kOplogVersion);
+    // The 'h' field is used for pv0 which is now deprecated.
+    b.append("h", 0LL);
+    b.append("o", imageEntry.getImage());
+    b.append(repl::OplogEntryBase::kSessionIdFieldName, imageEntry.get_id().toBSON());
+    b.append(repl::OplogEntryBase::kTxnNumberFieldName, imageEntry.getTxnNumber());
+    b.append(repl::OplogEntryBase::kStatementIdFieldName, *statementId);
+    return uassertStatusOK(repl::OplogEntry::parse(b.obj()));
+}
+
boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
const repl::OplogEntry& oplog) {
+ if (oplog.getNeedsRetryImage()) {
+ return forgeNoopEntryFromImageCollection(oplog, opCtx);
+ }
auto opTimeToFetch = oplog.getPreImageOpTime();
if (!opTimeToFetch) {
@@ -201,6 +241,9 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ return OplogResult(_lastFetchedNewWriteOplogImage.get(), false);
+ }
return OplogResult(_lastFetchedNewWriteOplog, true);
}
}
@@ -312,6 +355,14 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ // This indicates that the last consumed oplog entry was a forged image oplog. We must
+ // not buffer the next oplog entry until we have consumed the original oplog entry that
+ // generated the forged image.
+ _lastFetchedNewWriteOplogImage.reset();
+ return true;
+ }
+
if (_newWriteOpTimeList.empty()) {
_lastFetchedNewWriteOplog.reset();
return false;
@@ -330,13 +381,20 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
str::stream() << "Unable to fetch oplog entry with opTime: "
<< nextOpTimeToFetch.toBSON(),
!newWriteOplog.isEmpty());
-
+ auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog));
+ boost::optional<repl::OplogEntry> forgedNoopImage;
+ if (newWriteOplogEntry.getNeedsRetryImage()) {
+ forgedNoopImage = forgeNoopEntryFromImageCollection(newWriteOplogEntry, opCtx);
+ }
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- _lastFetchedNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog));
+ _lastFetchedNewWriteOplog = newWriteOplogEntry;
_newWriteOpTimeList.pop_front();
- }
+ if (forgedNoopImage) {
+ _lastFetchedNewWriteOplogImage = forgedNoopImage;
+ }
+ }
return true;
}
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 148c5c29d60..032f6af3ccf 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -238,6 +238,9 @@ private:
// Used to store the last fetched oplog. This enables calling get multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
+ // Used to store an image when `_lastFetchedNewWriteOplog` has a `needsRetryImage` field.
+ boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage;
+
// Protects _newWriteOpTimeList, _lastFetchedNewWriteOplog, _lastFetchedNewWriteOplogImage,
// _state, _newOplogNotification
stdx::mutex _newOplogMutex;