summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2021-04-26 20:06:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-27 22:55:12 +0000
commit70a27d12642793fed2b05da116910799ea67c4f6 (patch)
tree16da3e81bd60a6dac282b74bd4de62919ed4d3e2 /src/mongo
parent3fc7aedd7469e5bb0dfd53280c4845b6b5c39377 (diff)
downloadmongo-70a27d12642793fed2b05da116910799ea67c4f6.tar.gz
SERVER-55305 Do not coalesce updates across different txnNumbers for retryableWrites
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp8
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h4
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp50
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp30
-rw-r--r--src/mongo/db/repl/session_update_tracker.h6
5 files changed, 85 insertions, 13 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 6a4d08d8c37..93072539fba 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -734,6 +734,14 @@ void OplogApplierImpl::fillWriterVectors(
}
}
+void OplogApplierImpl::fillWriterVectors_forTest(
+ OperationContext* opCtx,
+ std::vector<OplogEntry>* ops,
+ std::vector<std::vector<const OplogEntry*>>* writerVectors,
+ std::vector<std::vector<OplogEntry>>* derivedOps) noexcept {
+ fillWriterVectors(opCtx, ops, writerVectors, derivedOps);
+}
+
Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
OplogApplication::Mode oplogApplicationMode) {
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index a43e18dc41f..9eb1b15a169 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -74,6 +74,10 @@ public:
const Options& options,
ThreadPool* writerPool);
+ void fillWriterVectors_forTest(OperationContext* opCtx,
+ std::vector<OplogEntry>* ops,
+ std::vector<std::vector<const OplogEntry*>>* writerVectors,
+ std::vector<std::vector<OplogEntry>>* derivedOps) noexcept;
private:
/**
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index 90c142b60a4..ece2a1f0e87 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -484,6 +484,56 @@ TEST_F(OplogApplierImplTest,
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
+TEST_F(OplogApplierImplTest,
+ TxnTableUpdatesDoNotGetCoalescedForRetryableWritesAcrossDifferentTxnNumbers) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ const NamespaceString& nss{"test", "foo"};
+ repl::OpTime firstInsertOpTime(Timestamp(1, 0), 1);
+ auto firstRetryableOp = makeInsertDocumentOplogEntryWithSessionInfo(
+ firstInsertOpTime, nss, BSON("_id" << 1), sessionInfo);
+
+ repl::OpTime secondInsertOpTime(Timestamp(2, 0), 1);
+ sessionInfo.setTxnNumber(4);
+ auto secondRetryableOp = makeInsertDocumentOplogEntryWithSessionInfo(
+ secondInsertOpTime, nss, BSON("_id" << 2), sessionInfo);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ std::vector<std::vector<const OplogEntry*>> writerVectors(writerPool->getStats().numThreads);
+ std::vector<std::vector<OplogEntry>> derivedOps;
+ std::vector<OplogEntry> ops{firstRetryableOp, secondRetryableOp};
+ oplogApplier.fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+ // We expect a total of two derived ops - one for each distinct 'txnNumber'.
+ ASSERT_EQUALS(2, derivedOps.size());
+ ASSERT_EQUALS(1, derivedOps[0].size());
+ ASSERT_EQUALS(1, derivedOps[1].size());
+ const auto firstDerivedOp = derivedOps[0][0];
+ ASSERT_EQUALS(firstInsertOpTime.getTimestamp(),
+ firstDerivedOp.getObject()["lastWriteOpTime"]["ts"].timestamp());
+ ASSERT_EQUALS(NamespaceString::kSessionTransactionsTableNamespace, firstDerivedOp.getNss());
+ ASSERT_EQUALS(*firstRetryableOp.getTxnNumber(),
+ firstDerivedOp.getObject()["txnNum"].numberInt());
+ const auto secondDerivedOp = derivedOps[1][0];
+ ASSERT_EQUALS(*secondRetryableOp.getTxnNumber(),
+ secondDerivedOp.getObject()["txnNum"].numberInt());
+ ASSERT_EQUALS(NamespaceString::kSessionTransactionsTableNamespace, secondDerivedOp.getNss());
+ ASSERT_EQUALS(secondInsertOpTime.getTimestamp(),
+ secondDerivedOp.getObject()["lastWriteOpTime"]["ts"].timestamp());
+}
+
class MultiOplogEntryOplogApplierImplTest : public OplogApplierImplTest {
public:
MultiOplogEntryOplogApplierImplTest()
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 96363d75e63..a3470efaa50 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -161,8 +161,7 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush(
return _flush(entry);
}
- _updateSessionInfo(entry);
- return boost::none;
+ return _updateSessionInfo(entry);
}
boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession(
@@ -187,11 +186,12 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession(
return boost::none;
}
-void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
+boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateSessionInfo(
+ const OplogEntry& entry) {
const auto& sessionInfo = entry.getOperationSessionInfo();
if (!sessionInfo.getTxnNumber()) {
- return;
+ return {};
}
const auto& lsid = sessionInfo.getSessionId();
@@ -200,24 +200,33 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
// Ignore pre/post image no-op oplog entries. These entries will not have an o2 field.
if (entry.getOpType() == OpTypeEnum::kNoop) {
if (!entry.getFromMigrate() || !*entry.getFromMigrate()) {
- return;
+ return {};
}
if (!entry.getObject2()) {
- return;
+ return {};
}
}
auto iter = _sessionsToUpdate.find(*lsid);
if (iter == _sessionsToUpdate.end()) {
_sessionsToUpdate.emplace(*lsid, entry);
- return;
+ return {};
}
const auto& existingSessionInfo = iter->second.getOperationSessionInfo();
- if (*sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber()) {
+ const auto existingTxnNumber = *existingSessionInfo.getTxnNumber();
+ if (*sessionInfo.getTxnNumber() == existingTxnNumber) {
+ iter->second = entry;
+ return {};
+ }
+
+ if (*sessionInfo.getTxnNumber() > existingTxnNumber) {
+ // Do not coalesce updates across txn numbers. For more details, see SERVER-55305.
+ auto updateOplog = createMatchingTransactionTableUpdate(iter->second);
+ invariant(updateOplog);
iter->second = entry;
- return;
+ return std::vector<OplogEntry>{std::move(*updateOplog)};
}
LOGV2_FATAL_NOTRACE(50843,
@@ -226,8 +235,7 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
"oplog entry: {existingEntry}",
"lsid"_attr = lsid->toBSON(),
"sessionInfo_getTxnNumber"_attr = *sessionInfo.getTxnNumber(),
- "existingSessionInfo_getTxnNumber"_attr =
- *existingSessionInfo.getTxnNumber(),
+ "existingSessionInfo_getTxnNumber"_attr = existingTxnNumber,
"newEntry"_attr = redact(entry.toBSONForLogging()),
"existingEntry"_attr = redact(iter->second.toBSONForLogging()));
}
diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h
index 2ef43795ff7..b792a3b3235 100644
--- a/src/mongo/db/repl/session_update_tracker.h
+++ b/src/mongo/db/repl/session_update_tracker.h
@@ -85,9 +85,11 @@ private:
std::vector<OplogEntry> _flushForQueryPredicate(const BSONObj& queryPredicate);
/**
- * Extract transaction information from the oplog if any and records them internally.
+ * Extract transaction information from the oplog if any and records them internally. Returns
+ * a list of 'config.transactions' table updates to be flushed if 'entry' has a 'txnNumber'
+ * greater than the currently stored session information.
*/
- void _updateSessionInfo(const OplogEntry& entry);
+ boost::optional<std::vector<OplogEntry>> _updateSessionInfo(const OplogEntry& entry);
/**
* Inspects the oplog entry and determines whether this needs to update the session info or