summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Shuvalov <andrew.shuvalov@mongodb.com>2021-05-28 18:45:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-28 19:05:18 +0000
commitd9aee7e28be05a49e092b4acd483906eef7e735e (patch)
tree517569cd1219c84b7a39c75b56165ae790be8973
parent2d6c0ba951c9a0d7c327d7df7d3b282d8ef177a7 (diff)
downloadmongo-d9aee7e28be05a49e092b4acd483906eef7e735e.tar.gz
SERVER-56563: [RRFaM] Forge noop image oplog entries for chunk migration
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp2
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp30
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp67
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h3
5 files changed, 99 insertions, 4 deletions
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 619aed300f8..92a050ae717 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -53,7 +53,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
const repl::OplogEntry& oplogWithCorrectLinks) {
auto opType = oplogEntry.getOpType();
auto ts = oplogEntry.getTimestamp();
- const bool needsRetryImage = oplogEntry.getNeedsRetryImage().is_initialized();
+ const bool needsRetryImage = oplogWithCorrectLinks.getNeedsRetryImage().is_initialized();
if (opType == repl::OpTypeEnum::kDelete) {
uassert(
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9878bb9fbe3..126692909c6 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -104,6 +104,7 @@ env.Library(
'transaction_coordinator',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
],
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 6a7a3ee75de..fa66fe265f9 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -220,7 +220,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
result.isPrePostImage = true;
uassert(40632,
- str::stream() << "Can't handle 2 pre/post image oplog in a row. Prevoius oplog "
+ str::stream() << "Can't handle 2 pre/post image oplog in a row. Previous oplog "
<< lastResult.oplogTime.getTimestamp().toString()
<< ", oplog ts: " << oplogEntry.getTimestamp().toString() << ": "
<< oplogBSON,
@@ -263,6 +263,32 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
+ boost::optional<repl::RetryImageEnum> needsRetryImage;
+ if (oplogEntry.getNeedsRetryImage()) {
+ if (!lastResult.isPrePostImage) {
+ // The source was unable to find the matching image entry for this oplog entry. In
+ // this case, we do not have a pre- or post- image oplog link so we should set the
+ // 'needsRetryImage' field to avoid validation errors on retries of this operation.
+ needsRetryImage = oplogEntry.getNeedsRetryImage().get();
+ } else {
+ // Downconvert the oplog entry by removing the `needsRetryImage` field and appending a
+ // pre- or post- image opTime link.
+ BSONObj newBson =
+ oplogEntry.toBSON().removeField(repl::OplogEntryBase::kNeedsRetryImageFieldName);
+ BSONObjBuilder appendImageOpTimeBuilder(std::move(newBson));
+ switch (*oplogEntry.getNeedsRetryImage()) {
+ case repl::RetryImageEnum::kPreImage:
+ appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPreImageOpTimeFieldName,
+ lastResult.oplogTime.toBSON());
+ break;
+ case repl::RetryImageEnum::kPostImage:
+ appendImageOpTimeBuilder.append(repl::OplogEntryBase::kPostImageOpTimeFieldName,
+ lastResult.oplogTime.toBSON());
+ break;
+ }
+ oplogEntry = parseOplog(appendImageOpTimeBuilder.obj().getOwned());
+ }
+ }
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
@@ -292,7 +318,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
stmtId,
oplogLink,
OplogSlot(),
- {});
+ needsRetryImage);
const auto& oplogOpTime = result.oplogTime;
uassert(40633,
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 30bf462209c..7c3ac3b2fd3 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/session.h"
@@ -53,8 +54,51 @@ namespace {
PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
+boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection(
+ OperationContext* opCtx, const repl::OplogEntry& retryableFindAndModifyOplogEntry) {
+ invariant(retryableFindAndModifyOplogEntry.getNeedsRetryImage());
+
+ DBDirectClient client(opCtx);
+ BSONObj imageObj =
+ client.findOne(NamespaceString::kConfigImagesNamespace.ns(),
+ BSON("_id" << retryableFindAndModifyOplogEntry.getSessionId()->toBSON()),
+ nullptr);
+ if (imageObj.isEmpty()) {
+ return boost::none;
+ }
+
+ auto imageEntry = repl::ImageEntry::parse(IDLParserErrorContext("image entry"), imageObj);
+ if (imageEntry.getTxnNumber() != retryableFindAndModifyOplogEntry.getTxnNumber()) {
+ // In our snapshot, fetch the current transaction number for a session. If that transaction
+ // number doesn't match what's found on the image lookup, it implies that the image is not
+ // the correct version for this oplog entry. We will not forge a noop from it.
+ return boost::none;
+ }
+
+ BSONObjBuilder b;
+ // The opTime, namespace, and wall fields are expected to get overwritten by the
+ // destination.
+ b.append("ts", Timestamp::min());
+ b.append("t", -1LL);
+ b.appendDate("wall", Date_t::now());
+ b.append("ns", retryableFindAndModifyOplogEntry.getNss().ns());
+ b.append("op", OpType_serializer(repl::OpTypeEnum::kNoop));
+ b.append("v", repl::OplogEntry::kOplogVersion);
+ // The 'h' field is used for pv0 which is now deprecated.
+ b.append("h", 0LL);
+ b.append("o", imageEntry.getImage());
+ b.append(repl::OplogEntryBase::kSessionIdFieldName, imageEntry.get_id().toBSON());
+ b.append(repl::OplogEntryBase::kTxnNumberFieldName, imageEntry.getTxnNumber());
+ b.append(repl::OplogEntryBase::kStatementIdFieldName,
+ *retryableFindAndModifyOplogEntry.getStatementId());
+ return uassertStatusOK(repl::OplogEntry::parse(b.obj()));
+}
+
boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
const repl::OplogEntry& oplog) {
+ if (oplog.getNeedsRetryImage()) {
+ return forgeNoopEntryFromImageCollection(opCtx, oplog);
+ }
auto opTimeToFetch = oplog.getPreImageOpTime();
if (!opTimeToFetch) {
@@ -214,6 +258,9 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas
{
stdx::lock_guard<Latch> _lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ return OplogResult(_lastFetchedNewWriteOplogImage.get(), false);
+ }
return OplogResult(_lastFetchedNewWriteOplog, true);
}
}
@@ -335,6 +382,16 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
{
stdx::lock_guard<Latch> lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ // When `_lastFetchedNewWriteOplogImage` is set, it means we found an oplog entry with
+ // `needsRetryImage`. At this step, we've already returned the image document, but we
+ // have yet to return the original oplog entry stored in `_lastFetchedNewWriteOplog`. We
+ // will unset this value and return such that the next call to `getLastFetchedOplog`
+ // will return `_lastFetchedNewWriteOplog`.
+ _lastFetchedNewWriteOplogImage.reset();
+ return true;
+ }
+
if (_newWriteOpTimeList.empty()) {
_lastFetchedNewWriteOplog.reset();
return false;
@@ -355,7 +412,6 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
<< nextOpTimeToFetch.toBSON(),
!newWriteOplogDoc.isEmpty());
-
auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplogDoc));
// If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel
@@ -367,10 +423,19 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
opCtx->getServiceContext()->getFastClockSource()->now());
}
+ boost::optional<repl::OplogEntry> forgedNoopImage;
+ if (newWriteOplogEntry.getNeedsRetryImage()) {
+ forgedNoopImage = forgeNoopEntryFromImageCollection(opCtx, newWriteOplogEntry);
+ }
+
{
stdx::lock_guard<Latch> lk(_newOplogMutex);
_lastFetchedNewWriteOplog = newWriteOplogEntry;
_newWriteOpTimeList.pop_front();
+
+ if (forgedNoopImage) {
+ _lastFetchedNewWriteOplogImage = forgedNoopImage;
+ }
}
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index df0d9d80259..eacfa916ae6 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -248,6 +248,9 @@ private:
// Used to store the last fetched oplog. This enables calling get multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
+ // Used to store an image when `_lastFetchedNewWriteOplog` has a `needsRetryImage` field.
+ boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage;
+
// Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex");