summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/apply_ops.cpp13
-rw-r--r--src/mongo/db/repl/apply_ops.h7
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp4
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp3
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp3
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp6
-rw-r--r--src/mongo/db/repl/oplog_applier.h8
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp5
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h4
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp7
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp62
-rw-r--r--src/mongo/db/repl/sync_tail.h10
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp94
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp40
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h9
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp7
-rw-r--r--src/mongo/shell/replsettest.js4
18 files changed, 199 insertions, 90 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 223568e52a0..54971bbdee8 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -486,13 +486,14 @@ Status applyOps(OperationContext* opCtx,
// static
MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) {
MultiApplier::Operations result;
- extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result);
+ extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none);
return result;
}
void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations) {
+ MultiApplier::Operations* operations,
+ boost::optional<Timestamp> commitOplogEntryTS) {
uassert(ErrorCodes::TypeMismatch,
str::stream() << "ApplyOps::extractOperations(): not a command: "
<< redact(applyOpsOplogEntry.toBSON()),
@@ -513,8 +514,16 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
for (const auto& elem : operationDocs) {
auto operationDoc = elem.Obj();
BSONObjBuilder builder(operationDoc);
+
+ // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique
+ // will not overwrite this value.
+ if (commitOplogEntryTS) {
+ builder.append("ts", *commitOplogEntryTS);
+ }
+
builder.appendElementsUnique(topLevelDoc);
auto operation = builder.obj();
+
operations->emplace_back(operation);
}
}
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 0672ce6d05f..c5cca31569f 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -56,10 +56,15 @@ public:
* This variant allows optimization for extracting multiple applyOps operations. The entry for
* the non-DurableReplOperation fields of the extracted operation must be specified as
* 'topLevelDoc', and need not be any of the applyOps operations.
+ *
+ * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations
+ * from a prepare oplog entry during initial sync. These operations must be timestamped at the
+ * commit oplog entry timestamp instead of the prepareTimestamp.
*/
static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations);
+ MultiApplier::Operations* operations,
+ boost::optional<Timestamp> commitOplogEntryTS);
};
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 322f91a29dc..cbee6637c67 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -56,7 +56,9 @@ public:
private:
void _run(OplogBuffer* oplogBuffer) final {}
void _shutdown() final {}
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final {
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) final {
return _externalState->multiApplyFn(opCtx, ops, _observer);
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 6a5fe64ba4a..ac0a2510718 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -389,7 +389,8 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
// Derive ops for transactions if necessary.
- syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+ syncTail.fillWriterVectors(
+ _opCtx.get(), &ops, &writerVectors, &derivedOps, OplogApplication::Mode::kInitialSync);
const auto& opPtrs = writerVectors[0];
ASSERT_OK(runOpPtrsInitialSync(opPtrs));
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 784a5cfba2c..2038989393e 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1190,7 +1190,8 @@ void InitialSyncer::_getNextApplierBatchCallback(
_fetchCount.store(0);
MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [this](OperationContext* opCtx,
MultiApplier::Operations ops) {
- return _oplogApplier->multiApply(opCtx, std::move(ops));
+ return _oplogApplier->multiApply(
+ opCtx, std::move(ops), repl::OplogApplication::Mode::kInitialSync);
};
OpTime lastApplied = ops.back().getOpTime();
invariant(ops.back().getWallClockTime());
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 61b5d790bf5..24a484f1362 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -289,9 +289,11 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return std::move(ops);
}
-StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations ops) {
+StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) {
_observer->onBatchBegin(ops);
- auto lastApplied = _multiApply(opCtx, std::move(ops));
+ auto lastApplied = _multiApply(opCtx, std::move(ops), mode);
_observer->onBatchEnd(lastApplied, {});
return lastApplied;
}
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 71ea92b4f3f..9650036b496 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -195,7 +195,9 @@ public:
*
* TODO: remove when enqueue() is implemented.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode);
private:
/**
@@ -221,7 +223,9 @@ private:
* Called from multiApply() to apply a batch of operations in parallel.
* Implemented in subclasses but not visible otherwise.
*/
- virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) = 0;
+ virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) = 0;
// Used to schedule task for oplog application loop.
// Not owned by us.
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 8c293a1bbba..9b3126cb781 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -61,8 +61,9 @@ void OplogApplierImpl::_shutdown() {
_syncTail.shutdown();
}
-StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) {
- return _syncTail.multiApply(opCtx, std::move(ops));
+StatusWith<OpTime> OplogApplierImpl::_multiApply(
+ OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
+ return _syncTail.multiApply(opCtx, std::move(ops), mode);
}
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index d6e0e523220..741cfd4dcd3 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -67,7 +67,9 @@ private:
void _shutdown() override;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) override;
// Not owned by us.
ReplicationCoordinator* const _replCoord;
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 352359bae58..05040963774 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -56,7 +56,9 @@ public:
void _run(OplogBuffer* oplogBuffer) final;
void _shutdown() final;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) final;
};
OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer)
@@ -66,7 +68,8 @@ void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
void OplogApplierMock::_shutdown() {}
-StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
+StatusWith<OpTime> OplogApplierMock::_multiApply(
+ OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
return OpTime();
}
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 2e8890ddaa4..4f03e19b894 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -402,7 +402,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
OplogApplier::Operations batch;
while (
!(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) {
- applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch)));
+ applyThroughOpTime = uassertStatusOK(
+ oplogApplier.multiApply(opCtx, std::move(batch), OplogApplication::Mode::kRecovering));
}
stats.complete(applyThroughOpTime);
invariant(oplogBuffer.isEmpty(),
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6b33953ca12..2c3e6317dc0 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -796,7 +796,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Apply the operations in this batch. 'multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
+ fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// In order to provide resilience in the event of a crash in the middle of batch
@@ -847,6 +847,12 @@ inline bool isCommitApplyOps(const OplogEntry& entry) {
!entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare");
}
+// Returns whether a commitTransaction oplog entry is a part of a prepared transaction.
+inline bool isPreparedCommit(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction;
+}
+
+
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
@@ -1109,7 +1115,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
+ SessionUpdateTracker* sessionUpdateTracker,
+ boost::optional<repl::OplogApplication::Mode> mode) {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1137,7 +1144,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1199,13 +1207,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// messing up the state of the opCtx. In particular we do not want to
// set the ReadSource to kLastApplied.
ReadSourceScope readSourceScope(opCtx);
- derivedOps->emplace_back(
- readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, op, partialTxnList, boost::none));
partialTxnList.clear();
}
// Transaction entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
@@ -1213,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Nested entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
} catch (...) {
fassertFailedWithStatusNoTrace(
@@ -1225,6 +1233,32 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
continue;
}
+ // If we see a commitTransaction command that is a part of a prepared transaction during
+ // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
+ // with the extracted operations.
+ if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) {
+ auto logicalSessionId = op.getSessionId();
+ auto& partialTxnList = partialTxnOps[*logicalSessionId];
+
+ {
+ // Traverse the oplog chain with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
+
+ // Get the previous oplog entry, which should be a prepare oplog entry.
+ const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op);
+ invariant(prevOplogEntry.shouldPrepare());
+
+ // Extract the operations from the applyOps entry.
+ auto commitOplogEntryOpTime = op.getOpTime();
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
+ }
+
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ continue;
+ }
+
auto& writer = (*writerVectors)[hash % numWriters];
if (writer.empty()) {
writer.reserve(8); // Skip a few growth rounds
@@ -1236,14 +1270,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+ std::vector<MultiApplier::Operations>* derivedOps,
+ boost::optional<repl::OplogApplication::Mode> mode) {
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+ _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1275,10 +1310,13 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
+StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) {
invariant(!ops.empty());
LOG(2) << "replication batch size is " << ops.size();
+
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
// entries from the oplog until we finish writing.
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
@@ -1318,7 +1356,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+ fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index fd314fd3564..5f9961c8b22 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -228,12 +228,15 @@ public:
* to at least the last optime of the batch. If 'minValid' is already greater than or equal
* to the last optime of this batch, it will not be updated.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode);
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps);
+ std::vector<MultiApplier::Operations>* derivedOps,
+ boost::optional<repl::OplogApplication::Mode> mode);
private:
class OpQueueBatcher;
@@ -244,7 +247,8 @@ private:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker);
+ SessionUpdateTracker* sessionUpdateTracker,
+ boost::optional<repl::OplogApplication::Mode> mode);
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 514192587ba..9ec52ed34e3 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -408,7 +408,7 @@ DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty
getStorageInterface(),
noopApplyOperationFn,
writerPool.get());
- syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore();
+ syncTail.multiApply(_opCtx.get(), {}, boost::none).getStatus().ignore();
}
bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
@@ -434,7 +434,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
SyncTail syncTail(
nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}, boost::none));
ASSERT_EQUALS(op.getOpTime(), lastOpTime);
ASSERT_EQUALS(1U, operationsApplied.size());
@@ -598,7 +598,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -613,7 +613,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the second operation. This should result in the second oplog entry
// being put in the oplog, but with no effect because the operation is part of a pending
// transaction.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}, boost::none));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -629,7 +629,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}, boost::none));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -656,7 +656,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
// Apply both inserts and the commit in a single batch. We expect no oplog entries to
// be inserted (because we've set skipWritesToOplog), and both entries to be committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}));
+ ASSERT_OK(
+ syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}, boost::none));
ASSERT_EQ(0U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -710,7 +711,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the first entry in its own batch. This should result in the oplog entry being written
// but the entry should not be applied as it is part of a pending transaction.
const auto expectedStartOpTime = insertOps[0].getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_EQ(0U, _insertedDocs[_nss1].size());
ASSERT_EQ(0U, _insertedDocs[_nss2].size());
@@ -723,8 +724,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the rest of the entries, including the commit. These entries should be added to the
// oplog, and all the entries including the first should be applied.
- ASSERT_OK(
- syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}, boost::none));
ASSERT_EQ(5U, oplogDocs().size());
ASSERT_EQ(3U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
@@ -847,7 +848,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
// once.
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
+ {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2},
+ boost::none));
ASSERT_EQ(6U, oplogDocs().size());
ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
ASSERT_EQ(4U, _insertedDocs[_nss1].size());
@@ -956,7 +958,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -972,7 +974,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -986,7 +988,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1006,7 +1008,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_insertOp1->getOpTime(),
@@ -1017,7 +1019,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_prepareWithPrevOp->getOpTime(),
@@ -1027,7 +1029,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}, boost::none));
ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1051,7 +1053,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_insertOp1, *_insertOp2}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -1066,7 +1069,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_prepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1080,7 +1084,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_commitPrepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1115,7 +1120,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with the insert operations. This should have no effect, because this is
// recovery.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1128,7 +1133,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1141,7 +1146,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the commit. This should result in the the three previous entries
// being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1161,7 +1166,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -1174,7 +1179,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1202,7 +1207,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1215,7 +1220,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1234,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_singlePrepareApplyOp->getOpTime(),
@@ -1244,7 +1249,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1269,7 +1274,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_singlePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1283,7 +1289,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the previous entry being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_commitSinglePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1319,7 +1326,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1332,7 +1339,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the previous entry being
// applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -2551,7 +2558,7 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}, boost::none));
checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
}
@@ -2582,7 +2589,7 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}, boost::none));
ASSERT_FALSE(docExists(
_opCtx.get(),
@@ -2626,7 +2633,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}, boost::none));
checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
}
@@ -2658,7 +2665,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}, boost::none));
checkTxnTable(sessionInfo, newWriteOpTime, date);
}
@@ -2719,7 +2726,8 @@ TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSessi
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}, boost::none));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2785,7 +2793,8 @@ TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSessi
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}, boost::none));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2850,7 +2859,8 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
+ {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn},
+ boost::none));
// The txnNum and optime of the only write were saved.
auto resultSingleDoc =
@@ -2922,7 +2932,7 @@ TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}, boost::none));
checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate);
}
@@ -2945,7 +2955,7 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}, boost::none));
ASSERT_FALSE(docExists(_opCtx.get(),
NamespaceString::kSessionTransactionsTableNamespace,
@@ -2970,7 +2980,7 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}, boost::none));
ASSERT_FALSE(docExists(
_opCtx.get(),
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index cf0a9d49c94..8f376b44d18 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -100,12 +100,8 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
ReadSourceScope readSourceScope(opCtx);
// Get the corresponding prepare applyOps oplog entry.
- const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction();
- invariant(prepareOpTime);
- TransactionHistoryIterator iter(prepareOpTime.get());
- invariant(iter.hasNext());
- const auto prepareOplogEntry = iter.nextFatalOnErrors(opCtx);
- ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {});
+ const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry);
+ ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none);
}
const auto dbName = entry.getNss().db().toString();
@@ -137,6 +133,21 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
}
} // namespace
+/**
+ * Helper used to get previous oplog entry from the same transaction.
+ */
+const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
+ const repl::OplogEntry& entry) {
+ const auto prevOpTime = entry.getPrevWriteOpTimeInTransaction();
+ invariant(prevOpTime);
+ TransactionHistoryIterator iter(prevOpTime.get());
+ invariant(iter.hasNext());
+ const auto prevOplogEntry = iter.next(opCtx);
+
+ return prevOplogEntry;
+}
+
+
Status applyCommitTransaction(OperationContext* opCtx,
const OplogEntry& entry,
repl::OplogApplication::Mode mode) {
@@ -150,8 +161,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
invariant(commitCommand.getCommitTimestamp());
- if (mode == repl::OplogApplication::Mode::kRecovering ||
- mode == repl::OplogApplication::Mode::kInitialSync) {
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
return _applyTransactionFromOplogChain(opCtx,
entry,
mode,
@@ -216,7 +226,8 @@ Status applyAbortTransaction(OperationContext* opCtx,
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const OplogEntry& commitOrPrepare,
- const std::vector<OplogEntry*>& cachedOps) {
+ const std::vector<OplogEntry*>& cachedOps,
+ boost::optional<Timestamp> commitOplogEntryTS) {
repl::MultiApplier::Operations ops;
// Get the previous oplog entry.
@@ -245,7 +256,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
const auto& operationEntry = iter.nextFatalOnErrors(opCtx);
invariant(operationEntry.isPartialTransaction());
auto prevOpsEnd = ops.size();
- repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
// Because BSONArrays do not have fast way of determining size without iterating through
// them, and we also have no way of knowing how many oplog entries are in a transaction
// without iterating, reversing each applyOps and then reversing the whole array is
@@ -260,12 +272,14 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
for (auto* cachedOp : cachedOps) {
const auto& operationEntry = *cachedOp;
invariant(operationEntry.isPartialTransaction());
- repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
}
// Reconstruct the operations from the commit or prepare oplog entry.
if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) {
- repl::ApplyOps::extractOperationsTo(commitOrPrepare, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS);
}
return ops;
}
@@ -284,7 +298,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
// The prepare time of the transaction is set explicitly below.
auto ops = [&] {
ReadSourceScope readSourceScope(opCtx);
- return readTransactionOperationsFromOplogChain(opCtx, entry, {});
+ return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none);
}();
if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index dc145d2c701..5ad20f48875 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -51,13 +51,20 @@ Status applyAbortTransaction(OperationContext* opCtx,
repl::OplogApplication::Mode mode);
/**
+ * Helper used to get previous oplog entry from the same transaction.
+ */
+const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
+ const repl::OplogEntry& entry);
+
+/**
* Follow an oplog chain and copy the operations to destination. Operations will be copied in
* forward oplog order (increasing optimes).
*/
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const repl::OplogEntry& entry,
- const std::vector<repl::OplogEntry*>& cachedOps);
+ const std::vector<repl::OplogEntry*>& cachedOps,
+ boost::optional<Timestamp> commitOplogEntryTS);
/**
* Apply `prepareTransaction` oplog entry.
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 10a7c57f407..6208fe8944d 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1386,7 +1386,8 @@ public:
storageInterface,
{},
writerPool.get());
- ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
+ ASSERT_EQUALS(op2.getOpTime(),
+ unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none)));
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX);
assertMultikeyPaths(
@@ -1495,7 +1496,7 @@ public:
storageInterface,
options,
writerPool.get());
- auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
+ auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none));
ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp());
// Wait for the index build to finish before making any assertions.
@@ -2478,7 +2479,7 @@ public:
auto writerPool = repl::OplogApplier::makeWriterPool(1);
repl::SyncTail syncTail(
nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}, boost::none));
ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
joinGuard.dismiss();
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 8d41cacaef4..a9d13d29672 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -2293,6 +2293,10 @@ var ReplSetTest = function(opts) {
}
baseOptions = Object.merge(baseOptions, this.nodeOptions["n" + n]);
options = Object.merge(baseOptions, options);
+ if (options.hasOwnProperty("rsConfig")) {
+ this.nodeOptions["n" + n] =
+ Object.merge(this.nodeOptions["n" + n], {rsConfig: options.rsConfig});
+ }
delete options.rsConfig;
options.restart = options.restart || restart;