summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2021-05-13 23:01:39 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-14 14:44:26 +0000
commit9fe221be9a9b4bb04c6d9e8727e5760d1813b408 (patch)
tree70502b6c984674ed92071b2d15ee3bd30c46ec3d /src/mongo/db/s
parent7462cb443b3ef72b41ad6ca4defe0b45bc18495a (diff)
downloadmongo-9fe221be9a9b4bb04c6d9e8727e5760d1813b408.tar.gz
SERVER-56563: Downconvert `needsRetryImage` oplog entries for chunk migrations.
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp17
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp95
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h3
4 files changed, 112 insertions, 4 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c1ae47f7027..0c50ffb75ff 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -141,6 +141,7 @@ env.Library(
'$BUILD_DIR/mongo/client/clientdriver_minimal',
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/rs_local_client',
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 1a3f06a7e42..80226a9c463 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -107,6 +107,23 @@ void setPrePostImageTs(const ProcessOplogResult& lastResult, repl::MutableOplogE
<< redact(entry->toBSON()) << " to have txnNumber: " << lastResult.txnNum,
lastResult.txnNum == entry->getTxnNumber());
+ // PM-2213 introduces oplog entries that link to pre/post images in the
+ // `config.image_collection` table. For chunk migration, we downconvert to the classic format
+ // where the image is stored as a no-op in the oplog. A chunk migration source will always send
+ // the appropriate no-op. This code on the destination patches up the CRUD operation oplog entry
+ // to look like the classic format.
+ if (entry->getNeedsRetryImage()) {
+ switch (entry->getNeedsRetryImage().get()) {
+ case repl::RetryImageEnum::kPreImage:
+ entry->setPreImageOpTime({repl::OpTime()});
+ break;
+ case repl::RetryImageEnum::kPostImage:
+ entry->setPostImageOpTime({repl::OpTime()});
+ break;
+ }
+ entry->setNeedsRetryImage(boost::none);
+ }
+
if (entry->getPreImageOpTime()) {
entry->setPreImageOpTime(lastResult.oplogTime);
} else if (entry->getPostImageOpTime()) {
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 5090df2b309..6b953f2c6ae 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/session.h"
@@ -54,12 +55,66 @@ namespace {
PseudoRandom hashGenerator(SecureRandom().nextInt64());
+boost::optional<repl::OplogEntry> forgeNoopEntryFromImageCollection(
+ OperationContext* opCtx, const repl::OplogEntry retryableFindAndModifyOplogEntry) {
+ invariant(retryableFindAndModifyOplogEntry.getNeedsRetryImage());
+
+ DBDirectClient client(opCtx);
+ BSONObj imageObj =
+ client.findOne(NamespaceString::kConfigImagesNamespace.ns(),
+ BSON("_id" << retryableFindAndModifyOplogEntry.getSessionId()->toBSON()),
+ nullptr);
+ if (imageObj.isEmpty()) {
+ return boost::none;
+ }
+
+ auto image = repl::ImageEntry::parse(IDLParserErrorContext("image entry"), imageObj);
+ if (image.getTxnNumber() != retryableFindAndModifyOplogEntry.getTxnNumber()) {
+ // In our snapshot, fetch the current transaction number for a session. If that transaction
+ // number doesn't match what's found on the image lookup, it implies that the image is not
+ // the correct version for this oplog entry. We will not forge a noop from it.
+ return boost::none;
+ }
+
+ repl::MutableOplogEntry forgedNoop;
+ forgedNoop.setSessionId(image.get_id());
+ forgedNoop.setTxnNumber(image.getTxnNumber());
+ forgedNoop.setObject(image.getImage());
+ forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
+ // The wallclock and namespace are not directly available on the txn document when
+ // forging the noop image document.
+ forgedNoop.setWallClockTime(Date_t::now());
+ forgedNoop.setNss(retryableFindAndModifyOplogEntry.getNss());
+ forgedNoop.setUuid(retryableFindAndModifyOplogEntry.getUuid());
+ // The OpTime is probably the last write time, but the destination will overwrite this
+ // anyways. Just set an OpTime to satisfy the IDL constraints for calling `toBSON`.
+ repl::OpTimeBase opTimeBase(Timestamp::min());
+ opTimeBase.setTerm(-1);
+ forgedNoop.setOpTimeBase(opTimeBase);
+ forgedNoop.setStatementIds({0});
+ forgedNoop.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp::min(), -1));
+ return repl::OplogEntry::parse(forgedNoop.toBSON()).getValue();
+}
+
boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
- const repl::OplogEntry& oplog) {
- auto opTimeToFetch = oplog.getPreImageOpTime();
+ repl::OplogEntry* oplog) {
+ if (oplog->getNeedsRetryImage()) {
+ auto ret = forgeNoopEntryFromImageCollection(opCtx, *oplog);
+ if (ret == boost::none) {
+ // No pre/post image was found. Defensively strip the `needsRetryImage` value to remove
+ // any notion this operation was a retryable findAndModify. If the request is retried on
+ // the destination, it will surface an error to the user.
+ auto mutableOplog =
+ fassert(5676405, repl::MutableOplogEntry::parse(oplog->getEntry().toBSON()));
+ mutableOplog.setNeedsRetryImage(boost::none);
+ *oplog = repl::OplogEntry(mutableOplog.toBSON());
+ }
+ return ret;
+ }
+ auto opTimeToFetch = oplog->getPreImageOpTime();
if (!opTimeToFetch) {
- opTimeToFetch = oplog.getPostImageOpTime();
+ opTimeToFetch = oplog->getPostImageOpTime();
}
if (!opTimeToFetch) {
@@ -223,6 +278,10 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas
{
stdx::lock_guard<Latch> _lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ return OplogResult(_lastFetchedNewWriteOplogImage.get(), false);
+ }
+
return OplogResult(_lastFetchedNewWriteOplog, true);
}
}
@@ -282,7 +341,7 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
}
}
- auto doc = fetchPrePostImageOplog(opCtx, *nextOplog);
+ auto doc = fetchPrePostImageOplog(opCtx, &(nextOplog.get()));
if (doc) {
_lastFetchedOplogBuffer.push_back(*nextOplog);
_lastFetchedOplog = *doc;
@@ -342,6 +401,15 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
{
stdx::lock_guard<Latch> lk(_newOplogMutex);
+ if (_lastFetchedNewWriteOplogImage) {
+ // When `_lastFetchedNewWriteOplogImage` is set, it means we found an oplog entry with
+ // `needsRetryImage`. At this step, we've already returned the image document, but we
+ // have yet to return the original oplog entry stored in `_lastFetchedNewWriteOplog`. We
+ // will unset this value and return such that the next call to `getLastFetchedOplog`
+ // will return `_lastFetchedNewWriteOplog`.
+ _lastFetchedNewWriteOplogImage.reset();
+ return true;
+ }
if (_newWriteOpTimeList.empty()) {
_lastFetchedNewWriteOplog.reset();
@@ -371,10 +439,29 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
opCtx->getServiceContext()->getFastClockSource()->now());
}
+ boost::optional<repl::OplogEntry> forgedNoopImage;
+ if (newWriteOplogEntry.getNeedsRetryImage()) {
+ // Generate the image outside of the mutex. Assign it atomically with the actual oplog
+ // entry.
+ forgedNoopImage = forgeNoopEntryFromImageCollection(opCtx, newWriteOplogEntry);
+ if (forgedNoopImage == boost::none) {
+ // No pre/post image was found. Defensively strip the `needsRetryImage` value to remove
+ // any notion this operation was a retryable findAndModify. If the request is retried on
+ // the destination, it will surface an error to the user.
+ auto mutableOplog = fassert(5676404, repl::MutableOplogEntry::parse(newWriteOplogDoc));
+ mutableOplog.setNeedsRetryImage(boost::none);
+ newWriteOplogEntry = repl::OplogEntry(mutableOplog.toBSON());
+ }
+ }
+
{
stdx::lock_guard<Latch> lk(_newOplogMutex);
_lastFetchedNewWriteOplog = newWriteOplogEntry;
_newWriteOpTimeList.pop_front();
+
+ if (forgedNoopImage) {
+ _lastFetchedNewWriteOplogImage = forgedNoopImage;
+ }
}
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index df0d9d80259..9e138af12a1 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -258,6 +258,9 @@ private:
// Used to store the last fetched oplog from _newWriteTsList.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
+ // Used to store an image when `_lastFetchedNewWriteOplog` has a `needsRetryImage` field.
+ boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage;
+
// Stores the current state.
State _state{State::kActive};