summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-03-18 23:52:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-19 00:24:26 +0000
commitc141ef8536d51f05a6fa4017de20286d154d09e1 (patch)
tree022bd1ffe70d91657498a2237ca8e49ed12de74b /src
parent6f2f79447a83f1cccb208d815e049699c8f86fbe (diff)
downloadmongo-c141ef8536d51f05a6fa4017de20286d154d09e1.tar.gz
SERVER-63494 Transfer history for retryable transactions with more than one oplog entry across migrations
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/op_observer_impl.cpp5
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp24
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp4
-rw-r--r--src/mongo/db/ops/write_ops_retryability.h6
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp7
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp296
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h53
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp605
8 files changed, 918 insertions, 82 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 2d21959e896..a29fe4b790d 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1508,7 +1508,6 @@ int logOplogEntriesForTransaction(
const auto txnParticipant = TransactionParticipant::get(opCtx);
OpTimeBundle prevWriteOpTime;
- auto numEntriesWritten = 0;
// Writes to the oplog only require a Global intent lock. Guaranteed by
// OplogSlotReserver.
@@ -1528,6 +1527,7 @@ int logOplogEntriesForTransaction(
MutableOplogEntry imageEntry;
imageEntry.setSessionId(*opCtx->getLogicalSessionId());
imageEntry.setTxnNumber(*opCtx->getTxnNumber());
+ imageEntry.setStatementIds(statement.getStatementIds());
imageEntry.setOpType(repl::OpTypeEnum::kNoop);
imageEntry.setObject(imageDoc);
imageEntry.setNss(statement.getNss());
@@ -1674,10 +1674,9 @@ int logOplogEntriesForTransaction(
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
- numEntriesWritten++;
}
- return numEntriesWritten;
+ return currOplogSlot - oplogSlots.begin();
}
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 03ea1dd3481..5aff3904cb7 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1938,6 +1938,14 @@ protected:
if (updateOplogEntry.getTxnNumber()) {
ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *preImage.getTxnNumber());
}
+ if (!updateOplogEntry.getStatementIds().empty()) {
+ const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds();
+ const auto& preImageOplogStmtIds = preImage.getStatementIds();
+ ASSERT_EQ(updateOplogStmtIds.size(), preImageOplogStmtIds.size());
+ for (size_t i = 0; i < updateOplogStmtIds.size(); i++) {
+ ASSERT_EQ(updateOplogStmtIds[i], preImageOplogStmtIds[i]);
+ }
+ }
} else {
ASSERT_FALSE(updateOplogEntry.getPreImageOpTime());
}
@@ -1967,6 +1975,14 @@ protected:
if (updateOplogEntry.getTxnNumber()) {
ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *postImage.getTxnNumber());
}
+ if (!updateOplogEntry.getStatementIds().empty()) {
+ const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds();
+ const auto& postImageOplogStmtIds = postImage.getStatementIds();
+ ASSERT_EQ(updateOplogStmtIds.size(), postImageOplogStmtIds.size());
+ for (size_t i = 0; i < updateOplogStmtIds.size(); i++) {
+ ASSERT_EQ(updateOplogStmtIds[i], postImageOplogStmtIds[i]);
+ }
+ }
} else {
ASSERT_FALSE(updateOplogEntry.getPostImageOpTime());
}
@@ -2384,6 +2400,14 @@ protected:
if (deleteOplogEntry.getTxnNumber()) {
ASSERT_EQ(*deleteOplogEntry.getTxnNumber(), *preImage.getTxnNumber());
}
+ if (!deleteOplogEntry.getStatementIds().empty()) {
+ const auto& deleteOplogStmtIds = deleteOplogEntry.getStatementIds();
+ const auto& preImageOplogStmtIds = preImage.getStatementIds();
+ ASSERT_EQ(deleteOplogStmtIds.size(), preImageOplogStmtIds.size());
+ for (size_t i = 0; i < deleteOplogStmtIds.size(); i++) {
+ ASSERT_EQ(deleteOplogStmtIds[i], preImageOplogStmtIds[i]);
+ }
+ }
} else {
ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime());
}
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index fb6d803712c..41dcdd121f3 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -257,7 +257,7 @@ SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) {
res.setN(1);
res.setNModified(1);
} else if (entry.getOpType() == repl::OpTypeEnum::kNoop) {
- if (entry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0) {
+ if (isWouldChangeOwningShardSentinelOplogEntry(entry)) {
uasserted(ErrorCodes::IncompleteTransactionHistory,
kWouldChangeOwningShardRetryContext);
}
@@ -281,7 +281,7 @@ write_ops::FindAndModifyCommandReply parseOplogEntryForFindAndModify(
// Migrated op and WouldChangeOwningShard sentinel case.
if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
- if (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0) {
+ if (isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) {
uasserted(ErrorCodes::IncompleteTransactionHistory,
kWouldChangeOwningShardRetryContext);
}
diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h
index de808d3e067..ad3be539fd7 100644
--- a/src/mongo/db/ops/write_ops_retryability.h
+++ b/src/mongo/db/ops/write_ops_retryability.h
@@ -40,6 +40,12 @@ class OperationContext;
const BSONObj kWouldChangeOwningShardSentinel(BSON("$wouldChangeOwningShard" << 1));
+template <typename OplogEntryType>
+bool isWouldChangeOwningShardSentinelOplogEntry(const OplogEntryType& oplogEntry) {
+ return (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) &&
+ (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0);
+}
+
/**
* Returns the single write result corresponding to the given oplog entry for document update. I.e.,
* the single write result that would have been returned by the statement that would have resulted
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 0b605ff92a0..afa092fe137 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
@@ -216,6 +217,10 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
// and o2 will be empty.
// (3) Oplog entries that are a dead sentinel, which the donor sent over as the replacement
// for a prepare oplog entry or unprepared transaction commit oplog entry.
+ // (4) Oplog entries that are a WouldChangeOwningShard sentinel entry, used for making
+ // retries of a WouldChangeOwningShard update or findAndModify fail with
+ // IncompleteTransactionHistory. In this case, the o field is non-empty and the o2
+ // field is an empty BSONObj.
BSONObj object2;
if (oplogEntry.getObject2()) {
@@ -224,7 +229,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
oplogEntry.setObject2(object2);
}
- if (object2.isEmpty()) {
+ if (object2.isEmpty() && !isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) {
result.isPrePostImage = true;
uassert(40632,
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 36785825a04..a79cefc1973 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -38,6 +38,8 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -187,6 +189,24 @@ repl::OplogEntry makeSentinelOplogEntry(const LogicalSessionId& lsid,
{kIncompleteHistoryStmtId}); // statement id
}
+/**
+ * If the given oplog entry is an oplog entry for a retryable internal transaction, returns a copy
+ * of it but with the session id and transaction number set to the session id and transaction number
+ * of the retryable write that it corresponds to. Otherwise, returns the original oplog entry.
+ */
+repl::OplogEntry downConvertSessionInfoIfNeeded(const repl::OplogEntry& oplogEntry) {
+ const auto sessionId = oplogEntry.getSessionId();
+ if (isInternalSessionForRetryableWrite(*sessionId)) {
+ auto mutableOplogEntry =
+ fassert(6349401, repl::MutableOplogEntry::parse(oplogEntry.getEntry().toBSON()));
+ mutableOplogEntry.setSessionId(*getParentSessionId(*sessionId));
+ mutableOplogEntry.setTxnNumber(*sessionId->getTxnNumber());
+
+ return {mutableOplogEntry.toBSON()};
+ }
+ return oplogEntry;
+}
+
} // namespace
SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx,
@@ -309,7 +329,9 @@ void SessionCatalogMigrationSource::onCloneCleanup() {
SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<Latch> _lk(_sessionCloneMutex);
- if (_lastFetchedOplog) {
+ if (_lastFetchedOplogImage) {
+ return OplogResult(_lastFetchedOplogImage, false);
+ } else if (_lastFetchedOplog) {
return OplogResult(_lastFetchedOplog, false);
}
}
@@ -317,9 +339,8 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas
{
stdx::lock_guard<Latch> _lk(_newOplogMutex);
if (_lastFetchedNewWriteOplogImage) {
- return OplogResult(_lastFetchedNewWriteOplogImage.get(), false);
+ return OplogResult(_lastFetchedNewWriteOplogImage, false);
}
-
return OplogResult(_lastFetchedNewWriteOplog, true);
}
}
@@ -389,38 +410,105 @@ bool SessionCatalogMigrationSource::shouldSkipOplogEntry(const mongo::repl::Oplo
return false;
}
-bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
+void SessionCatalogMigrationSource::_extractOplogEntriesForInternalTransactionForRetryableWrite(
+ WithLock,
+ const repl::OplogEntry& applyOpsOplogEntry,
+ std::vector<repl::OplogEntry>* oplogBuffer) {
+ invariant(isInternalSessionForRetryableWrite(*applyOpsOplogEntry.getSessionId()));
+ invariant(applyOpsOplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+
+ auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(applyOpsOplogEntry.getObject());
+ auto unrolledOp =
+ uassertStatusOK(repl::MutableOplogEntry::parse(applyOpsOplogEntry.getEntry().toBSON()));
+
+ for (const auto& innerOp : applyOpsInfo.getOperations()) {
+ auto replOp = repl::ReplOperation::parse(
+ {"SessionOplogIterator::_extractOplogEntriesForInternalTransactionForRetryableWrite"},
+ innerOp);
+
+ if (replOp.getStatementIds().empty()) {
+ // Skip this operation since it is not retryable.
+ continue;
+ }
+
+ if (replOp.getNss() != _ns && !isWouldChangeOwningShardSentinelOplogEntry(replOp)) {
+ // Skip this operation since it does not involve the namespace being migrated.
+ continue;
+ }
+
+ unrolledOp.setDurableReplOperation(replOp);
+ auto unrolledOplogEntry = repl::OplogEntry(unrolledOp.toBSON());
+
+ if (shouldSkipOplogEntry(unrolledOplogEntry, _keyPattern, _chunkRange)) {
+ continue;
+ }
+
+ oplogBuffer->emplace_back(unrolledOplogEntry);
+ }
+}
+
+bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock lk, OperationContext* opCtx) {
while (_currentOplogIterator) {
- if (auto nextOplog = _currentOplogIterator->getNext(opCtx)) {
+ if (_unprocessedOplogBuffer.empty()) {
+ // The oplog buffer is empty. Fetch the next oplog entry from the current session
+ // oplog iterator.
+ auto nextOplog = _currentOplogIterator->getNext(opCtx);
+
+ if (!nextOplog) {
+ _currentOplogIterator.reset();
+ return false;
+ }
+
+ // Determine if this oplog entry should be migrated. If so, add the oplog entry or the
+ // oplog entries derived from it to the oplog buffer.
+
+ if (isInternalSessionForRetryableWrite(*nextOplog->getSessionId())) {
+ invariant(nextOplog->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ // Derive retryable write oplog entries from this retryable internal transaction
+ // applyOps oplog entry, and add them to the oplog buffer.
+ _extractOplogEntriesForInternalTransactionForRetryableWrite(
+ lk, *nextOplog, &_unprocessedOplogBuffer);
+ continue;
+ }
+
+ // We only expect to see two kinds of oplog entries here:
+ // - Dead-end sentinel oplog entries which by design should have stmtId equal to
+ // kIncompleteHistoryStmtId.
+ // - CRUD or noop oplog entries for retryable writes which by design should have a
+ // stmtId.
auto nextStmtIds = nextOplog->getStatementIds();
+ invariant(!nextStmtIds.empty());
// Skip the rest of the chain for this session since the ns is unrelated with the
// current one being migrated. It is ok to not check the rest of the chain because
// retryable writes doesn't allow touching different namespaces.
- if (nextStmtIds.empty() ||
- (nextStmtIds.front() != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns)) {
+ if (nextStmtIds.front() != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns) {
_currentOplogIterator.reset();
return false;
}
- // Skipping an entry here will also result in the pre/post images to also not be sent in
- // the migration as they're handled by 'fetchPrePostImageOplog' below
+ // Skipping an entry here will also result in the pre/post images to also not be
+ // sent in the migration as they're handled by 'fetchPrePostImageOplog' below.
if (shouldSkipOplogEntry(nextOplog.get(), _keyPattern, _chunkRange)) {
continue;
}
- auto doc = fetchPrePostImageOplog(opCtx, &(nextOplog.get()));
- if (doc) {
- _lastFetchedOplogBuffer.push_back(*nextOplog);
- _lastFetchedOplog = *doc;
- } else {
- _lastFetchedOplog = *nextOplog;
- }
+ _unprocessedOplogBuffer.emplace_back(*nextOplog);
+ }
- return true;
- } else {
- _currentOplogIterator.reset();
+ // Peek the next oplog entry in the buffer and process it. We cannot pop the oplog
+ // entry upfront since it may require fetching/forging a pre or post image and the reads
+ // done as part of that can fail with a WriteConflictException error.
+ auto nextOplog = _unprocessedOplogBuffer.back();
+ auto nextImageOplog = fetchPrePostImageOplog(opCtx, &nextOplog);
+ invariant(!_lastFetchedOplogImage);
+ invariant(!_lastFetchedOplog);
+ if (nextImageOplog) {
+ _lastFetchedOplogImage = downConvertSessionInfoIfNeeded(*nextImageOplog);
}
+ _lastFetchedOplog = downConvertSessionInfoIfNeeded(nextOplog);
+ _unprocessedOplogBuffer.pop_back();
+ return true;
}
return false;
@@ -428,16 +516,20 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() {
stdx::lock_guard<Latch> _lk(_sessionCloneMutex);
- return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() ||
+ return _lastFetchedOplog || !_unprocessedOplogBuffer.empty() ||
!_sessionOplogIterators.empty() || _currentOplogIterator;
}
bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) {
stdx::unique_lock<Latch> lk(_sessionCloneMutex);
- if (!_lastFetchedOplogBuffer.empty()) {
- _lastFetchedOplog = _lastFetchedOplogBuffer.back();
- _lastFetchedOplogBuffer.pop_back();
+ if (_lastFetchedOplogImage) {
+ // When `_lastFetchedOplogImage` is set, it means we found an oplog entry with a pre/post
+ // image. At this step, we've already returned the image oplog entry, but we have yet to
+ // return the original oplog entry stored in `_lastFetchedOplog`. We will unset this value
+ // and return such that the next call to `getLastFetchedOplog` will return
+ // `_lastFetchedOplog`.
+ _lastFetchedOplogImage.reset();
return true;
}
@@ -460,15 +552,16 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
}
bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) {
- return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
+ return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty() ||
+ !_unprocessedNewWriteOplogBuffer.empty();
}
bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* opCtx) {
- repl::OpTime nextOpTimeToFetch;
- EntryAtOpTimeType entryAtOpTimeType;
+ boost::optional<repl::OplogEntry> nextNewWriteOplog;
{
- stdx::lock_guard<Latch> lk(_newOplogMutex);
+ stdx::unique_lock<Latch> lk(_newOplogMutex);
+
if (_lastFetchedNewWriteOplogImage) {
// When `_lastFetchedNewWriteOplogImage` is set, it means we found an oplog entry with
// a pre/post image. At this step, we've already returned the image oplog entry, but we
@@ -479,52 +572,97 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
return true;
}
- if (_newWriteOpTimeList.empty()) {
- _lastFetchedNewWriteOplog.reset();
+ _lastFetchedNewWriteOplog.reset();
+
+ if (_unprocessedNewWriteOplogBuffer.empty() && _newWriteOpTimeList.empty()) {
return false;
}
- std::tie(nextOpTimeToFetch, entryAtOpTimeType) = _newWriteOpTimeList.front();
- }
-
- DBDirectClient client(opCtx);
- const auto& newWriteOplogDoc =
- client.findOne(NamespaceString::kRsOplogNamespace, nextOpTimeToFetch.asQuery());
-
- uassert(40620,
- str::stream() << "Unable to fetch oplog entry with opTime: "
- << nextOpTimeToFetch.toBSON(),
- !newWriteOplogDoc.isEmpty());
-
- auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplogDoc));
+ if (_unprocessedNewWriteOplogBuffer.empty()) {
+ // The oplog buffer is empty. Peek the next opTime and fetch its oplog entry while not
+ // holding the mutex. We cannot dequeue the opTime upfront since the the read can fail
+ // with a WriteConflictException error.
+ repl::OpTime opTimeToFetch;
+ EntryAtOpTimeType entryAtOpTimeType;
+ std::tie(opTimeToFetch, entryAtOpTimeType) = _newWriteOpTimeList.front();
+
+ lk.unlock();
+ DBDirectClient client(opCtx);
+ const auto& nextNewWriteOplogDoc =
+ client.findOne(NamespaceString::kRsOplogNamespace, opTimeToFetch.asQuery());
+ uassert(40620,
+ str::stream() << "Unable to fetch oplog entry with opTime: "
+ << opTimeToFetch.toBSON(),
+ !nextNewWriteOplogDoc.isEmpty());
+ auto nextNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(nextNewWriteOplogDoc));
+ lk.lock();
+
+ // Determine if this oplog entry should be migrated. If so, add the oplog entry or the
+ // oplog entries derived from it to the oplog buffer. Finally, dequeue the opTime.
+
+ if (entryAtOpTimeType == EntryAtOpTimeType::kRetryableWrite) {
+ _unprocessedNewWriteOplogBuffer.emplace_back(nextNewWriteOplog);
+ _newWriteOpTimeList.pop_front();
+ } else if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) {
+ invariant(nextNewWriteOplog.getCommandType() ==
+ repl::OplogEntry::CommandType::kApplyOps);
+ const auto sessionId = *nextNewWriteOplog.getSessionId();
+
+ if (isInternalSessionForNonRetryableWrite(sessionId)) {
+ // TODO (SERVER-64331): Determine if chunk migration should migrate internal
+ // sessions for non-retryable writes.
+ _newWriteOpTimeList.pop_front();
+ return false;
+ }
- // If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel
- // entry.
- if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) {
- const auto sessionId = *newWriteOplogEntry.getSessionId();
+ if (isInternalSessionForRetryableWrite(sessionId)) {
+ // Derive retryable write oplog entries from this retryable internal
+ // transaction applyOps oplog entry, and add them to the oplog buffer.
+ _extractOplogEntriesForInternalTransactionForRetryableWrite(
+ lk, nextNewWriteOplog, &_unprocessedNewWriteOplogBuffer);
+ _newWriteOpTimeList.pop_front();
+
+ if (auto prevOpTime = nextNewWriteOplog.getPrevWriteOpTimeInTransaction();
+ prevOpTime && !prevOpTime->isNull()) {
+ // Add the opTime for the previous applyOps oplog entry in the transaction
+ // to the queue.
+ _notifyNewWriteOpTime(lk, *prevOpTime, EntryAtOpTimeType::kTransaction);
+ }
+
+ lk.unlock();
+ return _fetchNextNewWriteOplog(opCtx);
+ }
- if (isInternalSessionForNonRetryableWrite(sessionId)) {
- // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions
- // for non-retryable writes.
- return false;
+ // This applyOps oplog entry corresponds to non-internal transaction prepare/commit,
+ // replace it with a dead-end sentinel oplog entry.
+ auto sentinelOplogEntry =
+ makeSentinelOplogEntry(sessionId,
+ *nextNewWriteOplog.getTxnNumber(),
+ opCtx->getServiceContext()->getFastClockSource()->now());
+ _unprocessedNewWriteOplogBuffer.emplace_back(sentinelOplogEntry);
+ _newWriteOpTimeList.pop_front();
+ } else {
+ MONGO_UNREACHABLE;
+ }
}
- newWriteOplogEntry =
- makeSentinelOplogEntry(*newWriteOplogEntry.getSessionId(),
- *newWriteOplogEntry.getTxnNumber(),
- opCtx->getServiceContext()->getFastClockSource()->now());
+ // Peek the next oplog entry in the buffer and process it below. We cannot pop the oplog
+ // entry upfront since it may require fetching/forging a pre or post image and the reads
+ // done as part of that can fail with a WriteConflictException error.
+ nextNewWriteOplog = _unprocessedNewWriteOplogBuffer.back();
}
- auto imageNoopOplogEntry = fetchPrePostImageOplog(opCtx, &newWriteOplogEntry);
-
+ auto nextNewWriteImageOplog = fetchPrePostImageOplog(opCtx, &(*nextNewWriteOplog));
{
stdx::lock_guard<Latch> lk(_newOplogMutex);
- _lastFetchedNewWriteOplog = newWriteOplogEntry;
- _newWriteOpTimeList.pop_front();
-
- if (imageNoopOplogEntry) {
- _lastFetchedNewWriteOplogImage = imageNoopOplogEntry;
+ invariant(!_lastFetchedNewWriteOplogImage);
+ invariant(!_lastFetchedNewWriteOplog);
+ if (nextNewWriteImageOplog) {
+ _lastFetchedNewWriteOplogImage =
+ downConvertSessionInfoIfNeeded(*nextNewWriteImageOplog);
}
+ _lastFetchedNewWriteOplog = downConvertSessionInfoIfNeeded(*nextNewWriteOplog);
+ _unprocessedNewWriteOplogBuffer.pop_back();
}
return true;
@@ -533,6 +671,12 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime,
EntryAtOpTimeType entryAtOpTimeType) {
stdx::lock_guard<Latch> lk(_newOplogMutex);
+ _notifyNewWriteOpTime(lk, opTime, entryAtOpTimeType);
+}
+
+void SessionCatalogMigrationSource::_notifyNewWriteOpTime(WithLock,
+ repl::OpTime opTime,
+ EntryAtOpTimeType entryAtOpTimeType) {
_newWriteOpTimeList.emplace_back(opTime, entryAtOpTimeType);
if (_newOplogNotification) {
@@ -543,14 +687,26 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime,
SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
SessionTxnRecord txnRecord, int expectedRollbackId)
- : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) {
+ : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId), _entryType([&] {
+ if (isInternalSessionForRetryableWrite(_record.getSessionId())) {
+ // The SessionCatalogMigrationSource should not try to create a SessionOplogIterator
+ // for a retryable internal transaction that has aborted or is still in progress or
+ // prepare.
+ invariant(_record.getState() == DurableTxnStateEnum::kCommitted);
+ return EntryType::kRetryableTransaction;
+ }
+ // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions for
+ // non-retryable writes.
+ invariant(!getParentSessionId(txnRecord.getSessionId()));
+ return _record.getState() ? EntryType::kNonRetryableTransaction
+ : EntryType::kRetryableWrite;
+ }()) {
_writeHistoryIterator =
std::make_unique<TransactionHistoryIterator>(_record.getLastWriteOpTime());
}
boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::SessionOplogIterator::getNext(
OperationContext* opCtx) {
-
if (!_writeHistoryIterator || !_writeHistoryIterator->hasNext()) {
return boost::none;
}
@@ -558,14 +714,24 @@ boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::SessionOplogIte
try {
uassert(ErrorCodes::IncompleteTransactionHistory,
str::stream() << "Cannot migrate multi-statement transaction state",
- !_record.getState());
+ _entryType == SessionOplogIterator::EntryType::kRetryableWrite ||
+ _entryType == SessionOplogIterator::EntryType::kRetryableTransaction);
// Note: during SessionCatalogMigrationSource::init, we inserted a document and wait for it
// to committed to the majority. In addition, the TransactionHistoryIterator uses OpTime
// to query for the oplog. This means that if we can successfully fetch the oplog, we are
// guaranteed that they are majority committed. If we can't fetch the oplog, it can either
// mean that the oplog has been rolled over or was rolled back.
- return _writeHistoryIterator->next(opCtx);
+ auto nextOplog = _writeHistoryIterator->next(opCtx);
+
+ if (_entryType == SessionOplogIterator::EntryType::kRetryableTransaction) {
+ if (nextOplog.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction) {
+ return getNext(opCtx);
+ }
+
+ invariant(nextOplog.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ }
+ return nextOplog;
} catch (const AssertionException& excep) {
if (excep.code() == ErrorCodes::IncompleteTransactionHistory) {
// Note: no need to check if in replicaSet mode because having an iterator implies
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 3603ddfb055..f3b87baf7fe 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -177,6 +177,8 @@ private:
*/
class SessionOplogIterator {
public:
+ enum class EntryType { kRetryableWrite, kNonRetryableTransaction, kRetryableTransaction };
+
SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId);
/**
@@ -199,6 +201,8 @@ private:
private:
const SessionTxnRecord _record;
const int _initialRollbackId;
+ const EntryType _entryType;
+
std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
};
@@ -242,6 +246,23 @@ private:
*/
bool _fetchNextNewWriteOplog(OperationContext* opCtx);
+ /**
+ * Same as notifyNewWriteOpTime but must be called while holding the _newOplogMutex.
+ */
+ void _notifyNewWriteOpTime(WithLock,
+ repl::OpTime opTimestamp,
+ EntryAtOpTimeType entryAtOpTimeType);
+
+ /*
+ * Derives retryable write oplog entries from the given retryable internal transaction applyOps
+ * oplog entry, and adds the ones that are related to the migration the given oplog buffer. Must
+ * be called while holding the mutex that protects the buffer.
+ */
+ void _extractOplogEntriesForInternalTransactionForRetryableWrite(
+ WithLock,
+ const repl::OplogEntry& applyOplogEntry,
+ std::vector<repl::OplogEntry>* oplogBuffer);
+
// Namespace for which the migration is happening
const NamespaceString _ns;
@@ -252,26 +273,31 @@ private:
const ChunkRange _chunkRange;
const ShardKeyPattern _keyPattern;
- // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator,
- // _lastFetchedOplogBuffer, _lastFetchedOplog
+ // Protects _sessionOplogIterators, _currentOplogIterator, _lastFetchedOplog,
+ // _lastFetchedOplogImage and _unprocessedOplogBuffer.
Mutex _sessionCloneMutex =
MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_sessionCloneMutex");
// List of remaining session records that needs to be cloned.
std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators;
- // Points to the current session record eing cloned.
+ // Points to the current session record being cloned.
std::unique_ptr<SessionOplogIterator> _currentOplogIterator;
- // Used for temporarily storng oplog entries for operations that has more than one entry.
- // For example, findAndModify generates one for the actual operation and another for the
- // pre/post image.
- std::vector<repl::OplogEntry> _lastFetchedOplogBuffer;
- // Used to store the last fetched oplog. This enables calling get multiple times.
+ // Used to store the last fetched and processed oplog entry from _currentOplogIterator. This
+ // enables calling get() multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
- // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
+ // Used to store the pre/post image for _lastFetchedNewWriteOplog if there is one.
+ boost::optional<repl::OplogEntry> _lastFetchedOplogImage;
+
+ // Used to store the last fetched oplog entries from _currentOplogIterator that have not been
+ // processed.
+ std::vector<repl::OplogEntry> _unprocessedOplogBuffer;
+
+ // Protects _newWriteOpTimeList, _lastFetchedNewWriteOplog, _lastFetchedNewWriteOplogImage,
+ // _unprocessedNewWriteOplogBuffer, _state, _newOplogNotification.
Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex");
// The average size of documents in config.transactions.
@@ -280,12 +306,17 @@ private:
// Stores oplog opTime of new writes that are coming in.
std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList;
- // Used to store the last fetched oplog from _newWriteTsList.
+ // Used to store the last fetched and processed oplog entry from _newWriteOpTimeList. This
+ // enables calling get() multiple times.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
- // Used to store an image for `_lastFetchedNewWriteOplog` if there is one.
+ // Used to store the pre/post image oplog entry when _lastFetchedNewWriteOplog if there is one.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage;
+ // Used to store the last fetched oplog entries from _newWriteOpTimeList that have not been
+ // processed.
+ std::vector<repl::OplogEntry> _unprocessedNewWriteOplogBuffer;
+
// Stores the current state.
State _state{State::kActive};
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index f2fe2b99e4a..1427c74bafe 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -56,6 +57,7 @@ namespace {
using executor::RemoteCommandRequest;
const NamespaceString kNs("a.b");
+const NamespaceString kOtherNs("a.b.c");
const KeyPattern kShardKey(BSON("x" << 1));
const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100));
const KeyPattern kNestedShardKey(BSON("x.y" << 1));
@@ -960,6 +962,263 @@ TEST_F(SessionCatalogMigrationSourceTest,
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
+TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteBasic) {
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op1 = makeDurableReplOp(
+ repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1});
+ // op without stmtId.
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {});
+ // op for a different ns.
+ auto op3 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << 3), BSONObj(), {3});
+ auto op4 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 4), BSONObj(), {4});
+ // op that does not touch the chunk being migrated.
+ auto op5 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5});
+ // WouldChangeOwningShard sentinel op.
+ auto op6 = makeDurableReplOp(
+ repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(130, 1), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1, op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(130, 2), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op4, op5, op6},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ auto applyOpsOpTime3 = repl::OpTime(Timestamp(130, 3), 1);
+ auto entry3 = makeApplyOpsOplogEntry(applyOpsOpTime3,
+ entry2.getOpTime(), // prevOpTime
+ {},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry3);
+
+ migrationSource.notifyNewWriteOpTime(
+ entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const std::vector<repl::DurableReplOperation> expectedOps{op6, op4, op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteFetchPrePostImage) {
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto preImageOpTimeForOp2 = repl::OpTime(Timestamp(140, 1), 1);
+ auto preImageEntryForOp2 = makeOplogEntry(preImageOpTimeForOp2,
+ repl::OpTypeEnum::kNoop,
+ BSON("x" << 2), // o
+ boost::none, // o2
+ Date_t::now(), // wallClockTime
+ sessionId,
+ txnNumber,
+ {2}, // stmtIds
+ {}); // prevOpTime
+ insertOplogEntry(preImageEntryForOp2);
+
+ auto postImageOpTimeForOp4 = repl::OpTime(Timestamp(140, 2), 1);
+ auto postImageEntryForOp4 = makeOplogEntry(postImageOpTimeForOp4,
+ repl::OpTypeEnum::kNoop,
+ BSON("_id" << 4 << "x" << 4), // o
+ boost::none, // o2
+ Date_t::now(), // wallClockTime,
+ sessionId,
+ txnNumber,
+ {4}, // stmtIds
+ {}); // prevOpTime
+ insertOplogEntry(postImageEntryForOp4);
+
+ auto op1 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1});
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 2)),
+ BSON("x" << 2),
+ {1}, // stmtIds
+ boost::none, // needsRetryImage
+ preImageOpTimeForOp2); // preImageOpTime
+ auto op3 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3});
+ auto op4 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 4)),
+ BSON("x" << 4),
+ {4},
+ boost::none, // needsRetryImage
+ boost::none, // preImageOpTime
+ postImageOpTimeForOp4); // postImageOpTime
+ auto op5 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 5), BSONObj(), {5});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(140, 3), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1, op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(140, 4), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op4, op5},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ migrationSource.notifyNewWriteOpTime(
+ entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const std::vector<repl::DurableReplOperation> expectedOps{
+ op5,
+ postImageEntryForOp4.getDurableReplOperation(),
+ op4,
+ op3,
+ preImageEntryForOp2.getDurableReplOperation(),
+ op2,
+ op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ if (nextOplogResult.oplog->getOpType() == repl::OpTypeEnum::kNoop) {
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ } else {
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ }
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteForgePrePostImage) {
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage,
+ repl::RetryImageEnum::kPostImage};
+ auto opTimeSecs = 150;
+ for (auto imageType : cases) {
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op1 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1});
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 1)),
+ BSON("x" << 2),
+ {2},
+ imageType /* needsRetryImage */);
+ auto op3 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 2), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 3), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ repl::ImageEntry imageEntryForOp2;
+ imageEntryForOp2.set_id(sessionId);
+ imageEntryForOp2.setTxnNumber(txnNumber);
+ imageEntryForOp2.setTs(applyOpsOpTime2.getTimestamp());
+ imageEntryForOp2.setImageKind(imageType);
+ imageEntryForOp2.setImage(*op2.getObject2());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON());
+
+ migrationSource.notifyNewWriteOpTime(
+ entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const auto expectedImageOpForOp2 = makeDurableReplOp(
+ repl::OpTypeEnum::kNoop, kNs, imageEntryForOp2.getImage(), boost::none, {0});
+ const std::vector<repl::DurableReplOperation> expectedOps{
+ op3, expectedImageOpForOp2, op2, op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ if (nextOplogResult.oplog->getOpType() == repl::OpTypeEnum::kNoop) {
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ } else {
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ }
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ opTimeSecs++;
+ }
+}
+
TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) {
const auto sessionId = makeLogicalSessionIdForTest();
const auto txnNumber = TxnNumber{1};
@@ -1221,6 +1480,352 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForN
}
TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteBasic) {
+ auto opTimeSecs = 210;
+
+ auto runTest = [&](bool isPrepared) {
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op1 = makeDurableReplOp(
+ repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1});
+ // op without stmtId.
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {});
+ // op for a different ns.
+ auto op3 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << 3), BSONObj(), {3});
+ auto op4 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 4), BSONObj(), {4});
+ // op that does not touch the chunk being migrated.
+ auto op5 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5});
+ // WouldChangeOwningShard sentinel op.
+ auto op6 = makeDurableReplOp(
+ repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 1), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1, op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 2), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op4, op5, op6},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ auto applyOpsOpTime3 = repl::OpTime(Timestamp(opTimeSecs, 3), 1);
+ auto entry3 = makeApplyOpsOplogEntry(applyOpsOpTime3,
+ entry2.getOpTime(), // prevOpTime
+ {},
+ sessionId,
+ txnNumber,
+ isPrepared, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry3);
+
+ repl::OpTime lastWriteOpTime;
+ if (isPrepared) {
+ auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1);
+ auto entry4 = makeCommandOplogEntry(commitOpTime,
+ entry2.getOpTime(),
+ BSON("commitTransaction" << 1),
+ sessionId,
+ txnNumber);
+ insertOplogEntry(entry4);
+ lastWriteOpTime = commitOpTime;
+ } else {
+ lastWriteOpTime = applyOpsOpTime3;
+ }
+
+ SessionTxnRecord txnRecord;
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(lastWriteOpTime);
+ txnRecord.setLastWriteDate(Date_t::now());
+ txnRecord.setState(DurableTxnStateEnum::kCommitted);
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const std::vector<repl::DurableReplOperation> expectedOps{op6, op4, op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ opTimeSecs++;
+ client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
+ };
+
+ runTest(false /*isPrepared */);
+ runTest(true /*isPrepared */);
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteFetchPrePostImage) {
+ auto opTimeSecs = 220;
+
+ auto runTest = [&](bool isPrepared) {
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto preImageOpTimeForOp2 = repl::OpTime(Timestamp(opTimeSecs, 1), 1);
+ auto preImageEntryForOp2 = makeOplogEntry(preImageOpTimeForOp2,
+ repl::OpTypeEnum::kNoop,
+ BSON("x" << 2), // o
+ boost::none, // o2
+ Date_t::now(), // wallClockTime
+ sessionId,
+ txnNumber,
+ {2}, // stmtIds
+ {}); // prevOpTime
+ insertOplogEntry(preImageEntryForOp2);
+
+ auto postImageOpTimeForOp4 = repl::OpTime(Timestamp(opTimeSecs, 2), 1);
+ auto postImageEntryForOp4 = makeOplogEntry(postImageOpTimeForOp4,
+ repl::OpTypeEnum::kNoop,
+ BSON("_id" << 4 << "x" << 4), // o
+ boost::none, // o2
+ Date_t::now(), // wallClockTime,
+ sessionId,
+ txnNumber,
+ {4}, // stmtIds
+ {}); // prevOpTime
+ insertOplogEntry(postImageEntryForOp4);
+
+ auto op1 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1});
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 2)),
+ BSON("x" << 2),
+ {2}, // stmtIds
+ boost::none, // needsRetryImage
+ preImageOpTimeForOp2); // preImageOpTime
+ auto op3 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3});
+ auto op4 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 4)),
+ BSON("x" << 4),
+ {4},
+ boost::none, // needsRetryImage
+ boost::none, // preImageOpTime
+ postImageOpTimeForOp4); // postImageOpTime
+ auto op5 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 5), BSONObj(), {5});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 3), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1, op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 4), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op4, op5},
+ sessionId,
+ txnNumber,
+ isPrepared, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ repl::OpTime lastWriteOpTime;
+ if (isPrepared) {
+ auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 5), 1);
+ auto entry3 = makeCommandOplogEntry(commitOpTime,
+ entry2.getOpTime(),
+ BSON("commitTransaction" << 1),
+ sessionId,
+ txnNumber);
+ insertOplogEntry(entry3);
+ lastWriteOpTime = commitOpTime;
+ } else {
+ lastWriteOpTime = applyOpsOpTime2;
+ }
+
+ SessionTxnRecord txnRecord;
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(lastWriteOpTime);
+ txnRecord.setLastWriteDate(Date_t::now());
+ txnRecord.setState(DurableTxnStateEnum::kCommitted);
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const std::vector<repl::DurableReplOperation> expectedOps{
+ op5,
+ postImageEntryForOp4.getDurableReplOperation(),
+ op4,
+ op3,
+ preImageEntryForOp2.getDurableReplOperation(),
+ op2,
+ op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ opTimeSecs++;
+ client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
+ };
+
+ runTest(false /*isPrepared */);
+ runTest(true /*isPrepared */);
+}
+
+TEST_F(
+ SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForCommittedUnpreparedInternalTransactionForRetryableWriteForgePrePostImage) {
+ auto opTimeSecs = 230;
+
+ std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage,
+ repl::RetryImageEnum::kPostImage};
+ auto runTest = [&](bool isPrepared) {
+ for (auto imageType : cases) {
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op1 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1});
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("$set" << BSON("_id" << 2)),
+ BSON("x" << 2),
+ {2},
+ imageType /* needsRetryImage */);
+ auto op3 =
+ makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3});
+
+ auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 2), 1);
+ auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
+ {}, // prevOpTime
+ {op1},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ true); // isPartial
+ insertOplogEntry(entry1);
+
+ auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 3), 1);
+ auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2,
+ entry1.getOpTime(), // prevOpTime
+ {op2, op3},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ repl::OpTime lastWriteOpTime;
+ if (isPrepared) {
+ auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1);
+ auto entry3 = makeCommandOplogEntry(commitOpTime,
+ entry2.getOpTime(),
+ BSON("commitTransaction" << 1),
+ sessionId,
+ txnNumber);
+ insertOplogEntry(entry3);
+ lastWriteOpTime = commitOpTime;
+ } else {
+ lastWriteOpTime = applyOpsOpTime2;
+ }
+
+ DBDirectClient client(opCtx());
+
+ SessionTxnRecord txnRecord;
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(lastWriteOpTime);
+ txnRecord.setLastWriteDate(Date_t::now());
+ txnRecord.setState(DurableTxnStateEnum::kCommitted);
+
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ txnRecord.toBSON());
+
+ repl::ImageEntry imageEntryForOp2;
+ imageEntryForOp2.set_id(sessionId);
+ imageEntryForOp2.setTxnNumber(txnNumber);
+ imageEntryForOp2.setTs(applyOpsOpTime2.getTimestamp());
+ imageEntryForOp2.setImageKind(imageType);
+ imageEntryForOp2.setImage(*op2.getObject2());
+
+ client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ const auto expectedSessionId = *getParentSessionId(sessionId);
+ const auto expectedTxnNumber = *sessionId.getTxnNumber();
+ const auto expectedImageOpForOp2 = makeDurableReplOp(
+ repl::OpTypeEnum::kNoop, kNs, imageEntryForOp2.getImage(), boost::none, {0});
+ const std::vector<repl::DurableReplOperation> expectedOps{
+ op3, expectedImageOpForOp2, op2, op1};
+
+ for (const auto& op : expectedOps) {
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op.toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ opTimeSecs++;
+ client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ txnRecord.toBSON());
+ }
+ };
+
+ runTest(false /*isPrepared */);
+ runTest(true /*isPrepared */);
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
ReturnDeadEndSentinelOplogEntryForPreparedNonInternalTransaction) {
const auto sessionId = makeLogicalSessionIdForTest();
const auto txnNumber = TxnNumber{1};