diff options
author | Jason Chan <jason.chan@mongodb.com> | 2021-04-26 20:06:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-27 22:55:12 +0000 |
commit | 70a27d12642793fed2b05da116910799ea67c4f6 (patch) | |
tree | 16da3e81bd60a6dac282b74bd4de62919ed4d3e2 /src/mongo/db | |
parent | 3fc7aedd7469e5bb0dfd53280c4845b6b5c39377 (diff) | |
download | mongo-70a27d12642793fed2b05da116910799ea67c4f6.tar.gz |
SERVER-55305 Do not coalesce updates across different txnNumbers for retryableWrites
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.h | 6 |
5 files changed, 85 insertions, 13 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 6a4d08d8c37..93072539fba 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -734,6 +734,14 @@ void OplogApplierImpl::fillWriterVectors( } } +void OplogApplierImpl::fillWriterVectors_forTest( + OperationContext* opCtx, + std::vector<OplogEntry>* ops, + std::vector<std::vector<const OplogEntry*>>* writerVectors, + std::vector<std::vector<OplogEntry>>* derivedOps) noexcept { + fillWriterVectors(opCtx, ops, writerVectors, derivedOps); +} + Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, OplogApplication::Mode oplogApplicationMode) { diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index a43e18dc41f..9eb1b15a169 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -74,6 +74,10 @@ public: const Options& options, ThreadPool* writerPool); + void fillWriterVectors_forTest(OperationContext* opCtx, + std::vector<OplogEntry>* ops, + std::vector<std::vector<const OplogEntry*>>* writerVectors, + std::vector<std::vector<OplogEntry>>* derivedOps) noexcept; private: /** diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index 90c142b60a4..ece2a1f0e87 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -484,6 +484,56 @@ TEST_F(OplogApplierImplTest, ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } +TEST_F(OplogApplierImplTest, + TxnTableUpdatesDoNotGetCoalescedForRetryableWritesAcrossDifferentTxnNumbers) { + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + const NamespaceString& nss{"test", "foo"}; + repl::OpTime firstInsertOpTime(Timestamp(1, 0), 1); + auto firstRetryableOp = makeInsertDocumentOplogEntryWithSessionInfo( + firstInsertOpTime, nss, BSON("_id" << 1), sessionInfo); + + repl::OpTime secondInsertOpTime(Timestamp(2, 0), 1); + sessionInfo.setTxnNumber(4); + auto secondRetryableOp = makeInsertDocumentOplogEntryWithSessionInfo( + secondInsertOpTime, nss, BSON("_id" << 2), sessionInfo); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + std::vector<std::vector<const OplogEntry*>> writerVectors(writerPool->getStats().numThreads); + std::vector<std::vector<OplogEntry>> derivedOps; + std::vector<OplogEntry> ops{firstRetryableOp, secondRetryableOp}; + oplogApplier.fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps); + // We expect a total of two derived ops - one for each distinct 'txnNumber'. + ASSERT_EQUALS(2, derivedOps.size()); + ASSERT_EQUALS(1, derivedOps[0].size()); + ASSERT_EQUALS(1, derivedOps[1].size()); + const auto firstDerivedOp = derivedOps[0][0]; + ASSERT_EQUALS(firstInsertOpTime.getTimestamp(), + firstDerivedOp.getObject()["lastWriteOpTime"]["ts"].timestamp()); + ASSERT_EQUALS(NamespaceString::kSessionTransactionsTableNamespace, firstDerivedOp.getNss()); + ASSERT_EQUALS(*firstRetryableOp.getTxnNumber(), + firstDerivedOp.getObject()["txnNum"].numberInt()); + const auto secondDerivedOp = derivedOps[1][0]; + ASSERT_EQUALS(*secondRetryableOp.getTxnNumber(), + secondDerivedOp.getObject()["txnNum"].numberInt()); + ASSERT_EQUALS(NamespaceString::kSessionTransactionsTableNamespace, secondDerivedOp.getNss()); + ASSERT_EQUALS(secondInsertOpTime.getTimestamp(), + secondDerivedOp.getObject()["lastWriteOpTime"]["ts"].timestamp()); +} + class MultiOplogEntryOplogApplierImplTest : public OplogApplierImplTest { public: MultiOplogEntryOplogApplierImplTest() diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 96363d75e63..a3470efaa50 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -161,8 +161,7 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush( return _flush(entry); } - _updateSessionInfo(entry); - return boost::none; + return _updateSessionInfo(entry); } boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession( @@ -187,11 +186,12 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession( return boost::none; } -void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { +boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateSessionInfo( + const OplogEntry& entry) { const auto& sessionInfo = entry.getOperationSessionInfo(); if (!sessionInfo.getTxnNumber()) { - return; + return {}; } const auto& lsid = sessionInfo.getSessionId(); @@ -200,24 +200,33 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { // Ignore pre/post image no-op oplog entries. These entries will not have an o2 field. if (entry.getOpType() == OpTypeEnum::kNoop) { if (!entry.getFromMigrate() || !*entry.getFromMigrate()) { - return; + return {}; } if (!entry.getObject2()) { - return; + return {}; } } auto iter = _sessionsToUpdate.find(*lsid); if (iter == _sessionsToUpdate.end()) { _sessionsToUpdate.emplace(*lsid, entry); - return; + return {}; } const auto& existingSessionInfo = iter->second.getOperationSessionInfo(); - if (*sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber()) { + const auto existingTxnNumber = *existingSessionInfo.getTxnNumber(); + if (*sessionInfo.getTxnNumber() == existingTxnNumber) { + iter->second = entry; + return {}; + } + + if (*sessionInfo.getTxnNumber() > existingTxnNumber) { + // Do not coalesce updates across txn numbers. For more details, see SERVER-55305. + auto updateOplog = createMatchingTransactionTableUpdate(iter->second); + invariant(updateOplog); iter->second = entry; - return; + return std::vector<OplogEntry>{std::move(*updateOplog)}; } LOGV2_FATAL_NOTRACE(50843, @@ -226,8 +235,7 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { "oplog entry: {existingEntry}", "lsid"_attr = lsid->toBSON(), "sessionInfo_getTxnNumber"_attr = *sessionInfo.getTxnNumber(), - "existingSessionInfo_getTxnNumber"_attr = - *existingSessionInfo.getTxnNumber(), + "existingSessionInfo_getTxnNumber"_attr = existingTxnNumber, "newEntry"_attr = redact(entry.toBSONForLogging()), "existingEntry"_attr = redact(iter->second.toBSONForLogging())); } diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h index 2ef43795ff7..b792a3b3235 100644 --- a/src/mongo/db/repl/session_update_tracker.h +++ b/src/mongo/db/repl/session_update_tracker.h @@ -85,9 +85,11 @@ private: std::vector<OplogEntry> _flushForQueryPredicate(const BSONObj& queryPredicate); /** - * Extract transaction information from the oplog if any and records them internally. + * Extract transaction information from the oplog if any and records them internally. Returns + * a list of 'config.transactions' table updates to be flushed if 'entry' has a 'txnNumber' + * greater than the currently stored session information. */ - void _updateSessionInfo(const OplogEntry& entry); + boost::optional<std::vector<OplogEntry>> _updateSessionInfo(const OplogEntry& entry); /** * Inspects the oplog entry and determines whether this needs to update the session info or |