diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-03-18 23:52:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-19 00:24:26 +0000 |
commit | c141ef8536d51f05a6fa4017de20286d154d09e1 (patch) | |
tree | 022bd1ffe70d91657498a2237ca8e49ed12de74b /src | |
parent | 6f2f79447a83f1cccb208d815e049699c8f86fbe (diff) | |
download | mongo-c141ef8536d51f05a6fa4017de20286d154d09e1.tar.gz |
SERVER-63494 Transfer history for retryable transactions with more than one oplog entry across migrations
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 296 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.h | 53 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 605 |
8 files changed, 918 insertions, 82 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 2d21959e896..a29fe4b790d 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1508,7 +1508,6 @@ int logOplogEntriesForTransaction( const auto txnParticipant = TransactionParticipant::get(opCtx); OpTimeBundle prevWriteOpTime; - auto numEntriesWritten = 0; // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. @@ -1528,6 +1527,7 @@ int logOplogEntriesForTransaction( MutableOplogEntry imageEntry; imageEntry.setSessionId(*opCtx->getLogicalSessionId()); imageEntry.setTxnNumber(*opCtx->getTxnNumber()); + imageEntry.setStatementIds(statement.getStatementIds()); imageEntry.setOpType(repl::OpTypeEnum::kNoop); imageEntry.setObject(imageDoc); imageEntry.setNss(statement.getNss()); @@ -1674,10 +1674,9 @@ int logOplogEntriesForTransaction( // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; - numEntriesWritten++; } - return numEntriesWritten; + return currOplogSlot - oplogSlots.begin(); } void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 03ea1dd3481..5aff3904cb7 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1938,6 +1938,14 @@ protected: if (updateOplogEntry.getTxnNumber()) { ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *preImage.getTxnNumber()); } + if (!updateOplogEntry.getStatementIds().empty()) { + const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds(); + const auto& preImageOplogStmtIds = preImage.getStatementIds(); + ASSERT_EQ(updateOplogStmtIds.size(), preImageOplogStmtIds.size()); + for (size_t i = 0; i < updateOplogStmtIds.size(); i++) { + ASSERT_EQ(updateOplogStmtIds[i], preImageOplogStmtIds[i]); + } + } } else { ASSERT_FALSE(updateOplogEntry.getPreImageOpTime()); } @@ -1967,6 
+1975,14 @@ protected: if (updateOplogEntry.getTxnNumber()) { ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *postImage.getTxnNumber()); } + if (!updateOplogEntry.getStatementIds().empty()) { + const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds(); + const auto& postImageOplogStmtIds = postImage.getStatementIds(); + ASSERT_EQ(updateOplogStmtIds.size(), postImageOplogStmtIds.size()); + for (size_t i = 0; i < updateOplogStmtIds.size(); i++) { + ASSERT_EQ(updateOplogStmtIds[i], postImageOplogStmtIds[i]); + } + } } else { ASSERT_FALSE(updateOplogEntry.getPostImageOpTime()); } @@ -2384,6 +2400,14 @@ protected: if (deleteOplogEntry.getTxnNumber()) { ASSERT_EQ(*deleteOplogEntry.getTxnNumber(), *preImage.getTxnNumber()); } + if (!deleteOplogEntry.getStatementIds().empty()) { + const auto& deleteOplogStmtIds = deleteOplogEntry.getStatementIds(); + const auto& preImageOplogStmtIds = preImage.getStatementIds(); + ASSERT_EQ(deleteOplogStmtIds.size(), preImageOplogStmtIds.size()); + for (size_t i = 0; i < deleteOplogStmtIds.size(); i++) { + ASSERT_EQ(deleteOplogStmtIds[i], preImageOplogStmtIds[i]); + } + } } else { ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime()); } diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index fb6d803712c..41dcdd121f3 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -257,7 +257,7 @@ SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) { res.setN(1); res.setNModified(1); } else if (entry.getOpType() == repl::OpTypeEnum::kNoop) { - if (entry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0) { + if (isWouldChangeOwningShardSentinelOplogEntry(entry)) { uasserted(ErrorCodes::IncompleteTransactionHistory, kWouldChangeOwningShardRetryContext); } @@ -281,7 +281,7 @@ write_ops::FindAndModifyCommandReply parseOplogEntryForFindAndModify( // Migrated op and WouldChangeOwningShard sentinel case. 
if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { - if (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0) { + if (isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) { uasserted(ErrorCodes::IncompleteTransactionHistory, kWouldChangeOwningShardRetryContext); } diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h index de808d3e067..ad3be539fd7 100644 --- a/src/mongo/db/ops/write_ops_retryability.h +++ b/src/mongo/db/ops/write_ops_retryability.h @@ -40,6 +40,12 @@ class OperationContext; const BSONObj kWouldChangeOwningShardSentinel(BSON("$wouldChangeOwningShard" << 1)); +template <typename OplogEntryType> +bool isWouldChangeOwningShardSentinelOplogEntry(const OplogEntryType& oplogEntry) { + return (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) && + (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0); +} + /** * Returns the single write result corresponding to the given oplog entry for document update. I.e., * the single write result that would have been returned by the statement that would have resulted diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 0b605ff92a0..afa092fe137 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -40,6 +40,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" @@ -216,6 +217,10 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, // and o2 will be empty. 
// (3) Oplog entries that are a dead sentinel, which the donor sent over as the replacement // for a prepare oplog entry or unprepared transaction commit oplog entry. + // (4) Oplog entries that are a WouldChangeOwningShard sentinel entry, used for making + // retries of a WouldChangeOwningShard update or findAndModify fail with + // IncompleteTransactionHistory. In this case, the o field is non-empty and the o2 + // field is an empty BSONObj. BSONObj object2; if (oplogEntry.getObject2()) { @@ -224,7 +229,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, oplogEntry.setObject2(object2); } - if (object2.isEmpty()) { + if (object2.isEmpty() && !isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) { result.isPrePostImage = true; uassert(40632, diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 36785825a04..a79cefc1973 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -38,6 +38,8 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" +#include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/repl_client_info.h" @@ -187,6 +189,24 @@ repl::OplogEntry makeSentinelOplogEntry(const LogicalSessionId& lsid, {kIncompleteHistoryStmtId}); // statement id } +/** + * If the given oplog entry is an oplog entry for a retryable internal transaction, returns a copy + * of it but with the session id and transaction number set to the session id and transaction number + * of the retryable write that it corresponds to. Otherwise, returns the original oplog entry. 
+ */ +repl::OplogEntry downConvertSessionInfoIfNeeded(const repl::OplogEntry& oplogEntry) { + const auto sessionId = oplogEntry.getSessionId(); + if (isInternalSessionForRetryableWrite(*sessionId)) { + auto mutableOplogEntry = + fassert(6349401, repl::MutableOplogEntry::parse(oplogEntry.getEntry().toBSON())); + mutableOplogEntry.setSessionId(*getParentSessionId(*sessionId)); + mutableOplogEntry.setTxnNumber(*sessionId->getTxnNumber()); + + return {mutableOplogEntry.toBSON()}; + } + return oplogEntry; +} + } // namespace SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx, @@ -309,7 +329,9 @@ void SessionCatalogMigrationSource::onCloneCleanup() { SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<Latch> _lk(_sessionCloneMutex); - if (_lastFetchedOplog) { + if (_lastFetchedOplogImage) { + return OplogResult(_lastFetchedOplogImage, false); + } else if (_lastFetchedOplog) { return OplogResult(_lastFetchedOplog, false); } } @@ -317,9 +339,8 @@ SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLas { stdx::lock_guard<Latch> _lk(_newOplogMutex); if (_lastFetchedNewWriteOplogImage) { - return OplogResult(_lastFetchedNewWriteOplogImage.get(), false); + return OplogResult(_lastFetchedNewWriteOplogImage, false); } - return OplogResult(_lastFetchedNewWriteOplog, true); } } @@ -389,38 +410,105 @@ bool SessionCatalogMigrationSource::shouldSkipOplogEntry(const mongo::repl::Oplo return false; } -bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) { +void SessionCatalogMigrationSource::_extractOplogEntriesForInternalTransactionForRetryableWrite( + WithLock, + const repl::OplogEntry& applyOpsOplogEntry, + std::vector<repl::OplogEntry>* oplogBuffer) { + invariant(isInternalSessionForRetryableWrite(*applyOpsOplogEntry.getSessionId())); + invariant(applyOpsOplogEntry.getCommandType() == 
repl::OplogEntry::CommandType::kApplyOps); + + auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(applyOpsOplogEntry.getObject()); + auto unrolledOp = + uassertStatusOK(repl::MutableOplogEntry::parse(applyOpsOplogEntry.getEntry().toBSON())); + + for (const auto& innerOp : applyOpsInfo.getOperations()) { + auto replOp = repl::ReplOperation::parse( + {"SessionOplogIterator::_extractOplogEntriesForInternalTransactionForRetryableWrite"}, + innerOp); + + if (replOp.getStatementIds().empty()) { + // Skip this operation since it is not retryable. + continue; + } + + if (replOp.getNss() != _ns && !isWouldChangeOwningShardSentinelOplogEntry(replOp)) { + // Skip this operation since it does not involve the namespace being migrated. + continue; + } + + unrolledOp.setDurableReplOperation(replOp); + auto unrolledOplogEntry = repl::OplogEntry(unrolledOp.toBSON()); + + if (shouldSkipOplogEntry(unrolledOplogEntry, _keyPattern, _chunkRange)) { + continue; + } + + oplogBuffer->emplace_back(unrolledOplogEntry); + } +} + +bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock lk, OperationContext* opCtx) { while (_currentOplogIterator) { - if (auto nextOplog = _currentOplogIterator->getNext(opCtx)) { + if (_unprocessedOplogBuffer.empty()) { + // The oplog buffer is empty. Fetch the next oplog entry from the current session + // oplog iterator. + auto nextOplog = _currentOplogIterator->getNext(opCtx); + + if (!nextOplog) { + _currentOplogIterator.reset(); + return false; + } + + // Determine if this oplog entry should be migrated. If so, add the oplog entry or the + // oplog entries derived from it to the oplog buffer. + + if (isInternalSessionForRetryableWrite(*nextOplog->getSessionId())) { + invariant(nextOplog->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + // Derive retryable write oplog entries from this retryable internal transaction + // applyOps oplog entry, and add them to the oplog buffer. 
+ _extractOplogEntriesForInternalTransactionForRetryableWrite( + lk, *nextOplog, &_unprocessedOplogBuffer); + continue; + } + + // We only expect to see two kinds of oplog entries here: + // - Dead-end sentinel oplog entries which by design should have stmtId equal to + // kIncompleteHistoryStmtId. + // - CRUD or noop oplog entries for retryable writes which by design should have a + // stmtId. auto nextStmtIds = nextOplog->getStatementIds(); + invariant(!nextStmtIds.empty()); // Skip the rest of the chain for this session since the ns is unrelated with the // current one being migrated. It is ok to not check the rest of the chain because // retryable writes doesn't allow touching different namespaces. - if (nextStmtIds.empty() || - (nextStmtIds.front() != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns)) { + if (nextStmtIds.front() != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns) { _currentOplogIterator.reset(); return false; } - // Skipping an entry here will also result in the pre/post images to also not be sent in - // the migration as they're handled by 'fetchPrePostImageOplog' below + // Skipping an entry here will also result in the pre/post images to also not be + // sent in the migration as they're handled by 'fetchPrePostImageOplog' below. if (shouldSkipOplogEntry(nextOplog.get(), _keyPattern, _chunkRange)) { continue; } - auto doc = fetchPrePostImageOplog(opCtx, &(nextOplog.get())); - if (doc) { - _lastFetchedOplogBuffer.push_back(*nextOplog); - _lastFetchedOplog = *doc; - } else { - _lastFetchedOplog = *nextOplog; - } + _unprocessedOplogBuffer.emplace_back(*nextOplog); + } - return true; - } else { - _currentOplogIterator.reset(); + // Peek the next oplog entry in the buffer and process it. We cannot pop the oplog + // entry upfront since it may require fetching/forging a pre or post image and the reads + // done as part of that can fail with a WriteConflictException error. 
+ auto nextOplog = _unprocessedOplogBuffer.back(); + auto nextImageOplog = fetchPrePostImageOplog(opCtx, &nextOplog); + invariant(!_lastFetchedOplogImage); + invariant(!_lastFetchedOplog); + if (nextImageOplog) { + _lastFetchedOplogImage = downConvertSessionInfoIfNeeded(*nextImageOplog); } + _lastFetchedOplog = downConvertSessionInfoIfNeeded(nextOplog); + _unprocessedOplogBuffer.pop_back(); + return true; } return false; @@ -428,16 +516,20 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() { stdx::lock_guard<Latch> _lk(_sessionCloneMutex); - return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() || + return _lastFetchedOplog || !_unprocessedOplogBuffer.empty() || !_sessionOplogIterators.empty() || _currentOplogIterator; } bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { stdx::unique_lock<Latch> lk(_sessionCloneMutex); - if (!_lastFetchedOplogBuffer.empty()) { - _lastFetchedOplog = _lastFetchedOplogBuffer.back(); - _lastFetchedOplogBuffer.pop_back(); + if (_lastFetchedOplogImage) { + // When `_lastFetchedOplogImage` is set, it means we found an oplog entry with a pre/post + // image. At this step, we've already returned the image oplog entry, but we have yet to + // return the original oplog entry stored in `_lastFetchedOplog`. We will unset this value + // and return such that the next call to `getLastFetchedOplog` will return + // `_lastFetchedOplog`. 
+ _lastFetchedOplogImage.reset(); return true; } @@ -460,15 +552,16 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC } bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) { - return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty(); + return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty() || + !_unprocessedNewWriteOplogBuffer.empty(); } bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* opCtx) { - repl::OpTime nextOpTimeToFetch; - EntryAtOpTimeType entryAtOpTimeType; + boost::optional<repl::OplogEntry> nextNewWriteOplog; { - stdx::lock_guard<Latch> lk(_newOplogMutex); + stdx::unique_lock<Latch> lk(_newOplogMutex); + if (_lastFetchedNewWriteOplogImage) { // When `_lastFetchedNewWriteOplogImage` is set, it means we found an oplog entry with // a pre/post image. At this step, we've already returned the image oplog entry, but we @@ -479,52 +572,97 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op return true; } - if (_newWriteOpTimeList.empty()) { - _lastFetchedNewWriteOplog.reset(); + _lastFetchedNewWriteOplog.reset(); + + if (_unprocessedNewWriteOplogBuffer.empty() && _newWriteOpTimeList.empty()) { return false; } - std::tie(nextOpTimeToFetch, entryAtOpTimeType) = _newWriteOpTimeList.front(); - } - - DBDirectClient client(opCtx); - const auto& newWriteOplogDoc = - client.findOne(NamespaceString::kRsOplogNamespace, nextOpTimeToFetch.asQuery()); - - uassert(40620, - str::stream() << "Unable to fetch oplog entry with opTime: " - << nextOpTimeToFetch.toBSON(), - !newWriteOplogDoc.isEmpty()); - - auto newWriteOplogEntry = uassertStatusOK(repl::OplogEntry::parse(newWriteOplogDoc)); + if (_unprocessedNewWriteOplogBuffer.empty()) { + // The oplog buffer is empty. Peek the next opTime and fetch its oplog entry while not + // holding the mutex. We cannot dequeue the opTime upfront since the read can fail + // with a WriteConflictException error.
+ repl::OpTime opTimeToFetch; + EntryAtOpTimeType entryAtOpTimeType; + std::tie(opTimeToFetch, entryAtOpTimeType) = _newWriteOpTimeList.front(); + + lk.unlock(); + DBDirectClient client(opCtx); + const auto& nextNewWriteOplogDoc = + client.findOne(NamespaceString::kRsOplogNamespace, opTimeToFetch.asQuery()); + uassert(40620, + str::stream() << "Unable to fetch oplog entry with opTime: " + << opTimeToFetch.toBSON(), + !nextNewWriteOplogDoc.isEmpty()); + auto nextNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(nextNewWriteOplogDoc)); + lk.lock(); + + // Determine if this oplog entry should be migrated. If so, add the oplog entry or the + // oplog entries derived from it to the oplog buffer. Finally, dequeue the opTime. + + if (entryAtOpTimeType == EntryAtOpTimeType::kRetryableWrite) { + _unprocessedNewWriteOplogBuffer.emplace_back(nextNewWriteOplog); + _newWriteOpTimeList.pop_front(); + } else if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) { + invariant(nextNewWriteOplog.getCommandType() == + repl::OplogEntry::CommandType::kApplyOps); + const auto sessionId = *nextNewWriteOplog.getSessionId(); + + if (isInternalSessionForNonRetryableWrite(sessionId)) { + // TODO (SERVER-64331): Determine if chunk migration should migrate internal + // sessions for non-retryable writes. + _newWriteOpTimeList.pop_front(); + return false; + } - // If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel - // entry. - if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) { - const auto sessionId = *newWriteOplogEntry.getSessionId(); + if (isInternalSessionForRetryableWrite(sessionId)) { + // Derive retryable write oplog entries from this retryable internal + // transaction applyOps oplog entry, and add them to the oplog buffer. 
+ _extractOplogEntriesForInternalTransactionForRetryableWrite( + lk, nextNewWriteOplog, &_unprocessedNewWriteOplogBuffer); + _newWriteOpTimeList.pop_front(); + + if (auto prevOpTime = nextNewWriteOplog.getPrevWriteOpTimeInTransaction(); + prevOpTime && !prevOpTime->isNull()) { + // Add the opTime for the previous applyOps oplog entry in the transaction + // to the queue. + _notifyNewWriteOpTime(lk, *prevOpTime, EntryAtOpTimeType::kTransaction); + } + + lk.unlock(); + return _fetchNextNewWriteOplog(opCtx); + } - if (isInternalSessionForNonRetryableWrite(sessionId)) { - // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions - // for non-retryable writes. - return false; + // This applyOps oplog entry corresponds to non-internal transaction prepare/commit, + // replace it with a dead-end sentinel oplog entry. + auto sentinelOplogEntry = + makeSentinelOplogEntry(sessionId, + *nextNewWriteOplog.getTxnNumber(), + opCtx->getServiceContext()->getFastClockSource()->now()); + _unprocessedNewWriteOplogBuffer.emplace_back(sentinelOplogEntry); + _newWriteOpTimeList.pop_front(); + } else { + MONGO_UNREACHABLE; + } } - newWriteOplogEntry = - makeSentinelOplogEntry(*newWriteOplogEntry.getSessionId(), - *newWriteOplogEntry.getTxnNumber(), - opCtx->getServiceContext()->getFastClockSource()->now()); + // Peek the next oplog entry in the buffer and process it below. We cannot pop the oplog + // entry upfront since it may require fetching/forging a pre or post image and the reads + // done as part of that can fail with a WriteConflictException error. 
+ nextNewWriteOplog = _unprocessedNewWriteOplogBuffer.back(); } - auto imageNoopOplogEntry = fetchPrePostImageOplog(opCtx, &newWriteOplogEntry); - + auto nextNewWriteImageOplog = fetchPrePostImageOplog(opCtx, &(*nextNewWriteOplog)); { stdx::lock_guard<Latch> lk(_newOplogMutex); - _lastFetchedNewWriteOplog = newWriteOplogEntry; - _newWriteOpTimeList.pop_front(); - - if (imageNoopOplogEntry) { - _lastFetchedNewWriteOplogImage = imageNoopOplogEntry; + invariant(!_lastFetchedNewWriteOplogImage); + invariant(!_lastFetchedNewWriteOplog); + if (nextNewWriteImageOplog) { + _lastFetchedNewWriteOplogImage = + downConvertSessionInfoIfNeeded(*nextNewWriteImageOplog); } + _lastFetchedNewWriteOplog = downConvertSessionInfoIfNeeded(*nextNewWriteOplog); + _unprocessedNewWriteOplogBuffer.pop_back(); } return true; @@ -533,6 +671,12 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime, EntryAtOpTimeType entryAtOpTimeType) { stdx::lock_guard<Latch> lk(_newOplogMutex); + _notifyNewWriteOpTime(lk, opTime, entryAtOpTimeType); +} + +void SessionCatalogMigrationSource::_notifyNewWriteOpTime(WithLock, + repl::OpTime opTime, + EntryAtOpTimeType entryAtOpTimeType) { _newWriteOpTimeList.emplace_back(opTime, entryAtOpTimeType); if (_newOplogNotification) { @@ -543,14 +687,26 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime, SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator( SessionTxnRecord txnRecord, int expectedRollbackId) - : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) { + : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId), _entryType([&] { + if (isInternalSessionForRetryableWrite(_record.getSessionId())) { + // The SessionCatalogMigrationSource should not try to create a SessionOplogIterator + // for a retryable internal transaction that has aborted or is still in progress or + // 
prepare. + invariant(_record.getState() == DurableTxnStateEnum::kCommitted); + return EntryType::kRetryableTransaction; + } + // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions for + // non-retryable writes. + invariant(!getParentSessionId(txnRecord.getSessionId())); + return _record.getState() ? EntryType::kNonRetryableTransaction + : EntryType::kRetryableWrite; + }()) { _writeHistoryIterator = std::make_unique<TransactionHistoryIterator>(_record.getLastWriteOpTime()); } boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::SessionOplogIterator::getNext( OperationContext* opCtx) { - if (!_writeHistoryIterator || !_writeHistoryIterator->hasNext()) { return boost::none; } @@ -558,14 +714,24 @@ boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::SessionOplogIte try { uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "Cannot migrate multi-statement transaction state", - !_record.getState()); + _entryType == SessionOplogIterator::EntryType::kRetryableWrite || + _entryType == SessionOplogIterator::EntryType::kRetryableTransaction); // Note: during SessionCatalogMigrationSource::init, we inserted a document and wait for it // to committed to the majority. In addition, the TransactionHistoryIterator uses OpTime // to query for the oplog. This means that if we can successfully fetch the oplog, we are // guaranteed that they are majority committed. If we can't fetch the oplog, it can either // mean that the oplog has been rolled over or was rolled back. 
- return _writeHistoryIterator->next(opCtx); + auto nextOplog = _writeHistoryIterator->next(opCtx); + + if (_entryType == SessionOplogIterator::EntryType::kRetryableTransaction) { + if (nextOplog.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction) { + return getNext(opCtx); + } + + invariant(nextOplog.getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + } + return nextOplog; } catch (const AssertionException& excep) { if (excep.code() == ErrorCodes::IncompleteTransactionHistory) { // Note: no need to check if in replicaSet mode because having an iterator implies diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 3603ddfb055..f3b87baf7fe 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -177,6 +177,8 @@ private: */ class SessionOplogIterator { public: + enum class EntryType { kRetryableWrite, kNonRetryableTransaction, kRetryableTransaction }; + SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId); /** @@ -199,6 +201,8 @@ private: private: const SessionTxnRecord _record; const int _initialRollbackId; + const EntryType _entryType; + std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator; }; @@ -242,6 +246,23 @@ private: */ bool _fetchNextNewWriteOplog(OperationContext* opCtx); + /** + * Same as notifyNewWriteOpTime but must be called while holding the _newOplogMutex. + */ + void _notifyNewWriteOpTime(WithLock, + repl::OpTime opTimestamp, + EntryAtOpTimeType entryAtOpTimeType); + + /* + * Derives retryable write oplog entries from the given retryable internal transaction applyOps + * oplog entry, and adds the ones that are related to the migration the given oplog buffer. Must + * be called while holding the mutex that protects the buffer. 
+ */ + void _extractOplogEntriesForInternalTransactionForRetryableWrite( + WithLock, + const repl::OplogEntry& applyOplogEntry, + std::vector<repl::OplogEntry>* oplogBuffer); + // Namespace for which the migration is happening const NamespaceString _ns; @@ -252,26 +273,31 @@ private: const ChunkRange _chunkRange; const ShardKeyPattern _keyPattern; - // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator, - // _lastFetchedOplogBuffer, _lastFetchedOplog + // Protects _sessionOplogIterators, _currentOplogIterator, _lastFetchedOplog, + // _lastFetchedOplogImage and _unprocessedOplogBuffer. Mutex _sessionCloneMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_sessionCloneMutex"); // List of remaining session records that needs to be cloned. std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators; - // Points to the current session record eing cloned. + // Points to the current session record being cloned. std::unique_ptr<SessionOplogIterator> _currentOplogIterator; - // Used for temporarily storng oplog entries for operations that has more than one entry. - // For example, findAndModify generates one for the actual operation and another for the - // pre/post image. - std::vector<repl::OplogEntry> _lastFetchedOplogBuffer; - // Used to store the last fetched oplog. This enables calling get multiple times. + // Used to store the last fetched and processed oplog entry from _currentOplogIterator. This + // enables calling get() multiple times. boost::optional<repl::OplogEntry> _lastFetchedOplog; - // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification + // Used to store the pre/post image for _lastFetchedOplog if there is one. + boost::optional<repl::OplogEntry> _lastFetchedOplogImage; + + // Used to store the last fetched oplog entries from _currentOplogIterator that have not been + // processed.
+ std::vector<repl::OplogEntry> _unprocessedOplogBuffer; + + // Protects _newWriteOpTimeList, _lastFetchedNewWriteOplog, _lastFetchedNewWriteOplogImage, + // _unprocessedNewWriteOplogBuffer, _state, _newOplogNotification. Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex"); // The average size of documents in config.transactions. @@ -280,12 +306,17 @@ private: // Stores oplog opTime of new writes that are coming in. std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList; - // Used to store the last fetched oplog from _newWriteTsList. + // Used to store the last fetched and processed oplog entry from _newWriteOpTimeList. This + // enables calling get() multiple times. boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog; - // Used to store an image for `_lastFetchedNewWriteOplog` if there is one. + // Used to store the pre/post image oplog entry for _lastFetchedNewWriteOplog if there is one. boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage; + // Used to store the last fetched oplog entries from _newWriteOpTimeList that have not been + // processed. + std::vector<repl::OplogEntry> _unprocessedNewWriteOplogBuffer; + // Stores the current state.
State _state{State::kActive}; diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index f2fe2b99e4a..1427c74bafe 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -36,6 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog_entry.h" @@ -56,6 +57,7 @@ namespace { using executor::RemoteCommandRequest; const NamespaceString kNs("a.b"); +const NamespaceString kOtherNs("a.b.c"); const KeyPattern kShardKey(BSON("x" << 1)); const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100)); const KeyPattern kNestedShardKey(BSON("x.y" << 1)); @@ -960,6 +962,263 @@ TEST_F(SessionCatalogMigrationSourceTest, ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } +TEST_F(SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteBasic) { + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op1 = makeDurableReplOp( + repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1}); + // op without stmtId. + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {}); + // op for a different ns. + auto op3 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << 3), BSONObj(), {3}); + auto op4 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 4), BSONObj(), {4}); + // op that does not touch the chunk being migrated. 
+ auto op5 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5}); + // WouldChangeOwningShard sentinel op. + auto op6 = makeDurableReplOp( + repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(130, 1), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1, op2, op3}, + sessionId, + txnNumber, + false, // isPrepare + true); // isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(130, 2), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op4, op5, op6}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + auto applyOpsOpTime3 = repl::OpTime(Timestamp(130, 3), 1); + auto entry3 = makeApplyOpsOplogEntry(applyOpsOpTime3, + entry2.getOpTime(), // prevOpTime + {}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry3); + + migrationSource.notifyNewWriteOpTime( + entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const std::vector<repl::DurableReplOperation> expectedOps{op6, op4, op1}; + + for (const auto& op : expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, + 
DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteFetchPrePostImage) { + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto preImageOpTimeForOp2 = repl::OpTime(Timestamp(140, 1), 1); + auto preImageEntryForOp2 = makeOplogEntry(preImageOpTimeForOp2, + repl::OpTypeEnum::kNoop, + BSON("x" << 2), // o + boost::none, // o2 + Date_t::now(), // wallClockTime + sessionId, + txnNumber, + {2}, // stmtIds + {}); // prevOpTime + insertOplogEntry(preImageEntryForOp2); + + auto postImageOpTimeForOp4 = repl::OpTime(Timestamp(140, 2), 1); + auto postImageEntryForOp4 = makeOplogEntry(postImageOpTimeForOp4, + repl::OpTypeEnum::kNoop, + BSON("_id" << 4 << "x" << 4), // o + boost::none, // o2 + Date_t::now(), // wallClockTime, + sessionId, + txnNumber, + {4}, // stmtIds + {}); // prevOpTime + insertOplogEntry(postImageEntryForOp4); + + auto op1 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1}); + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 2)), + BSON("x" << 2), + {1}, // stmtIds + boost::none, // needsRetryImage + preImageOpTimeForOp2); // preImageOpTime + auto op3 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3}); + auto op4 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 4)), + BSON("x" << 4), + {4}, + boost::none, // needsRetryImage + boost::none, // preImageOpTime + postImageOpTimeForOp4); // postImageOpTime + auto op5 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 5), BSONObj(), {5}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(140, 3), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1, op2, op3}, + sessionId, + txnNumber, + false, // 
isPrepare + true); // isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(140, 4), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op4, op5}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + migrationSource.notifyNewWriteOpTime( + entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const std::vector<repl::DurableReplOperation> expectedOps{ + op5, + postImageEntryForOp4.getDurableReplOperation(), + op4, + op3, + preImageEntryForOp2.getDurableReplOperation(), + op2, + op1}; + + for (const auto& op : expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + if (nextOplogResult.oplog->getOpType() == repl::OpTypeEnum::kNoop) { + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + } else { + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + } + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteForgePrePostImage) { + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage, + repl::RetryImageEnum::kPostImage}; + auto opTimeSecs = 150; + for (auto imageType : cases) { + const auto sessionId = 
makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op1 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1}); + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 1)), + BSON("x" << 2), + {2}, + imageType /* needsRetryImage */); + auto op3 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 2), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1}, + sessionId, + txnNumber, + false, // isPrepare + true); // isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 3), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op2, op3}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + repl::ImageEntry imageEntryForOp2; + imageEntryForOp2.set_id(sessionId); + imageEntryForOp2.setTxnNumber(txnNumber); + imageEntryForOp2.setTs(applyOpsOpTime2.getTimestamp()); + imageEntryForOp2.setImageKind(imageType); + imageEntryForOp2.setImage(*op2.getObject2()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON()); + + migrationSource.notifyNewWriteOpTime( + entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const auto expectedImageOpForOp2 = makeDurableReplOp( + repl::OpTypeEnum::kNoop, kNs, imageEntryForOp2.getImage(), boost::none, {0}); + const std::vector<repl::DurableReplOperation> expectedOps{ + op3, expectedImageOpForOp2, op2, op1}; + + for (const auto& op : expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + 
ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + if (nextOplogResult.oplog->getOpType() == repl::OpTypeEnum::kNoop) { + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + } else { + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + } + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + opTimeSecs++; + } +} + TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) { const auto sessionId = makeLogicalSessionIdForTest(); const auto txnNumber = TxnNumber{1}; @@ -1221,6 +1480,352 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForN } TEST_F(SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteBasic) { + auto opTimeSecs = 210; + + auto runTest = [&](bool isPrepared) { + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op1 = makeDurableReplOp( + repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1}); + // op without stmtId. + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {}); + // op for a different ns. + auto op3 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << 3), BSONObj(), {3}); + auto op4 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 4), BSONObj(), {4}); + // op that does not touch the chunk being migrated. + auto op5 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5}); + // WouldChangeOwningShard sentinel op. 
+ auto op6 = makeDurableReplOp( + repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 1), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1, op2, op3}, + sessionId, + txnNumber, + false, // isPrepare + true); // isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 2), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op4, op5, op6}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + auto applyOpsOpTime3 = repl::OpTime(Timestamp(opTimeSecs, 3), 1); + auto entry3 = makeApplyOpsOplogEntry(applyOpsOpTime3, + entry2.getOpTime(), // prevOpTime + {}, + sessionId, + txnNumber, + isPrepared, // isPrepare + false); // isPartial + insertOplogEntry(entry3); + + repl::OpTime lastWriteOpTime; + if (isPrepared) { + auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1); + auto entry4 = makeCommandOplogEntry(commitOpTime, + entry2.getOpTime(), + BSON("commitTransaction" << 1), + sessionId, + txnNumber); + insertOplogEntry(entry4); + lastWriteOpTime = commitOpTime; + } else { + lastWriteOpTime = applyOpsOpTime3; + } + + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(lastWriteOpTime); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kCommitted); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const std::vector<repl::DurableReplOperation> expectedOps{op6, op4, op1}; + + for (const auto& op : 
expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + opTimeSecs++; + client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + }; + + runTest(false /*isPrepared */); + runTest(true /*isPrepared */); +} + +TEST_F(SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteFetchPrePostImage) { + auto opTimeSecs = 220; + + auto runTest = [&](bool isPrepared) { + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto preImageOpTimeForOp2 = repl::OpTime(Timestamp(opTimeSecs, 1), 1); + auto preImageEntryForOp2 = makeOplogEntry(preImageOpTimeForOp2, + repl::OpTypeEnum::kNoop, + BSON("x" << 2), // o + boost::none, // o2 + Date_t::now(), // wallClockTime + sessionId, + txnNumber, + {2}, // stmtIds + {}); // prevOpTime + insertOplogEntry(preImageEntryForOp2); + + auto postImageOpTimeForOp4 = repl::OpTime(Timestamp(opTimeSecs, 2), 1); + auto postImageEntryForOp4 = makeOplogEntry(postImageOpTimeForOp4, + repl::OpTypeEnum::kNoop, + BSON("_id" << 4 << "x" << 4), // o + boost::none, // o2 + Date_t::now(), // wallClockTime, + sessionId, + txnNumber, + {4}, // stmtIds + {}); // prevOpTime + insertOplogEntry(postImageEntryForOp4); + + auto op1 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1}); + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 2)), + BSON("x" << 2), + {2}, // stmtIds + boost::none, // needsRetryImage + 
preImageOpTimeForOp2); // preImageOpTime + auto op3 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3}); + auto op4 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 4)), + BSON("x" << 4), + {4}, + boost::none, // needsRetryImage + boost::none, // preImageOpTime + postImageOpTimeForOp4); // postImageOpTime + auto op5 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 5), BSONObj(), {5}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 3), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1, op2, op3}, + sessionId, + txnNumber, + false, // isPrepare + true); // isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 4), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op4, op5}, + sessionId, + txnNumber, + isPrepared, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + repl::OpTime lastWriteOpTime; + if (isPrepared) { + auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 5), 1); + auto entry3 = makeCommandOplogEntry(commitOpTime, + entry2.getOpTime(), + BSON("commitTransaction" << 1), + sessionId, + txnNumber); + insertOplogEntry(entry3); + lastWriteOpTime = commitOpTime; + } else { + lastWriteOpTime = applyOpsOpTime2; + } + + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(lastWriteOpTime); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kCommitted); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const 
std::vector<repl::DurableReplOperation> expectedOps{ + op5, + postImageEntryForOp4.getDurableReplOperation(), + op4, + op3, + preImageEntryForOp2.getDurableReplOperation(), + op2, + op1}; + + for (const auto& op : expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + opTimeSecs++; + client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + }; + + runTest(false /*isPrepared */); + runTest(true /*isPrepared */); +} + +TEST_F( + SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForCommittedUnpreparedInternalTransactionForRetryableWriteForgePrePostImage) { + auto opTimeSecs = 230; + + std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage, + repl::RetryImageEnum::kPostImage}; + auto runTest = [&](bool isPrepared) { + for (auto imageType : cases) { + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op1 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1}); + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kUpdate, + kNs, + BSON("$set" << BSON("_id" << 2)), + BSON("x" << 2), + {2}, + imageType /* needsRetryImage */); + auto op3 = + makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 3), BSONObj(), {3}); + + auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 2), 1); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, + {}, // prevOpTime + {op1}, + sessionId, + txnNumber, + false, // isPrepare + true); // 
isPartial + insertOplogEntry(entry1); + + auto applyOpsOpTime2 = repl::OpTime(Timestamp(opTimeSecs, 3), 1); + auto entry2 = makeApplyOpsOplogEntry(applyOpsOpTime2, + entry1.getOpTime(), // prevOpTime + {op2, op3}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + repl::OpTime lastWriteOpTime; + if (isPrepared) { + auto commitOpTime = repl::OpTime(Timestamp(opTimeSecs, 4), 1); + auto entry3 = makeCommandOplogEntry(commitOpTime, + entry2.getOpTime(), + BSON("commitTransaction" << 1), + sessionId, + txnNumber); + insertOplogEntry(entry3); + lastWriteOpTime = commitOpTime; + } else { + lastWriteOpTime = applyOpsOpTime2; + } + + DBDirectClient client(opCtx()); + + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(lastWriteOpTime); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kCommitted); + + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + txnRecord.toBSON()); + + repl::ImageEntry imageEntryForOp2; + imageEntryForOp2.set_id(sessionId); + imageEntryForOp2.setTxnNumber(txnNumber); + imageEntryForOp2.setTs(applyOpsOpTime2.getTimestamp()); + imageEntryForOp2.setImageKind(imageType); + imageEntryForOp2.setImage(*op2.getObject2()); + + client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + const auto expectedSessionId = *getParentSessionId(sessionId); + const auto expectedTxnNumber = *sessionId.getTxnNumber(); + const auto expectedImageOpForOp2 = makeDurableReplOp( + repl::OpTypeEnum::kNoop, kNs, imageEntryForOp2.getImage(), boost::none, {0}); + const std::vector<repl::DurableReplOperation> expectedOps{ + op3, expectedImageOpForOp2, op2, op1}; + + for (const auto& op : expectedOps) { + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + 
ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), expectedSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), expectedTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op.toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + opTimeSecs++; + client.remove(NamespaceString::kSessionTransactionsTableNamespace.ns(), + txnRecord.toBSON()); + } + }; + + runTest(false /*isPrepared */); + runTest(true /*isPrepared */); +} + +TEST_F(SessionCatalogMigrationSourceTest, ReturnDeadEndSentinelOplogEntryForPreparedNonInternalTransaction) { const auto sessionId = makeLogicalSessionIdForTest(); const auto txnNumber = TxnNumber{1}; |