summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2018-03-01 14:03:07 -0500
committerRandolph Tan <randolph@10gen.com>2018-03-12 13:29:33 -0400
commitf23b7e7bf3a2960b76a29709a7adeba0effa6b42 (patch)
treed1d8e4f95f9b7a5f2cb86514732ed58a2f5b513e /src/mongo
parentd3dcbd8a7c07f8c60b2b1e4da935fdcab53b9267 (diff)
downloadmongo-f23b7e7bf3a2960b76a29709a7adeba0effa6b42.tar.gz
SERVER-32445 config.transactions table can get out of sync when the TransactionReaper removes entries
Secondary replication of config.transactions table is now changed to create oplog entries of the actual updates to mirror what the primary would have done.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp1
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp1
-rw-r--r--src/mongo/db/repl/apply_ops_test.cpp1
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp1
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp1
-rw-r--r--src/mongo/db/repl/multiapplier_test.cpp1
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp7
-rw-r--r--src/mongo/db/repl/oplog_entry.h1
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp1
-rw-r--r--src/mongo/db/repl/sync_tail.cpp81
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp374
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp1
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp1
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp1
-rw-r--r--src/mongo/db/session.cpp89
-rw-r--r--src/mongo/db/session.h17
-rw-r--r--src/mongo/db/session_test.cpp1
-rw-r--r--src/mongo/db/transaction_history_iterator_test.cpp1
-rw-r--r--src/mongo/dbtests/repltests.cpp1
20 files changed, 365 insertions, 218 deletions
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index b7c403cf0f4..f1371a79e21 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -68,6 +68,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
oField, // o
o2Field, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 157c4157a02..4310dbb5419 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -92,6 +92,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTypeEnum opType,
object, // o
object2, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
index 6d729dd7857..2b0696c8531 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
@@ -57,6 +57,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp
index 115a68f8e36..ada452132ea 100644
--- a/src/mongo/db/repl/apply_ops_test.cpp
+++ b/src/mongo/db/repl/apply_ops_test.cpp
@@ -360,6 +360,7 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, const BSONObj& oField) {
oField, // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 8dc9b4823e5..61f02153c77 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -82,6 +82,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
object2, // o2
sessionInfo, // sessionInfo
+ boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 04ce849a71e..26735134ade 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -544,6 +544,7 @@ OplogEntry makeOplogEntry(int t,
oField, // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index 023d99d0796..7cabe3df3fa 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -77,6 +77,7 @@ OplogEntry makeOplogEntry(int ts) {
BSONObj(), // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 722ed0e596c..ce474f04976 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -87,6 +87,7 @@ BSONObj makeOplogEntryDoc(OpTime opTime,
const BSONObj& oField,
const boost::optional<BSONObj>& o2Field,
const OperationSessionInfo& sessionInfo,
+ const boost::optional<bool>& isUpsert,
const boost::optional<mongo::Date_t>& wallClockTime,
const boost::optional<StmtId>& statementId,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
@@ -110,6 +111,10 @@ BSONObj makeOplogEntryDoc(OpTime opTime,
if (o2Field) {
builder.append(OplogEntryBase::kObject2FieldName, o2Field.get());
}
+ if (isUpsert) {
+ invariant(o2Field);
+ builder.append(OplogEntryBase::kUpsertFieldName, isUpsert.get());
+ }
if (wallClockTime) {
builder.append(OplogEntryBase::kWallClockTimeFieldName, wallClockTime.get());
}
@@ -202,6 +207,7 @@ OplogEntry::OplogEntry(OpTime opTime,
const BSONObj& oField,
const boost::optional<BSONObj>& o2Field,
const OperationSessionInfo& sessionInfo,
+ const boost::optional<bool>& isUpsert,
const boost::optional<mongo::Date_t>& wallClockTime,
const boost::optional<StmtId>& statementId,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
@@ -217,6 +223,7 @@ OplogEntry::OplogEntry(OpTime opTime,
oField,
o2Field,
sessionInfo,
+ isUpsert,
wallClockTime,
statementId,
prevWriteOpTimeInTransaction,
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 6325891d6d7..455d2af5af7 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -85,6 +85,7 @@ public:
const BSONObj& oField,
const boost::optional<BSONObj>& o2Field,
const OperationSessionInfo& sessionInfo,
+ const boost::optional<bool>& isUpsert,
const boost::optional<mongo::Date_t>& wallClockTime,
const boost::optional<StmtId>& statementId,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index e6639fee1c9..4a7ff632e0e 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -307,6 +307,7 @@ BSONObj _makeOplogEntry(Timestamp ts, long long term) {
BSONObj(), // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6a1acb6e439..ac44798949b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -515,37 +515,6 @@ void scheduleWritesToOplog(OperationContext* opCtx,
}
}
-using SessionRecordMap =
- stdx::unordered_map<LogicalSessionId, SessionTxnRecord, LogicalSessionIdHash>;
-
-void scheduleTxnTableUpdates(OperationContext* opCtx,
- ThreadPool* threadPool,
- const SessionRecordMap& latestRecords) {
- for (const auto& it : latestRecords) {
- auto& record = it.second;
-
- invariantOK(threadPool->schedule([&record]() {
- auto opCtx = cc().makeOperationContext();
- ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
- opCtx->lockState());
- Session::updateSessionRecordOnSecondary(opCtx.get(), record);
- }));
- }
-}
-
-/**
- * A session txn record is greater (i.e. later) than another if its transaction number is greater,
- * or if its transaction number is the same, and its last write optime is greater.
- *
- * Records can only be compared meaningfully if they are for the same session id.
- */
-bool isSessionTxnRecordLaterThan(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) {
- invariant(lhs.getSessionId() == rhs.getSessionId());
-
- return (lhs.getTxnNum() > rhs.getTxnNum()) ||
- (lhs.getTxnNum() == rhs.getTxnNum() && lhs.getLastWriteOpTime() > rhs.getLastWriteOpTime());
-}
-
/**
* Caches per-collection properties which are relevant for oplog application, so that they don't
* have to be retrieved repeatedly for each op.
@@ -662,42 +631,6 @@ void fillWriterVectors(OperationContext* opCtx,
}
}
-/**
- * Returns a map of the "latest" transaction table records for each logical session id present in
- * the given operations. Each record represents the final state of the transaction table entry for
- * that session id after the operations are applied.
- */
-SessionRecordMap getLatestSessionRecords(const MultiApplier::Operations& ops) {
- SessionRecordMap latestSessionRecords;
-
- for (auto&& op : ops) {
- const auto& sessionInfo = op.getOperationSessionInfo();
- // Do not write session table entries for applyOps, as multi-document transactions
- // and retryable writes do not work together.
- // TODO(SERVER-33501): Make multi-docunment transactions work with retryable writes.
- if (sessionInfo.getTxnNumber() &&
- (!op.isCommand() || op.getCommandType() != OplogEntry::CommandType::kApplyOps)) {
- const auto& lsid = *sessionInfo.getSessionId();
-
- SessionTxnRecord record;
- record.setSessionId(lsid);
- record.setTxnNum(*sessionInfo.getTxnNumber());
- record.setLastWriteOpTime(op.getOpTime());
- invariant(op.getWallClockTime());
- record.setLastWriteDate(*op.getWallClockTime());
-
- auto it = latestSessionRecords.find(lsid);
- if (it == latestSessionRecords.end()) {
- latestSessionRecords.emplace(lsid, std::move(record));
- } else if (isSessionTxnRecordLaterThan(record, it->second)) {
- latestSessionRecords[lsid] = std::move(record);
- }
- }
- }
-
- return latestSessionRecords;
-}
-
} // namespace
/**
@@ -1412,8 +1345,15 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
// 'applyOpsOperations' have been applied.
std::vector<MultiApplier::Operations> applyOpsOperations;
+ // Normal writes to config.transactions in the primary don't create an oplog entry.
+ // Reconstruct these ops so config.transactions will be replicated correctly.
+ // Need to create a new copy of ops vector because the workerPool is also concurrently
+ // reading it and we don't want the new oplog entries to get written to the actual
+ // oplog.rs collection.
+ auto opsWithTxnUpdates = Session::addOpsForReplicatingTxnTable(ops);
+
std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &applyOpsOperations);
+ fillWriterVectors(opCtx, &opsWithTxnUpdates, &writerVectors, &applyOpsOperations);
// Wait for writes to finish before applying ops.
workerPool->waitForIdle();
@@ -1442,11 +1382,6 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
}
}
- // Update the transaction table to point to the latest oplog entries for each session id.
- const auto latestSessionRecords = getLatestSessionRecords(ops);
- scheduleTxnTableUpdates(opCtx, workerPool, latestSessionRecords);
- workerPool->waitForIdle();
-
// Notify the storage engine that a replication batch has completed.
// This means that all the writes associated with the oplog entries in the batch are
// finished and no new writes with timestamps associated with those oplog entries will show
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 28c5ef44156..e952d718967 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -90,6 +90,7 @@ repl::OplogEntry makeOplogEntry(NamespaceString nss) {
BSONObj(), // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction
@@ -195,7 +196,6 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long());
};
-
TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) {
const BSONObj op = BSON("op"
<< "x");
@@ -667,119 +667,6 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
ASSERT_EQUALS(op2, unittest::assertGet(OplogEntry::parse(operationsWrittenToOplog[1].doc)));
}
-TEST_F(SyncTailTest, MultiApplyUpdatesTheTransactionTable) {
- // Set up the transactions collection, which can only be done by the primary.
- ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
- SessionCatalog::create(_opCtx->getServiceContext());
- SessionCatalog::get(_opCtx->getServiceContext())->onStepUp(_opCtx.get());
- ON_BLOCK_EXIT([&] { SessionCatalog::reset_forTest(_opCtx->getServiceContext()); });
- ASSERT_OK(
- ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_SECONDARY));
-
- // Entries with a session id and a txnNumber update the transaction table.
- auto lsidSingle = makeLogicalSessionIdForTest();
- auto opSingle =
- makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 0), 1LL},
- NamespaceString("test.0"),
- BSON("x" << 1),
- lsidSingle,
- 5LL,
- 0);
-
- // For entries with the same session, the entry with a larger txnNumber is saved.
- auto lsidDiffTxn = makeLogicalSessionIdForTest();
- auto opDiffTxnSmaller =
- makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 0), 1LL},
- NamespaceString("test.1"),
- BSON("x" << 0),
- lsidDiffTxn,
- 10LL,
- 1);
- auto opDiffTxnLarger =
- makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(3), 0), 1LL},
- NamespaceString("test.1"),
- BSON("x" << 1),
- lsidDiffTxn,
- 20LL,
- 1);
-
- // For entries with the same session and txnNumber, the later optime is saved.
- auto lsidSameTxn = makeLogicalSessionIdForTest();
- auto opSameTxnLater =
- makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(6), 0), 1LL},
- NamespaceString("test.2"),
- BSON("x" << 0),
- lsidSameTxn,
- 30LL,
- 0);
- auto opSameTxnSooner =
- makeInsertDocumentOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(5), 0), 1LL},
- NamespaceString("test.2"),
- BSON("x" << 1),
- lsidSameTxn,
- 30LL,
- 1);
-
- // Entries with a session id but no txnNumber do not lead to updates.
- auto lsidNoTxn = makeLogicalSessionIdForTest();
- OperationSessionInfo info;
- info.setSessionId(lsidNoTxn);
- auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo(
- {Timestamp(Seconds(7), 0), 1LL}, NamespaceString("test.3"), BSON("x" << 0), info);
-
- // Apply the batch and verify the transaction collection was properly updated for each scenario.
- auto writerPool = SyncTail::makeWriterPool();
- ASSERT_OK(multiApply(
- _opCtx.get(),
- writerPool.get(),
- {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnLater, opSameTxnSooner, opNoTxn},
- noopApplyOperationFn));
-
- DBDirectClient client(_opCtx.get());
-
- // The txnNum and optime of the only write were saved.
- auto resultSingleDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON()));
- ASSERT_TRUE(!resultSingleDoc.isEmpty());
-
- auto resultSingle =
- SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc);
-
- ASSERT_EQ(resultSingle.getTxnNum(), 5LL);
- ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1));
-
- // The txnNum and optime of the write with the larger txnNum were saved.
- auto resultDiffTxnDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON()));
- ASSERT_TRUE(!resultDiffTxnDoc.isEmpty());
-
- auto resultDiffTxn =
- SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc);
-
- ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL);
- ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1));
-
- // The txnNum and optime of the write with the later optime were saved.
- auto resultSameTxnDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON()));
- ASSERT_TRUE(!resultSameTxnDoc.isEmpty());
-
- auto resultSameTxn =
- SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc);
-
- ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL);
- ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1));
-
- // There is no entry for the write with no txnNumber.
- auto resultNoTxn =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON()));
- ASSERT_TRUE(resultNoTxn.isEmpty());
-}
-
TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
@@ -1814,6 +1701,265 @@ TEST_F(SyncTailTest, DropDatabaseSucceedsInRecovering) {
ASSERT_OK(runOpSteadyState(op));
}
+class SyncTailTxnTableTest : public SyncTailTest {
+public:
+ void setUp() override {
+ SyncTailTest::setUp();
+
+ SessionCatalog::create(_opCtx->getServiceContext());
+ SessionCatalog::get(_opCtx->getServiceContext())->onStepUp(_opCtx.get());
+
+ DBDirectClient client(_opCtx.get());
+ BSONObj result;
+ ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result));
+ }
+ void tearDown() override {
+ SessionCatalog::reset_forTest(_opCtx->getServiceContext());
+ SyncTailTest::tearDown();
+ }
+
+ /**
+ * Creates an OplogEntry with given parameters and preset defaults for this test suite.
+ */
+ repl::OplogEntry makeOplogEntry(const NamespaceString& ns,
+ repl::OpTime opTime,
+ repl::OpTypeEnum opType,
+ BSONObj object,
+ boost::optional<BSONObj> object2,
+ const OperationSessionInfo& sessionInfo,
+ Date_t wallClockTime) {
+ return repl::OplogEntry(opTime, // optime
+ 0, // hash
+ opType, // opType
+ ns, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ 0, // version
+ object, // o
+ object2, // o2
+ sessionInfo, // sessionInfo
+ boost::none, // upsert
+ wallClockTime, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+ }
+
+ void checkTxnTable(const OperationSessionInfo& sessionInfo,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock) {
+ invariant(sessionInfo.getSessionId());
+ invariant(sessionInfo.getTxnNumber());
+
+ DBDirectClient client(_opCtx.get());
+ auto result = client.findOne(
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())});
+ ASSERT_FALSE(result.isEmpty());
+
+ auto txnRecord =
+ SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result);
+
+ ASSERT_EQ(*sessionInfo.getTxnNumber(), txnRecord.getTxnNum());
+ ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime());
+ ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate());
+ }
+
+ static const NamespaceString& nss() {
+ return kNs;
+ }
+
+private:
+ static const NamespaceString kNs;
+};
+
+const NamespaceString SyncTailTxnTableTest::kNs("test.foo");
+
+TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ const auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto writerPool = SyncTail::makeWriterPool();
+ SyncTail syncTail(nullptr, multiSyncApply, writerPool.get());
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
+
+ checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
+}
+
+TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ const auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
+ {Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << sessionInfo.getSessionId()->toBSON()),
+ boost::none,
+ {},
+ Date_t::now());
+
+ auto writerPool = SyncTail::makeWriterPool();
+ SyncTail syncTail(nullptr, multiSyncApply, writerPool.get());
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
+
+ DBDirectClient client(_opCtx.get());
+ auto result = client.findOne(
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())});
+ ASSERT_TRUE(result.isEmpty());
+}
+
+TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectWriteToTxnTable) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
+ {Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << sessionInfo.getSessionId()->toBSON()),
+ boost::none,
+ {},
+ Date_t::now());
+
+ date = Date_t::now();
+ sessionInfo.setTxnNumber(7);
+ auto insertOp2 = makeOplogEntry(nss(),
+ {Timestamp(3, 0), 2},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 6),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto writerPool = SyncTail::makeWriterPool();
+ SyncTail syncTail(nullptr, multiSyncApply, writerPool.get());
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
+
+ checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
+}
+
+TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
+ NamespaceString ns0("test.0");
+ NamespaceString ns1("test.1");
+ NamespaceString ns2("test.2");
+ NamespaceString ns3("test.3");
+
+ DBDirectClient client(_opCtx.get());
+ BSONObj result;
+ ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result));
+ ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result));
+ ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result));
+ ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result));
+
+ // Entries with a session id and a txnNumber update the transaction table.
+ auto lsidSingle = makeLogicalSessionIdForTest();
+ auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 0), 1LL}, ns0, BSON("_id" << 0), lsidSingle, 5LL, 0);
+
+ // For entries with the same session, the entry with a larger txnNumber is saved.
+ auto lsidDiffTxn = makeLogicalSessionIdForTest();
+ auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(2), 0), 1LL}, ns1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
+ auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(3), 0), 1LL}, ns1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
+
+ // For entries with the same session and txnNumber, the later optime is saved.
+ auto lsidSameTxn = makeLogicalSessionIdForTest();
+ auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(6), 0), 1LL}, ns2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
+ auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(5), 0), 1LL}, ns2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
+
+ // Entries with a session id but no txnNumber do not lead to updates.
+ auto lsidNoTxn = makeLogicalSessionIdForTest();
+ OperationSessionInfo info;
+ info.setSessionId(lsidNoTxn);
+ auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo(
+ {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info);
+
+ auto writerPool = SyncTail::makeWriterPool();
+ SyncTail syncTail(nullptr, multiSyncApply, writerPool.get());
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(),
+ {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
+
+ // The txnNum and optime of the only write were saved.
+ auto resultSingleDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON()));
+ ASSERT_TRUE(!resultSingleDoc.isEmpty());
+
+ auto resultSingle =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc);
+
+ ASSERT_EQ(resultSingle.getTxnNum(), 5LL);
+ ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1));
+
+ // The txnNum and optime of the write with the larger txnNum were saved.
+ auto resultDiffTxnDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON()));
+ ASSERT_TRUE(!resultDiffTxnDoc.isEmpty());
+
+ auto resultDiffTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc);
+
+ ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL);
+ ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1));
+
+ // The txnNum and optime of the write with the later optime were saved.
+ auto resultSameTxnDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON()));
+ ASSERT_TRUE(!resultSameTxnDoc.isEmpty());
+
+ auto resultSameTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc);
+
+ ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL);
+ ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1));
+
+ // There is no entry for the write with no txnNumber.
+ auto resultNoTxn =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON()));
+ ASSERT_TRUE(resultNoTxn.isEmpty());
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index f1a2b422305..ff1529fd6cd 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -97,6 +97,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
object2, // o2
sessionInfo, // sessionInfo
+ boost::none, // isUpsert
wallClockTime, // wall clock time
stmtId, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 301c9dc40d4..36190bf1cd4 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -89,6 +89,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
oField, // o
o2Field, // o2
sessionInfo, // session info
+ boost::none, // upsert
boost::none, // wall clock time
statementId, // statement id
boost::none, // optime of previous write within same transaction
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 74f87bbba25..f4940cfad40 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -72,6 +72,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
object2, // o2
{}, // sessionInfo
+ boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 41cfbd504bc..2abc192165c 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -209,6 +209,59 @@ void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequ
wuow.commit();
}
+/**
+ * Returns a new oplog entry if the given entry has transaction state embedded within it.
+ * The new oplog entry will contain the operation needed to replicate the transaction
+ * table.
+ * Returns boost::none if the given oplog entry doesn't have any transaction state or does not
+ * support updates to the transaction table.
+ */
+boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
+ const repl::OplogEntry& entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+ if (!sessionInfo.getTxnNumber()) {
+ return boost::none;
+ }
+
+ // Do not write session table entries for applyOps, as multi-document transactions
+ // and retryable writes do not work together.
+ // TODO(SERVER-33501): Make multi-document transactions work with retryable writes.
+ if (entry.isCommand() && entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
+ return boost::none;
+ }
+
+ invariant(sessionInfo.getSessionId());
+ invariant(entry.getWallClockTime());
+
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ newTxnRecord.setLastWriteOpTime(entry.getOpTime());
+ newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
+ return newTxnRecord.toBSON();
+ }();
+
+ return repl::OplogEntry(
+ entry.getOpTime(),
+ 0, // hash
+ repl::OpTypeEnum::kUpdate,
+ NamespaceString::kSessionTransactionsTableNamespace,
+ boost::none, // uuid
+ false, // fromMigrate
+ repl::OplogEntry::kOplogVersion,
+ updateBSON,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
+ {}, // sessionInfo
+ true, // upsert
+ *entry.getWallClockTime(),
+ boost::none, // statementId
+ boost::none, // prevWriteOpTime
+ boost::none, // preImageOpTime
+ boost::none // postImageOpTime
+ );
+}
+
// Failpoint which allows different failure actions to happen after each write. Supports the
// parameters below, which can be combined with each other (unless explicitly disallowed):
//
@@ -354,27 +407,6 @@ void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
-void Session::updateSessionRecordOnSecondary(OperationContext* opCtx,
- const SessionTxnRecord& sessionTxnRecord) {
- invariant(!opCtx->lockState()->isLocked());
-
- writeConflictRetry(
- opCtx, "Update session txn", NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] {
- UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
- << sessionTxnRecord.getSessionId().toBSON()));
- updateRequest.setUpdates(sessionTxnRecord.toBSON());
- updateRequest.setUpsert(true);
-
- repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
-
- Lock::DBLock configDBLock(opCtx, NamespaceString::kConfigDb, MODE_IX);
- WriteUnitOfWork wuow(opCtx);
- updateSessionEntry(opCtx, updateRequest);
- wuow.commit();
- });
-}
-
void Session::invalidate() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
_isValid = false;
@@ -778,4 +810,19 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
}
}
+std::vector<repl::OplogEntry> Session::addOpsForReplicatingTxnTable(
+ const std::vector<repl::OplogEntry>& ops) {
+ // Holds a copy of every input entry, interleaved with any generated table updates.
+ std::vector<repl::OplogEntry> newOps;
+
+ for (auto&& op : ops) {
+ // Copy the original operation first so the relative order of 'ops' is preserved.
+ newOps.push_back(op);
+
+ // createMatchingTransactionTableUpdate returns boost::none when no update applies to
+ // 'op'; otherwise the generated update is appended right after the op it mirrors.
+ if (auto updateTxnTableOp = createMatchingTransactionTableUpdate(op)) {
+ newOps.push_back(*updateTxnTableOp);
+ }
+ }
+
+ return newOps;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 12f9d1b06be..3496cf2aa57 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -169,16 +169,6 @@ public:
Date_t lastStmtIdWriteDate);
/**
- * Called after a replication batch has been applied on a secondary node. Keeps the session
- * transaction entry in sync with the oplog chain which has been written.
- *
- * In order to avoid the possibility of deadlock, this method must not be called while holding a
- * lock.
- */
- static void updateSessionRecordOnSecondary(OperationContext* opCtx,
- const SessionTxnRecord& sessionTxnRecord);
-
- /**
* Marks the session as requiring refresh. Used when the session state has been modified
* externally, such as through a direct write to the transactions table.
*/
@@ -258,6 +248,13 @@ public:
return _transactionOperations;
}
+ /**
+ * Scans through the list of operations and returns a new list containing every entry of
+ * 'ops' in its original order, with an additional oplog entry for updating
+ * config.transactions inserted directly after each operation that needs one.
+ */
+ static std::vector<repl::OplogEntry> addOpsForReplicatingTxnTable(
+ const std::vector<repl::OplogEntry>& ops);
+
private:
void _beginOrContinueTxn(WithLock, TxnNumber txnNumber, boost::optional<bool> autocommit);
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index f7e30092b0a..0b0f3c728be 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -69,6 +69,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
boost::none, // o2
sessionInfo, // sessionInfo
+ boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp
index 504843f2bf7..a8a66ac8ca7 100644
--- a/src/mongo/db/transaction_history_iterator_test.cpp
+++ b/src/mongo/db/transaction_history_iterator_test.cpp
@@ -71,6 +71,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
docToInsert, // o
boost::none, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 49d93e75334..8ccb88af950 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -83,6 +83,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object, // o
object2, // o2
{}, // sessionInfo
+ boost::none, // upsert
boost::none, // wall clock time
boost::none, // statement id
boost::none, // optime of previous write within same transaction