author     Mihai Andrei <mihai.andrei@mongodb.com>   2019-09-30 17:18:10 +0000
committer  evergreen <evergreen@mongodb.com>         2019-09-30 17:18:10 +0000
commit     9237e4d66a592d30385f9496b8fda1590e9ff9ca
tree       4dad80baba13ecf504d104f875e1281d50d0a6fe
parent     194361c6eafdbda1ccd272b6a1e1a887817f476a
SERVER-43344 Move shutdown, multiApply, and scheduleWritesToOplog from SyncTail to OplogApplierImpl
-rw-r--r--  src/mongo/db/repl/SConscript                                       |   19
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_impl.cpp          |    1
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_mock.cpp          |    1
-rw-r--r--  src/mongo/db/repl/idempotency_test_fixture.cpp                     |    3
-rw-r--r--  src/mongo/db/repl/oplog_applier.cpp                                |    2
-rw-r--r--  src/mongo/db/repl/oplog_applier.h                                  |    7
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp                           |  256
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.h                             |   31
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl_test.cpp                      | 1733
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp              |  142
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl_test_fixture.h                |  155
-rw-r--r--  src/mongo/db/repl/oplog_applier_test.cpp                           |    3
-rw-r--r--  src/mongo/db/repl/opqueue_batcher.cpp                              |    6
-rw-r--r--  src/mongo/db/repl/opqueue_batcher.h                                |    6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |    1
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp                         |    3
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                                    |  252
-rw-r--r--  src/mongo/db/repl/sync_tail.h                                      |   46
-rw-r--r--  src/mongo/db/repl/sync_tail_test.cpp                               | 1518
-rw-r--r--  src/mongo/db/repl/sync_tail_test_fixture.cpp                       |   14
-rw-r--r--  src/mongo/dbtests/storage_timestamp_tests.cpp                      |   20
21 files changed, 2352 insertions, 1867 deletions
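
Before the file-by-file diff, a rough idea of the wiring change: the apply function (multiSyncApply) and the writer pool move from SyncTail into OplogApplierImpl, and the SyncTail that remains keeps only the observer, storage interface, and options until it is merged away (see the SERVER-43651 TODOs below). The stand-alone sketch that follows models that ownership move with simplified stand-in types; only the apply function is modelled here, and the real signatures are in the hunks further down.

    // Simplified stand-ins, not the real MongoDB classes; see the diffs below for the
    // actual constructor signatures.
    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    using Ops = std::vector<std::string>;
    using ApplyFunc = std::function<void(Ops*)>;   // stands in for MultiSyncApplyFunc

    struct SyncTailLike {
        // After this commit SyncTail keeps only observer/storage/options (a tag here).
        std::string options;
    };

    class OplogApplierImplLike {
    public:
        OplogApplierImplLike(ApplyFunc func, std::string options)
            : _applyFunc(std::move(func)), _syncTail{std::move(options)} {}

        // multiApply now lives on the applier and calls the injected function.
        void multiApply(Ops ops) { _applyFunc(&ops); }

    private:
        ApplyFunc _applyFunc;     // previously owned by SyncTail
        SyncTailLike _syncTail;   // kept temporarily until sync_tail.cpp is merged in
    };

    int main() {
        OplogApplierImplLike applier(
            [](Ops* ops) { std::cout << "applied " << ops->size() << " ops\n"; }, "kSecondary");
        applier.multiApply({"insert-1", "insert-2"});   // prints: applied 2 ops
    }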
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index cd06cba18bc..5da88d7a47b 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -500,6 +500,23 @@ env.Library(
],
)
+
+env.Library(
+ target='oplog_applier_impl_test_fixture',
+ source=[
+ 'oplog_applier_impl_test_fixture.cpp',
+ ],
+ LIBDEPS=[
+ 'drop_pending_collection_reaper',
+ 'oplog_application',
+ 'replmocks',
+ 'storage_interface_impl',
+ '$BUILD_DIR/mongo/db/catalog/document_validation',
+ '$BUILD_DIR/mongo/db/s/op_observer_sharding_impl',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ ],
+)
+
env.Library(
target='oplog_application_interface',
source=[
@@ -1231,6 +1248,7 @@ env.CppUnitTest(
'isself_test.cpp',
'member_config_test.cpp',
'multiapplier_test.cpp',
+ 'oplog_applier_impl_test.cpp',
'oplog_applier_test.cpp',
'oplog_buffer_collection_test.cpp',
'oplog_buffer_proxy_test.cpp',
@@ -1304,6 +1322,7 @@ env.CppUnitTest(
'multiapplier',
'oplog',
'oplog_application_interface',
+ 'oplog_applier_impl_test_fixture',
'oplog_buffer_collection',
'oplog_buffer_proxy',
'oplog_entry',
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 33ea1ea0d16..e7cb221c92b 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -143,6 +143,7 @@ std::unique_ptr<OplogApplier> DataReplicatorExternalStateImpl::makeOplogApplier(
_replicationCoordinator,
consistencyMarkers,
storageInterface,
+ multiSyncApply,
options,
writerPool);
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index ae615085319..3e38368a026 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -58,7 +58,6 @@ public:
private:
void _run(OplogBuffer* oplogBuffer) final {}
- void _shutdown() final {}
StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final {
return _externalState->multiApplyFn(opCtx, ops, _observer);
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 6f67f840635..adae4b62607 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -382,10 +382,7 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
}
SyncTail syncTail(nullptr, // observer
- nullptr, // consistency markers
nullptr, // storage interface
- SyncTail::MultiSyncApplyFunc(),
- nullptr, // writer pool
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync));
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 5242a9c917b..0bb82669ddc 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -73,8 +73,6 @@ Future<void> OplogApplier::startup() {
}
void OplogApplier::shutdown() {
- _shutdown();
-
stdx::lock_guard<Latch> lock(_mutex);
_inShutdown = true;
}
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index f6049c71943..63be125b2bd 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -196,13 +196,6 @@ private:
virtual void _run(OplogBuffer* oplogBuffer) = 0;
/**
- * Called from shutdown to signals oplog application loop to stop running.
- * Currently applicable to steady state replication only.
- * Implemented in subclasses but not visible otherwise.
- */
- virtual void _shutdown() = 0;
-
- /**
* Called from multiApply() to apply a batch of operations in parallel.
* Implemented in subclasses but not visible otherwise.
*/
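
The two hunks above delete the virtual _shutdown() hook: OplogApplier::shutdown() now only sets the _inShutdown flag under the mutex, and OplogApplierImpl's run loop (in the next file's diff) checks inShutdown() on the base class instead of asking SyncTail. Below is a stand-alone sketch of the resulting pattern, with stand-in class names and an atomic flag standing in for the Latch-protected member.

    #include <atomic>
    #include <iostream>

    class OplogApplierLike {
    public:
        // shutdown() only flips the flag now; there is no virtual _shutdown() hook to call.
        void shutdown() { _inShutdown.store(true); }
        bool inShutdown() const { return _inShutdown.load(); }

    private:
        std::atomic<bool> _inShutdown{false};
    };

    class OplogApplierImplLike : public OplogApplierLike {
    public:
        void runLoop() {
            int batches = 0;
            // The loop polls the base-class flag directly instead of a SyncTail-owned one.
            while (!inShutdown()) {
                ++batches;            // stand-in for "get the next batch and apply it"
                if (batches == 3)
                    shutdown();       // stand-in for a shutdown() request arriving
            }
            std::cout << "applied " << batches << " batches before shutdown\n";
        }
    };

    int main() {
        OplogApplierImplLike applier;
        applier.runLoop();            // prints: applied 3 batches before shutdown
    }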
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 1c094f6d077..f0317f59b79 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -29,7 +29,9 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+#include "mongo/db/stats/timer_stats.h"
#include "mongo/platform/basic.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/db/repl/oplog_applier_impl.h"
@@ -37,6 +39,16 @@
namespace mongo {
namespace repl {
+MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
+MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries);
+
+// Tracks the oplog application batch size.
+Counter64 oplogApplicationBatchSize;
+ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize",
+ &oplogApplicationBatchSize);
+// Number and time of each ApplyOps worker pool round
+TimerStats applyBatchStats;
+ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
namespace {
@@ -151,14 +163,16 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
ReplicationCoordinator* replCoord,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
+ MultiSyncApplyFunc func,
const OplogApplier::Options& options,
ThreadPool* writerPool)
: OplogApplier(executor, oplogBuffer, observer, options),
_replCoord(replCoord),
+ _writerPool(writerPool),
_storageInterface(storageInterface),
_consistencyMarkers(consistencyMarkers),
- _syncTail(
- observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options),
+ _applyFunc(func),
+ _syncTail(observer, storageInterface, options),
_beginApplyingOpTime(options.beginApplyingOpTime) {}
void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
@@ -168,17 +182,13 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
_runLoop(oplogBuffer, getNextApplierBatchFn);
}
-void OplogApplierImpl::_shutdown() {
- _syncTail.shutdown();
-}
-
void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer,
OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) {
// We don't start data replication for arbiters at all and it's not allowed to reconfig
// arbiterOnly field for any member.
invariant(!_replCoord->getMemberState().arbiter());
- OpQueueBatcher batcher(&_syncTail, _storageInterface, oplogBuffer, getNextApplierBatchFn);
+ OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getStorageEngine()->isDurable()
@@ -203,7 +213,7 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer,
while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
// Tests should not trigger clean shutdown while that failpoint is active. If we
// think we need this, we need to think hard about what the behavior should be.
- if (_syncTail.inShutdown()) {
+ if (inShutdown()) {
severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
fassertFailedNoTrace(40304);
}
@@ -256,10 +266,10 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer,
// Don't allow the fsync+lock thread to see intermediate states of batch application.
stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
- // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
+ // Apply the operations in this batch. '_multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, _syncTail.multiApply(&opCtx, ops.releaseBatch()));
+ fassertNoTrace(34437, _multiApply(&opCtx, ops.releaseBatch()));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// Update various things that care about our last applied optime. Tests rely on 1 happening
@@ -294,8 +304,230 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer,
}
}
-StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) {
- return _syncTail.multiApply(opCtx, std::move(ops));
+
+// Schedules the writes to the oplog for 'ops' into 'writerPool'. The caller must guarantee that
+// 'ops' stays valid until all scheduled work in the thread pool completes.
+void scheduleWritesToOplog(OperationContext* opCtx,
+ StorageInterface* storageInterface,
+ ThreadPool* writerPool,
+ const MultiApplier::Operations& ops) {
+ auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
+ // The returned function will be run in a separate thread after this returns. Therefore all
+ // captures other than 'ops' must be by value since they will not be available. The caller
+ // guarantees that 'ops' will stay in scope until the spawned threads complete.
+ return [storageInterface, &ops, begin, end](auto status) {
+ invariant(status);
+
+ auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes, so it is
+ // safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
+ UnreplicatedWritesBlock uwb(opCtx.get());
+ ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
+ opCtx->lockState());
+
+ std::vector<InsertStatement> docs;
+ docs.reserve(end - begin);
+ for (size_t i = begin; i < end; i++) {
+ // Add as unowned BSON to avoid unnecessary ref-count bumps.
+ // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
+ docs.emplace_back(InsertStatement{ops[i].getRaw(),
+ ops[i].getOpTime().getTimestamp(),
+ ops[i].getOpTime().getTerm()});
+ }
+
+ fassert(40141,
+ storageInterface->insertDocuments(
+ opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
+ };
+ };
+
+ // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it
+ // would result in too little work per thread. This also ensures that we can amortize the
+ // setup/teardown overhead across many writes.
+ const size_t kMinOplogEntriesPerThread = 16;
+ const bool enoughToMultiThread =
+ ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().numThreads;
+
+ // Only doc-locking engines support parallel writes to the oplog because they are required to
+ // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
+ // there would be no way to take advantage of multiple threads if a storage engine doesn't
+ // support document locking.
+ if (!enoughToMultiThread ||
+ !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
+
+ writerPool->schedule(makeOplogWriterForRange(0, ops.size()));
+ return;
+ }
+
+
+ const size_t numOplogThreads = writerPool->getStats().numThreads;
+ const size_t numOpsPerThread = ops.size() / numOplogThreads;
+ for (size_t thread = 0; thread < numOplogThreads; thread++) {
+ size_t begin = thread * numOpsPerThread;
+ size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
+ writerPool->schedule(makeOplogWriterForRange(begin, end));
+ }
+}
+
+StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops) {
+ invariant(!ops.empty());
+
+ LOG(2) << "replication batch size is " << ops.size();
+
+ // Stop all readers until we're done. This also prevents doc-locking engines from deleting old
+ // entries from the oplog until we finish writing.
+ Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
+
+ // TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will
+ // be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier.
+ invariant(_replCoord);
+ if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
+ severe() << "attempting to replicate ops while primary";
+ return {ErrorCodes::CannotApplyOplogWhilePrimary,
+ "attempting to replicate ops while primary"};
+ }
+
+ // Increment the batch size stat.
+ oplogApplicationBatchSize.increment(ops.size());
+
+ std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
+ {
+ // Each node records cumulative batch application stats for itself using this timer.
+ TimerHolder timer(&applyBatchStats);
+
+ // We must wait for all the work we've dispatched to complete before leaving this block
+ // because the spawned threads refer to objects on the stack.
+ ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); });
+
+ // Write batch of ops into oplog.
+ // TODO (SERVER-43651): _options currently belongs to SyncTail for use in the free
+ // function multiSyncApply; move this field to OplogApplierImpl.
+ if (!_syncTail.getOptions().skipWritesToOplog) {
+ _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
+ scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
+ }
+
+ // Holds 'pseudo operations' generated by secondaries to aid in replication.
+ // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
+ // Pseudo operations include:
+ // - applyOps operations expanded to individual ops.
+ // - ops to update config.transactions. Normal writes to config.transactions in the
+ // primary don't create an oplog entry, so extract info from writes with transactions
+ // and create a pseudo oplog entry.
+ std::vector<MultiApplier::Operations> derivedOps;
+
+ std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
+ _syncTail.fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+
+ // Wait for writes to finish before applying ops.
+ _writerPool->waitForIdle();
+
+ // Use this fail point to hold the PBWM lock after we have written the oplog entries but
+ // before we have applied them.
+ if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) {
+ log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking "
+ "until fail point is disabled.";
+ pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx);
+ }
+
+ // Reset consistency markers in case the node fails while applying ops.
+ if (!_syncTail.getOptions().skipWritesToOplog) {
+ _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
+ _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
+ }
+
+ {
+ std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
+
+ // Doles out all the work to the writer pool threads. writerVectors is not modified,
+ // but multiSyncApply will modify the vectors that it contains.
+ invariant(writerVectors.size() == statusVector.size());
+ for (size_t i = 0; i < writerVectors.size(); i++) {
+ if (writerVectors[i].empty())
+ continue;
+
+ _writerPool->schedule(
+ [this,
+ &writer = writerVectors.at(i),
+ &status = statusVector.at(i),
+ &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
+ invariant(scheduleStatus);
+
+ auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes,
+ // so it is safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
+ status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
+ return _applyFunc(opCtx.get(), &writer, &_syncTail, &multikeyVector);
+ });
+ });
+ }
+
+ _writerPool->waitForIdle();
+
+ // If any of the statuses is not ok, return error.
+ for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) {
+ const auto& status = *it;
+ if (!status.isOK()) {
+ severe()
+ << "Failed to apply batch of operations. Number of operations in batch: "
+ << ops.size() << ". First operation: " << redact(ops.front().toBSON())
+ << ". Last operation: " << redact(ops.back().toBSON())
+ << ". Oplog application failed in writer thread "
+ << std::distance(statusVector.cbegin(), it) << ": " << redact(status);
+ return status;
+ }
+ }
+ }
+ }
+
+ // Notify the storage engine that a replication batch has completed. This means that all the
+ // writes associated with the oplog entries in the batch are finished and no new writes with
+ // timestamps associated with those oplog entries will show up in the future.
+ const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
+ storageEngine->replicationBatchIsComplete();
+
+ // Use this fail point to hold the PBWM lock and prevent the batch from completing.
+ if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
+ log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
+ "point is disabled.";
+ while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
+ if (inShutdown()) {
+ severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
+ "clean shutdown";
+ fassertFailedNoTrace(50798);
+ }
+ sleepmillis(100);
+ }
+ }
+
+ Timestamp firstTimeInBatch = ops.front().getTimestamp();
+ // Set any indexes to multikey that this batch ignored. This must be done while holding the
+ // parallel batch writer mode lock.
+ for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
+ for (MultikeyPathInfo info : infoVector) {
+ // We timestamp every multikey write with the first timestamp in the batch. It is always
+ // safe to set an index as multikey too early, just not too late. We conservatively pick
+ // the first timestamp in the batch since we do not have enough information to find out
+ // the timestamp of the first write that set the given multikey path.
+ fassert(50686,
+ _storageInterface->setIndexIsMultikey(
+ opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
+ }
+ }
+
+ // Increment the counter for the number of ops applied during catchup if the node is in catchup
+ // mode.
+ _replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size());
+
+ // We have now written all database writes and updated the oplog to match.
+ return ops.back().getOpTime();
}
} // namespace repl
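
For reference, the range splitting inside scheduleWritesToOplog above reduces to the arithmetic below: a single writer task is used when the batch is small (fewer than 16 ops per thread) or the engine does not support document-level locking; otherwise the batch is split evenly, with the last thread absorbing the remainder. This is a stand-alone sketch; the constant is copied from the hunk, while the thread pool, operation context, and storage-engine query are replaced by plain parameters.

    #include <cstddef>
    #include <iostream>
    #include <utility>
    #include <vector>

    // Returns the [begin, end) range of the batch that each oplog-writer task gets.
    std::vector<std::pair<std::size_t, std::size_t>> splitOplogWrites(std::size_t numOps,
                                                                      std::size_t numThreads,
                                                                      bool supportsDocLocking) {
        const std::size_t kMinOplogEntriesPerThread = 16;   // same constant as the hunk above
        const bool enoughToMultiThread = numOps >= kMinOplogEntriesPerThread * numThreads;

        // Small batches and engines without document-level locking get a single writer task.
        if (!enoughToMultiThread || !supportsDocLocking)
            return {{0, numOps}};

        std::vector<std::pair<std::size_t, std::size_t>> ranges;
        const std::size_t numOpsPerThread = numOps / numThreads;
        for (std::size_t thread = 0; thread < numThreads; thread++) {
            std::size_t begin = thread * numOpsPerThread;
            std::size_t end = (thread == numThreads - 1) ? numOps : begin + numOpsPerThread;
            ranges.emplace_back(begin, end);                 // last thread absorbs the remainder
        }
        return ranges;
    }

    int main() {
        for (auto [begin, end] : splitOplogWrites(100, 4, true))
            std::cout << "[" << begin << ", " << end << ")\n";   // [0,25) [25,50) [50,75) [75,100)
    }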
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index 087e5870e56..efd31736a6b 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -53,6 +53,11 @@ class OplogApplierImpl : public OplogApplier {
OplogApplierImpl& operator=(const OplogApplierImpl&) = delete;
public:
+ using MultiSyncApplyFunc =
+ std::function<Status(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ SyncTail* st,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
/**
* Constructs this OplogApplier with specific options.
* Obtains batches of operations from the OplogBuffer to apply.
@@ -64,14 +69,13 @@ public:
ReplicationCoordinator* replCoord,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
+ MultiSyncApplyFunc func,
const Options& options,
ThreadPool* writerPool);
private:
void _run(OplogBuffer* oplogBuffer) override;
- void _shutdown() override;
-
/**
* Runs oplog application in a loop until shutdown() is called.
* Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
@@ -80,16 +84,37 @@ private:
void _runLoop(OplogBuffer* oplogBuffer,
OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn);
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override;
+ /**
+ * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then
+ * using a set of threads to apply the operations. It writes all entries to the oplog, but only
+ * applies entries with timestamp >= beginApplyingTimestamp.
+ *
+ * If the batch application is successful, returns the optime of the last op applied, which
+ * should be the last op in the batch.
+ * Returns ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary.
+ *
+ * To provide crash resilience, this function will advance the persistent value of 'minValid'
+ * to at least the last optime of the batch. If 'minValid' is already greater than or equal
+ * to the last optime of this batch, it will not be updated.
+ */
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
// Not owned by us.
ReplicationCoordinator* const _replCoord;
+ // Pool of worker threads for writing ops to the databases.
+ // Not owned by us.
+ ThreadPool* const _writerPool;
+
StorageInterface* _storageInterface;
ReplicationConsistencyMarkers* const _consistencyMarkers;
+ // Function to use during _multiApply
+ MultiSyncApplyFunc _applyFunc;
+
// Used to run oplog application loop.
+ // TODO (SERVER-43651): Remove this member once sync_tail.cpp is fully merged in.
SyncTail _syncTail;
// Used to determine which operations should be applied during initial sync. If this is null,
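
The MultiSyncApplyFunc typedef added above is what lets the tests in the next file drive _multiApply without the real multiSyncApply: they inject a callable of this shape (noopApplyOperationFn, multiSyncApply itself, or a lambda that records the ops it is handed). Below is a stand-alone sketch of such a collecting callable, using stand-in types in place of the MongoDB ones.

    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-in types; the real ones come from the repl library.
    struct Status { bool ok = true; };
    struct OperationContext {};
    struct SyncTail {};
    struct WorkerMultikeyPathInfo {};
    using OplogEntry = std::string;
    using OperationPtrs = std::vector<const OplogEntry*>;

    using MultiSyncApplyFunc = std::function<Status(
        OperationContext*, OperationPtrs*, SyncTail*, WorkerMultikeyPathInfo*)>;

    int main() {
        std::vector<OplogEntry> applied;
        MultiSyncApplyFunc collectOps = [&applied](OperationContext*,
                                                   OperationPtrs* ops,
                                                   SyncTail*,
                                                   WorkerMultikeyPathInfo*) -> Status {
            for (auto* op : *ops)
                applied.push_back(*op);       // record instead of applying, as the tests do
            return Status{};
        };

        OplogEntry a = "insert-1", b = "insert-2";
        OperationPtrs batch{&a, &b};
        collectOps(nullptr, &batch, nullptr, nullptr);
        std::cout << applied.size() << " ops collected\n";   // prints: 2 ops collected
    }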
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
new file mode 100644
index 00000000000..59268e41483
--- /dev/null
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -0,0 +1,1733 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
+// TODO SERVER-41882 This is a temporary inclusion to avoid duplicate definitions of free
+// functions. Remove once sync_tail_test.cpp is fully merged in.
+#include "mongo/db/repl/sync_tail_test_fixture.h"
+
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/repl/idempotency_test_fixture.h"
+#include "mongo/db/repl/sync_tail.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/unittest/death_test.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+/**
+ * Creates collection options suitable for oplog.
+ */
+CollectionOptions createOplogCollectionOptions() {
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = 64 * 1024 * 1024LL;
+ options.autoIndexId = CollectionOptions::NO;
+ return options;
+}
+
+/**
+ * Create test collection.
+ * Asserts on failure instead of returning the collection.
+ */
+void createCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionOptions& options) {
+ writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] {
+ Lock::DBLock dblk(opCtx, nss.db(), MODE_X);
+ OldClientContext ctx(opCtx, nss.ns());
+ auto db = ctx.db();
+ ASSERT_TRUE(db);
+ mongo::WriteUnitOfWork wuow(opCtx);
+ auto coll = db->createCollection(opCtx, nss, options);
+ ASSERT_TRUE(coll);
+ wuow.commit();
+ });
+}
+
+
+/**
+ * Create test collection with UUID.
+ */
+auto createCollectionWithUuid(OperationContext* opCtx, const NamespaceString& nss) {
+ CollectionOptions options;
+ options.uuid = UUID::gen();
+ createCollection(opCtx, nss, options);
+ return options.uuid.get();
+}
+
+/**
+ * Create test database.
+ */
+void createDatabase(OperationContext* opCtx, StringData dbName) {
+ Lock::GlobalWrite globalLock(opCtx);
+ bool justCreated;
+ auto databaseHolder = DatabaseHolder::get(opCtx);
+ auto db = databaseHolder->openDb(opCtx, dbName, &justCreated);
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(justCreated);
+}
+
+DEATH_TEST_F(OplogApplierImplTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") {
+ // TODO SERVER-43344 Currently, this file only tests multiApply, so several parameters have been
+ // set to nullptr during OplogApplierImpl construction as they are not needed in multiApply.
+ // Update constructors as needed once the rest of sync_tail_test.cpp has been merged in.
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ noopApplyOperationFn,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+ oplogApplier.multiApply(_opCtx.get(), {}).getStatus().ignore();
+}
+
+bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
+ ReplicationCoordinator* const replCoord,
+ ReplicationConsistencyMarkers* const consistencyMarkers,
+ StorageInterface* const storageInterface,
+ const NamespaceString& nss,
+ const CollectionOptions& options) {
+ auto writerPool = makeReplWriterPool();
+ MultiApplier::Operations operationsApplied;
+ auto applyOperationFn = [&operationsApplied](OperationContext* opCtx,
+ MultiApplier::OperationPtrs* operationsToApply,
+ SyncTail* st,
+ WorkerMultikeyPathInfo*) -> Status {
+ for (auto&& opPtr : *operationsToApply) {
+ operationsApplied.push_back(*opPtr);
+ }
+ return Status::OK();
+ };
+ createCollection(opCtx, nss, options);
+
+ auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1));
+ ASSERT_FALSE(op.isForCappedCollection);
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ replCoord,
+ consistencyMarkers,
+ storageInterface,
+ applyOperationFn,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+ auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(opCtx, {op}));
+ ASSERT_EQUALS(op.getOpTime(), lastOpTime);
+
+ ASSERT_EQUALS(1U, operationsApplied.size());
+ const auto& opApplied = operationsApplied.front();
+ ASSERT_EQUALS(op, opApplied);
+ // "isForCappedCollection" is not parsed from raw oplog entry document.
+ return opApplied.isForCappedCollection;
+}
+
+TEST_F(
+ OplogApplierImplTest,
+ MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_opCtx.get(),
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ nss,
+ CollectionOptions()));
+}
+
+TEST_F(OplogApplierImplTest,
+ MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_TRUE(_testOplogEntryIsForCappedCollection(_opCtx.get(),
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ nss,
+ createOplogCollectionOptions()));
+}
+
+class MultiOplogEntryOplogApplierImplTest : public OplogApplierImplTest {
+public:
+ MultiOplogEntryOplogApplierImplTest()
+ : _nss1("test.preptxn1"), _nss2("test.preptxn2"), _txnNum(1) {}
+
+protected:
+ void setUp() override {
+ OplogApplierImplTest::setUp();
+ const NamespaceString cmdNss{"admin", "$cmd"};
+
+ _uuid1 = createCollectionWithUuid(_opCtx.get(), _nss1);
+ _uuid2 = createCollectionWithUuid(_opCtx.get(), _nss2);
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+
+ _lsid = makeLogicalSessionId(_opCtx.get());
+
+ _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 1), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn" << true),
+ _lsid,
+ _txnNum,
+ StmtId(0),
+ OpTime());
+ _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 2), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn" << true),
+ _lsid,
+ _txnNum,
+ StmtId(1),
+ _insertOp1->getOpTime());
+ _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
+ << BSON("_id" << 3)))),
+ _lsid,
+ _txnNum,
+ StmtId(2),
+ _insertOp2->getOpTime());
+ _opObserver->onInsertsFn =
+ [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ stdx::lock_guard<Latch> lock(_insertMutex);
+ if (nss.isOplog() || nss == _nss1 || nss == _nss2 ||
+ nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ _insertedDocs[nss].insert(_insertedDocs[nss].end(), docs.begin(), docs.end());
+ } else
+ FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
+ };
+
+ _writerPool = makeReplWriterPool();
+ }
+
+ void tearDown() override {
+ OplogApplierImplTest::tearDown();
+ }
+
+ void checkTxnTable(const LogicalSessionId& lsid,
+ const TxnNumber& txnNum,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock,
+ boost::optional<repl::OpTime> expectedStartOpTime,
+ DurableTxnStateEnum expectedState) {
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ expectedOpTime,
+ expectedWallClock,
+ expectedStartOpTime,
+ expectedState);
+ }
+
+ std::vector<BSONObj>& oplogDocs() {
+ return _insertedDocs[NamespaceString::kRsOplogNamespace];
+ }
+
+protected:
+ NamespaceString _nss1;
+ NamespaceString _nss2;
+ boost::optional<UUID> _uuid1;
+ boost::optional<UUID> _uuid2;
+ LogicalSessionId _lsid;
+ TxnNumber _txnNum;
+ boost::optional<OplogEntry> _insertOp1, _insertOp2;
+ boost::optional<OplogEntry> _commitOp;
+ std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs;
+ std::unique_ptr<ThreadPool> _writerPool;
+
+private:
+ Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryOplogApplierImplTest::_insertMutex");
+};
+
+TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSeparate) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ // Apply a batch with only the first operation. This should result in the first oplog entry
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because it is part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1}));
+ ASSERT_EQ(1U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the second operation. This should result in the second oplog entry
+ // being put in the oplog, but with no effect because the operation is part of a pending
+ // transaction.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp2}));
+ ASSERT_EQ(2U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ // The transaction table should not have been updated for partialTxn operations that are not the
+ // first in a transaction.
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the two previous entries being applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitOp}));
+ ASSERT_EQ(3U, oplogDocs().size());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getRaw());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitOp->getOpTime(),
+ _commitOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionAllAtOnce) {
+ // Skipping writes to the oplog proves we're testing the code path that does not rely on
+ // reading the oplog.
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
+ _writerPool.get());
+
+ // Apply both inserts and the commit in a single batch. We expect no oplog entries to
+ // be inserted (because we've set skipWritesToOplog), and both entries to be committed.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}));
+ ASSERT_EQ(0U, oplogDocs().size());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitOp->getOpTime(),
+ _commitOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBatches) {
+ // Tests an unprepared transaction with ops both in the batch containing the commit and in
+ // prior batches.
+ // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1.
+ std::vector<OplogEntry> insertOps;
+ std::vector<BSONObj> insertDocs;
+
+ const NamespaceString cmdNss{"admin", "$cmd"};
+ for (int i = 0; i < 4; i++) {
+ insertDocs.push_back(BSON("_id" << i));
+ insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), i + 1), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << (i == 1 ? _nss2.ns() : _nss1.ns()) << "ui"
+ << (i == 1 ? *_uuid2 : *_uuid1) << "o"
+ << insertDocs.back()))
+ << "partialTxn" << true),
+ _lsid,
+ _txnNum,
+ StmtId(i),
+ i == 0 ? OpTime() : insertOps.back().getOpTime()));
+ }
+ auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 5), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ _txnNum,
+ StmtId(4),
+ insertOps.back().getOpTime());
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ // Insert the first entry in its own batch. This should result in the oplog entry being written
+ // but the entry should not be applied as it is part of a pending transaction.
+ const auto expectedStartOpTime = insertOps[0].getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOps[0]}));
+ ASSERT_EQ(1U, oplogDocs().size());
+ ASSERT_EQ(0U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(0U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ insertOps[0].getOpTime(),
+ insertOps[0].getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Insert the rest of the entries, including the commit. These entries should be added to the
+ // oplog, and all the entries including the first should be applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(),
+ {insertOps[1], insertOps[2], insertOps[3], commitOp}));
+ ASSERT_EQ(5U, oplogDocs().size());
+ ASSERT_EQ(3U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ commitOp.getOpTime(),
+ commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+
+ // Check docs and ordering of docs in nss1.
+ // The insert into nss2 is unordered with respect to those.
+ ASSERT_BSONOBJ_EQ(insertDocs[0], _insertedDocs[_nss1][0]);
+ ASSERT_BSONOBJ_EQ(insertDocs[1], _insertedDocs[_nss2].front());
+ ASSERT_BSONOBJ_EQ(insertDocs[2], _insertedDocs[_nss1][1]);
+ ASSERT_BSONOBJ_EQ(insertDocs[3], _insertedDocs[_nss1][2]);
+}
+
+TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
+ // Tests that two transactions on the same session ID in the same batch both
+ // apply correctly.
+ TxnNumber txnNum1(1);
+ TxnNumber txnNum2(2);
+
+
+ std::vector<OplogEntry> insertOps1, insertOps2;
+ const NamespaceString cmdNss{"admin", "$cmd"};
+ insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 1), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn" << true),
+ _lsid,
+ txnNum1,
+ StmtId(0),
+ OpTime()));
+ insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 2), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn" << true),
+
+ _lsid,
+ txnNum1,
+ StmtId(1),
+ insertOps1.back().getOpTime()));
+ insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(2), 1), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 3)))
+ << "partialTxn" << true),
+ _lsid,
+ txnNum2,
+ StmtId(0),
+ OpTime()));
+ insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(2), 2), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 4)))
+ << "partialTxn" << true),
+ _lsid,
+ txnNum2,
+ StmtId(1),
+ insertOps2.back().getOpTime()));
+ auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ txnNum1,
+ StmtId(2),
+ insertOps1.back().getOpTime());
+ auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ txnNum2,
+ StmtId(2),
+ insertOps2.back().getOpTime());
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ // Note the insert counter so we can check it later. It is necessary to use opCounters because
+ // inserts are idempotent, so we would not detect duplicate inserts just by checking the inserts
+ // in the opObserver.
+ int insertsBefore = replOpCounters.getInsert()->load();
+ // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly
+ // once.
+ ASSERT_OK(oplogApplier.multiApply(
+ _opCtx.get(),
+ {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
+ ASSERT_EQ(6U, oplogDocs().size());
+ ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
+ ASSERT_EQ(4U, _insertedDocs[_nss1].size());
+ checkTxnTable(_lsid,
+ txnNum2,
+ commitOp2.getOpTime(),
+ commitOp2.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+
+ // Check docs and ordering of docs in nss1.
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), _insertedDocs[_nss1][0]);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 2), _insertedDocs[_nss1][1]);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 3), _insertedDocs[_nss1][2]);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 4), _insertedDocs[_nss1][3]);
+}
+
+
+class MultiOplogEntryPreparedTransactionTest : public MultiOplogEntryOplogApplierImplTest {
+protected:
+ void setUp() override {
+ MultiOplogEntryOplogApplierImplTest::setUp();
+
+ _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
+ << BSON("_id" << 3)))
+ << "prepare" << true),
+ _lsid,
+ _txnNum,
+ StmtId(2),
+ _insertOp2->getOpTime());
+ _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
+ << BSON("_id" << 0)))
+ << "prepare" << true),
+ _lsid,
+ _txnNum,
+ StmtId(0),
+ OpTime());
+ _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
+ _lsid,
+ _txnNum,
+ StmtId(3),
+ _prepareWithPrevOp->getOpTime());
+ _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
+ _lsid,
+ _txnNum,
+ StmtId(1),
+ _prepareWithPrevOp->getOpTime());
+ _abortPrepareWithPrevOp =
+ makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("abortTransaction" << 1),
+ _lsid,
+ _txnNum,
+ StmtId(3),
+ _prepareWithPrevOp->getOpTime());
+ _abortSinglePrepareApplyOp = _abortPrepareWithPrevOp =
+ makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("abortTransaction" << 1),
+ _lsid,
+ _txnNum,
+ StmtId(1),
+ _singlePrepareApplyOp->getOpTime());
+ }
+
+protected:
+ boost::optional<OplogEntry> _commitPrepareWithPrevOp, _abortPrepareWithPrevOp,
+ _singlePrepareApplyOp, _prepareWithPrevOp, _commitSinglePrepareApplyOp,
+ _abortSinglePrepareApplyOp;
+
+private:
+ Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryPreparedTransactionTest::_insertMutex");
+};
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ // Apply a batch with the insert operations. This should result in the oplog entries
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_EQ(2U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
+ ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the prepare. This should result in the prepare being put in the
+ // oplog, and the two previous entries being applied (but in a transaction) along with the
+ // nested insert in the prepare oplog entry.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_EQ(3U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareWithPrevOp->getOpTime(),
+ _prepareWithPrevOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the three previous entries being committed.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitPrepareWithPrevOp->getOpTime(),
+ _commitPrepareWithPrevOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ // Apply a batch with the insert operations. This should result in the oplog entries
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the prepare. This should result in the prepare being put in the
+ // oplog, and the two previous entries being applied (but in a transaction) along with the
+ // nested insert in the prepare oplog entry.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareWithPrevOp->getOpTime(),
+ _prepareWithPrevOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the abort. This should result in the abort being put in the
+ // oplog and the transaction table being updated accordingly.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}));
+ ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _abortPrepareWithPrevOp->getOpTime(),
+ _abortPrepareWithPrevOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kAborted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
+ _writerPool.get());
+ // Apply a batch with the insert operations. This should result in the oplog entries
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_EQ(2U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
+ ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
+ // the oplog, but, since this is initial sync, nothing else.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_EQ(3U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareWithPrevOp->getOpTime(),
+ _prepareWithPrevOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the three previous entries being applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitPrepareWithPrevOp->getOpTime(),
+ _commitPrepareWithPrevOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) {
+ // For recovery, the oplog must contain the operations before starting.
+ for (auto&& entry :
+ {*_insertOp1, *_insertOp2, *_prepareWithPrevOp, *_commitPrepareWithPrevOp}) {
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ _opCtx.get(),
+ NamespaceString::kRsOplogNamespace,
+ {entry.toBSON(), entry.getOpTime().getTimestamp()},
+ entry.getOpTime().getTerm()));
+ }
+ // Ignore docs inserted into oplog in setup.
+ oplogDocs().clear();
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
+ _writerPool.get());
+
+ // Apply a batch with the insert operations. This should have no effect, because this is
+ // recovery.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_TRUE(oplogDocs().empty());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ _insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the prepare applyOps. This should have no effect, since this is
+ // recovery.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_TRUE(oplogDocs().empty());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareWithPrevOp->getOpTime(),
+ _prepareWithPrevOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the three previous entries
+ // being applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_TRUE(oplogDocs().empty());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(2U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitPrepareWithPrevOp->getOpTime(),
+ _commitPrepareWithPrevOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+ const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
+
+ // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
+ // the oplog, and the nested insert being applied (but in a transaction).
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_EQ(1U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _singlePrepareApplyOp->getOpTime(),
+ _singlePrepareApplyOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the prepared insert being committed.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitSinglePrepareApplyOp->getOpTime(),
+ _commitSinglePrepareApplyOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSONArray() << "prepare" << true),
+ _lsid,
+ _txnNum,
+ StmtId(0),
+ OpTime());
+ const auto expectedStartOpTime = emptyPrepareApplyOp.getOpTime();
+
+ // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
+ // the oplog; since the applyOps is empty, no documents are applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {emptyPrepareApplyOp}));
+ ASSERT_EQ(1U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ emptyPrepareApplyOp.getOpTime(),
+ emptyPrepareApplyOp.getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and prepared insert being committed.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitSinglePrepareApplyOp->getOpTime(),
+ _commitSinglePrepareApplyOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ _writerPool.get());
+
+ const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
+ // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
+ // the oplog, and the nested insert being applied (but in a transaction).
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ checkTxnTable(_lsid,
+ _txnNum,
+ _singlePrepareApplyOp->getOpTime(),
+ _singlePrepareApplyOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the abort. This should result in the abort being put in the
+ // oplog and the transaction table being updated accordingly.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}));
+ ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _abortSinglePrepareApplyOp->getOpTime(),
+ _abortSinglePrepareApplyOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kAborted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest,
+ MultiApplySingleApplyOpsPreparedTransactionInitialSync) {
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
+ _writerPool.get());
+
+ const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
+
+ // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
+ // the oplog, but, since this is initial sync, nothing else.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_EQ(1U, oplogDocs().size());
+ ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _singlePrepareApplyOp->getOpTime(),
+ _singlePrepareApplyOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the commit being put in the
+ // oplog, and the previous entry being applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitSinglePrepareApplyOp->getOpTime(),
+ _commitSinglePrepareApplyOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest,
+ MultiApplySingleApplyOpsPreparedTransactionRecovery) {
+ // For recovery, the oplog must contain the operations before starting.
+ for (auto&& entry : {*_singlePrepareApplyOp, *_commitPrepareWithPrevOp}) {
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ _opCtx.get(),
+ NamespaceString::kRsOplogNamespace,
+ {entry.toBSON(), entry.getOpTime().getTimestamp()},
+ entry.getOpTime().getTerm()));
+ }
+ // Ignore docs inserted into oplog in setup.
+ oplogDocs().clear();
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
+ _writerPool.get());
+
+ const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
+
+    // Apply a batch with only the prepare applyOps. Since this is recovery, nothing is written to
+    // the oplog and the transaction's operations are not applied yet; only the transaction table
+    // reflects the prepared transaction.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_TRUE(oplogDocs().empty());
+ ASSERT_TRUE(_insertedDocs[_nss1].empty());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _singlePrepareApplyOp->getOpTime(),
+ _singlePrepareApplyOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the commit. This should result in the previous entry being
+ // applied.
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_TRUE(oplogDocs().empty());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitSinglePrepareApplyOp->getOpTime(),
+ _commitSinglePrepareApplyOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+class OplogApplierImplTxnTableTest : public OplogApplierImplTest {
+public:
+ void setUp() override {
+ OplogApplierImplTest::setUp();
+
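+        // Prepare the session catalog as on step-up so these tests can use the transaction table.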
+ MongoDSessionCatalog::onStepUp(_opCtx.get());
+
+ DBDirectClient client(_opCtx.get());
+ BSONObj result;
+ ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result));
+ }
+
+ /**
+ * Creates an OplogEntry with given parameters and preset defaults for this test suite.
+ */
+ repl::OplogEntry makeOplogEntry(const NamespaceString& ns,
+ repl::OpTime opTime,
+ repl::OpTypeEnum opType,
+ BSONObj object,
+ boost::optional<BSONObj> object2,
+ const OperationSessionInfo& sessionInfo,
+ Date_t wallClockTime) {
+ return repl::OplogEntry(opTime, // optime
+ boost::none, // hash
+ opType, // opType
+ ns, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ 0, // version
+ object, // o
+ object2, // o2
+ sessionInfo, // sessionInfo
+                                boost::none,    // upsert
+ wallClockTime, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+ }
+
+    /**
+     * Creates an OplogEntry with the 'fromMigrate' flag set (as session migration entries are),
+     * using the given parameters and preset defaults for this test suite.
+     */
+ repl::OplogEntry makeOplogEntryForMigrate(const NamespaceString& ns,
+ repl::OpTime opTime,
+ repl::OpTypeEnum opType,
+ BSONObj object,
+ boost::optional<BSONObj> object2,
+ const OperationSessionInfo& sessionInfo,
+ Date_t wallClockTime) {
+ return repl::OplogEntry(opTime, // optime
+ boost::none, // hash
+ opType, // opType
+ ns, // namespace
+ boost::none, // uuid
+ true, // fromMigrate
+ 0, // version
+ object, // o
+ object2, // o2
+ sessionInfo, // sessionInfo
+                                boost::none,    // upsert
+ wallClockTime, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+ }
+
+ void checkTxnTable(const OperationSessionInfo& sessionInfo,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock) {
+ invariant(sessionInfo.getSessionId());
+ invariant(sessionInfo.getTxnNumber());
+
+ repl::checkTxnTable(_opCtx.get(),
+ *sessionInfo.getSessionId(),
+ *sessionInfo.getTxnNumber(),
+ expectedOpTime,
+ expectedWallClock,
+ {},
+ {});
+ }
+
+ static const NamespaceString& nss() {
+ return kNs;
+ }
+
+private:
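+    // Namespace shared by the tests in this suite; the collection is created in setUp().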
+ static const NamespaceString kNs;
+};
+
+const NamespaceString OplogApplierImplTxnTableTest::kNs("test.foo");
+
+TEST_F(OplogApplierImplTxnTableTest, SimpleWriteWithTxn) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ const auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto writerPool = makeReplWriterPool();
+
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp}));
+
+ checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ const auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
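+    // A direct delete against the transactions table, batched together with the session write
+    // above.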
+ auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
+ {Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << sessionInfo.getSessionId()->toBSON()),
+ boost::none,
+ {},
+ Date_t::now());
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, deleteOp}));
+
+ ASSERT_FALSE(docExists(
+ _opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
+}
+
+TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
+ {Timestamp(2, 0), 1},
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << sessionInfo.getSessionId()->toBSON()),
+ boost::none,
+ {},
+ Date_t::now());
+
+ date = Date_t::now();
+ sessionInfo.setTxnNumber(7);
+ auto insertOp2 = makeOplogEntry(nss(),
+ {Timestamp(3, 0), 2},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 6),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
+
+ checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTable) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
+
+ auto insertOp = makeOplogEntry(nss(),
+ {Timestamp(1, 0), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
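+    // A direct update to the session's transactions table document, explicitly setting
+    // 'lastWriteOpTime'.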
+ repl::OpTime newWriteOpTime(Timestamp(2, 0), 1);
+ auto updateOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
+ {Timestamp(4, 0), 1},
+ repl::OpTypeEnum::kUpdate,
+ BSON("$set" << BSON("lastWriteOpTime" << newWriteOpTime)),
+ BSON("_id" << sessionInfo.getSessionId()->toBSON()),
+ {},
+ Date_t::now());
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, updateOp}));
+
+ checkTxnTable(sessionInfo, newWriteOpTime, date);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSession) {
+ const NamespaceString cmdNss{"admin", "$cmd"};
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
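+    // Look up the collection's UUID while holding a short-lived read lock (immediately-invoked
+    // lambda).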
+ auto uuid = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid();
+ }();
+
+ repl::OpTime retryableInsertOpTime(Timestamp(1, 0), 1);
+
+ auto retryableInsertOp = makeOplogEntry(nss(),
+ retryableInsertOpTime,
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ repl::OpTime txnInsertOpTime(Timestamp(2, 0), 1);
+ sessionInfo.setTxnNumber(4);
+
+ auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ txnInsertOpTime,
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss().ns() << "ui" << uuid << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn" << true),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ StmtId(0),
+ OpTime());
+
+ repl::OpTime txnCommitOpTime(Timestamp(3, 0), 1);
+ auto txnCommitOp =
+ makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ StmtId(1),
+ txnInsertOpTime);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}));
+
+ repl::checkTxnTable(_opCtx.get(),
+ *sessionInfo.getSessionId(),
+ *sessionInfo.getTxnNumber(),
+ txnCommitOpTime,
+ txnCommitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSession) {
+ const NamespaceString cmdNss{"admin", "$cmd"};
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
+ auto uuid = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid();
+ }();
+
+ repl::OpTime txnInsertOpTime(Timestamp(1, 0), 1);
+ auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ txnInsertOpTime,
+ cmdNss,
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss().ns() << "ui" << uuid << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn" << true),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ StmtId(0),
+ OpTime());
+
+ repl::OpTime txnCommitOpTime(Timestamp(2, 0), 1);
+ auto txnCommitOp =
+ makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ StmtId(1),
+ txnInsertOpTime);
+
+ repl::OpTime retryableInsertOpTime(Timestamp(3, 0), 1);
+ sessionInfo.setTxnNumber(4);
+
+ auto retryableInsertOp = makeOplogEntry(nss(),
+ retryableInsertOpTime,
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ date);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}));
+
+ repl::checkTxnTable(_opCtx.get(),
+ *sessionInfo.getSessionId(),
+ *sessionInfo.getTxnNumber(),
+ retryableInsertOpTime,
+ retryableInsertOp.getWallClockTime(),
+ boost::none,
+ boost::none);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
+ NamespaceString ns0("test.0");
+ NamespaceString ns1("test.1");
+ NamespaceString ns2("test.2");
+ NamespaceString ns3("test.3");
+
+ DBDirectClient client(_opCtx.get());
+ BSONObj result;
+ ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result));
+ ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result));
+ ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result));
+ ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result));
+ auto uuid0 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid();
+ }();
+ auto uuid1 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid();
+ }();
+ auto uuid2 = [&] {
+ return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid();
+ }();
+
+ // Entries with a session id and a txnNumber update the transaction table.
+ auto lsidSingle = makeLogicalSessionIdForTest();
+ auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0);
+
+ // For entries with the same session, the entry with a larger txnNumber is saved.
+ auto lsidDiffTxn = makeLogicalSessionIdForTest();
+ auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
+ auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
+
+ // For entries with the same session and txnNumber, the later optime is saved.
+ auto lsidSameTxn = makeLogicalSessionIdForTest();
+ auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
+ auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+ {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
+
+ // Entries with a session id but no txnNumber do not lead to updates.
+ auto lsidNoTxn = makeLogicalSessionIdForTest();
+ OperationSessionInfo info;
+ info.setSessionId(lsidNoTxn);
+ auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo(
+ {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(
+ _opCtx.get(),
+ {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
+
+ // The txnNum and optime of the only write were saved.
+ auto resultSingleDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON()));
+ ASSERT_TRUE(!resultSingleDoc.isEmpty());
+
+ auto resultSingle =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc);
+
+ ASSERT_EQ(resultSingle.getTxnNum(), 5LL);
+ ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1));
+
+ // The txnNum and optime of the write with the larger txnNum were saved.
+ auto resultDiffTxnDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON()));
+ ASSERT_TRUE(!resultDiffTxnDoc.isEmpty());
+
+ auto resultDiffTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc);
+
+ ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL);
+ ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1));
+
+ // The txnNum and optime of the write with the later optime were saved.
+ auto resultSameTxnDoc =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON()));
+ ASSERT_TRUE(!resultSameTxnDoc.isEmpty());
+
+ auto resultSameTxn =
+ SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc);
+
+ ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL);
+ ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1));
+
+ // There is no entry for the write with no txnNumber.
+ auto resultNoTxn =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON()));
+ ASSERT_TRUE(resultNoTxn.isEmpty());
+}
+
+TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) {
+ const auto insertLsid = makeLogicalSessionIdForTest();
+ OperationSessionInfo insertSessionInfo;
+ insertSessionInfo.setSessionId(insertLsid);
+ insertSessionInfo.setTxnNumber(3);
+ auto date = Date_t::now();
+
+ auto innerOplog = makeOplogEntry(nss(),
+ {Timestamp(10, 10), 1},
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ insertSessionInfo,
+ date);
+
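+    // The session migration no-op carries the original insert entry in its 'o2' field.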
+ auto outerInsertDate = Date_t::now();
+ auto insertOplog = makeOplogEntryForMigrate(nss(),
+ {Timestamp(40, 0), 1},
+ repl::OpTypeEnum::kNoop,
+ BSON("$sessionMigrateInfo" << 1),
+ innerOplog.toBSON(),
+ insertSessionInfo,
+ outerInsertDate);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOplog}));
+
+ checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate);
+}
+
+TEST_F(OplogApplierImplTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
+ const auto preImageLsid = makeLogicalSessionIdForTest();
+ OperationSessionInfo preImageSessionInfo;
+ preImageSessionInfo.setSessionId(preImageLsid);
+ preImageSessionInfo.setTxnNumber(3);
+ auto preImageDate = Date_t::now();
+
+ auto preImageOplog = makeOplogEntryForMigrate(nss(),
+ {Timestamp(30, 0), 1},
+ repl::OpTypeEnum::kNoop,
+ BSON("_id" << 1),
+ boost::none,
+ preImageSessionInfo,
+ preImageDate);
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {preImageOplog}));
+
+ ASSERT_FALSE(docExists(_opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName
+ << preImageSessionInfo.getSessionId()->toBSON())));
+}
+
+TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
+ const auto lsid = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(lsid);
+ sessionInfo.setTxnNumber(3);
+
+ auto oplog = makeOplogEntry(nss(),
+ {Timestamp(30, 0), 1},
+ repl::OpTypeEnum::kNoop,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ Date_t::now());
+
+ auto writerPool = makeReplWriterPool();
+ NoopOplogApplierObserver observer;
+ OplogApplierImpl oplogApplier(
+ nullptr, // executor
+ nullptr, // oplogBuffer
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+
+ ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {oplog}));
+
+ ASSERT_FALSE(docExists(
+ _opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
+}
+
+} // namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
new file mode 100644
index 00000000000..1e85b6eba26
--- /dev/null
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
+
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+
+namespace mongo {
+namespace repl {
+
+void OplogApplierImplOpObserver::onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) {
+ if (!onInsertsFn) {
+ return;
+ }
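+    // Copy each inserted document as owned BSON so the hook can inspect it after the original
+    // insert statements go out of scope.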
+ std::vector<BSONObj> docs;
+ for (auto it = begin; it != end; ++it) {
+ const InsertStatement& insertStatement = *it;
+ docs.push_back(insertStatement.doc.getOwned());
+ }
+ onInsertsFn(opCtx, nss, docs);
+}
+
+void OplogApplierImplOpObserver::onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ StmtId stmtId,
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {
+ if (!onDeleteFn) {
+ return;
+ }
+ onDeleteFn(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc);
+}
+
+void OplogApplierImplOpObserver::onCreateCollection(OperationContext* opCtx,
+ Collection* coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex,
+ const OplogSlot& createOpTime) {
+ if (!onCreateCollectionFn) {
+ return;
+ }
+ onCreateCollectionFn(opCtx, coll, collectionName, options, idIndex);
+}
+
+void OplogApplierImplTest::setUp() {
+ ServiceContextMongoDTest::setUp();
+
+ serviceContext = getServiceContext();
+ _opCtx = cc().makeOperationContext();
+
+ ReplicationCoordinator::set(serviceContext,
+ std::make_unique<ReplicationCoordinatorMock>(serviceContext));
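+    // Put the mock replication coordinator into primary state for the duration of the tests.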
+ ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
+
+ StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>());
+
+ DropPendingCollectionReaper::set(
+ serviceContext, std::make_unique<DropPendingCollectionReaper>(getStorageInterface()));
+ repl::setOplogCollectionName(serviceContext);
+ repl::createOplog(_opCtx.get());
+
+ _consistencyMarkers = std::make_unique<ReplicationConsistencyMarkersMock>();
+
+ // Set up an OpObserver to track the documents OplogApplierImpl inserts.
+ auto opObserver = std::make_unique<OplogApplierImplOpObserver>();
+ _opObserver = opObserver.get();
+ auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
+ opObserverRegistry->addObserver(std::move(opObserver));
+
+ // Initialize the featureCompatibilityVersion server parameter. This is necessary because this
+ // test fixture does not create a featureCompatibilityVersion document from which to initialize
+ // the server parameter.
+ serverGlobalParams.featureCompatibility.setVersion(
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44);
+}
+
+void OplogApplierImplTest::tearDown() {
+ _opCtx.reset();
+ _consistencyMarkers = {};
+ DropPendingCollectionReaper::set(serviceContext, {});
+ StorageInterface::set(serviceContext, {});
+ ServiceContextMongoDTest::tearDown();
+}
+
+ReplicationConsistencyMarkers* OplogApplierImplTest::getConsistencyMarkers() const {
+ return _consistencyMarkers.get();
+}
+
+StorageInterface* OplogApplierImplTest::getStorageInterface() const {
+ return StorageInterface::get(serviceContext);
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
new file mode 100644
index 00000000000..31c703bc97e
--- /dev/null
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/repl/oplog_applier_impl.h"
+#include "mongo/db/repl/replication_consistency_markers.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+
+namespace repl {
+
+/**
+ * OpObserver for OplogApplierImpl test fixture.
+ */
+class OplogApplierImplOpObserver : public OpObserverNoop {
+public:
+ /**
+ * This function is called whenever OplogApplierImpl inserts documents into a collection.
+ */
+ void onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override;
+
+ /**
+ * This function is called whenever OplogApplierImpl deletes a document from a collection.
+ */
+ void onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ StmtId stmtId,
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) override;
+
+ /**
+ * Called when OplogApplierImpl creates a collection.
+ */
+ void onCreateCollection(OperationContext* opCtx,
+ Collection* coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex,
+ const OplogSlot& createOpTime) override;
+
+    // Hooks for the OpObserver functions above. Each defaults to a no-op but may be overridden to
+    // inspect the documents actually mutated.
+ std::function<void(OperationContext*, const NamespaceString&, const std::vector<BSONObj>&)>
+ onInsertsFn;
+
+ std::function<void(OperationContext*,
+ const NamespaceString&,
+ OptionalCollectionUUID,
+ StmtId,
+ bool,
+ const boost::optional<BSONObj>&)>
+ onDeleteFn;
+
+ std::function<void(OperationContext*,
+ Collection*,
+ const NamespaceString&,
+ const CollectionOptions&,
+ const BSONObj&)>
+ onCreateCollectionFn;
+};
+
+class OplogApplierImplTest : public ServiceContextMongoDTest {
+protected:
+ void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
+ const OplogEntry& op,
+ bool expectedApplyOpCalled);
+
+ Status _syncApplyWrapper(OperationContext* opCtx,
+ const OplogEntryBatch& batch,
+ OplogApplication::Mode oplogApplicationMode);
+
+ ServiceContext::UniqueOperationContext _opCtx;
+ std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers;
+ ServiceContext* serviceContext;
+ OplogApplierImplOpObserver* _opObserver = nullptr;
+
+ // Implements the OplogApplierImpl::MultiSyncApplyFn interface and does nothing.
+ static Status noopApplyOperationFn(OperationContext*,
+ MultiApplier::OperationPtrs*,
+ SyncTail* st,
+ WorkerMultikeyPathInfo*) {
+ return Status::OK();
+ }
+
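+    // Returns a monotonically increasing OpTime; the static counter is shared across tests.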
+ OpTime nextOpTime() {
+ static long long lastSecond = 1;
+ return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL);
+ }
+
+ void setUp() override;
+ void tearDown() override;
+
+ ReplicationCoordinator* getReplCoord() const;
+ ReplicationConsistencyMarkers* getConsistencyMarkers() const;
+ StorageInterface* getStorageInterface() const;
+
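+    // Convenience helpers for applying one or more oplog entries in steady-state or initial-sync
+    // mode.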
+ Status runOpSteadyState(const OplogEntry& op);
+ Status runOpsSteadyState(std::vector<OplogEntry> ops);
+ Status runOpInitialSync(const OplogEntry& entry);
+ Status runOpsInitialSync(std::vector<OplogEntry> ops);
+ Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops);
+
+ UUID kUuid{UUID::gen()};
+};
+
+Status failedApplyCommand(OperationContext* opCtx,
+ const BSONObj& theOperation,
+ OplogApplication::Mode);
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 5b086263114..ca2795a6b39 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -55,7 +55,6 @@ public:
explicit OplogApplierMock(OplogBuffer* oplogBuffer);
void _run(OplogBuffer* oplogBuffer) final;
- void _shutdown() final;
StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
};
@@ -67,8 +66,6 @@ OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer)
void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
-void OplogApplierMock::_shutdown() {}
-
StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
return OpTime();
}
diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp
index 8c3802c1e3f..a7b41404193 100644
--- a/src/mongo/db/repl/opqueue_batcher.cpp
+++ b/src/mongo/db/repl/opqueue_batcher.cpp
@@ -36,11 +36,11 @@ namespace mongo {
namespace repl {
-OpQueueBatcher::OpQueueBatcher(SyncTail* syncTail,
+OpQueueBatcher::OpQueueBatcher(OplogApplier* oplogApplier,
StorageInterface* storageInterface,
OplogBuffer* oplogBuffer,
OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn)
- : _syncTail(syncTail),
+ : _oplogApplier(oplogApplier),
_storageInterface(storageInterface),
_oplogBuffer(oplogBuffer),
_getNextApplierBatchFn(getNextApplierBatchFn),
@@ -126,7 +126,7 @@ void OpQueueBatcher::run() {
// If we don't have anything in the queue, wait a bit for something to appear.
if (oplogEntries.empty()) {
- if (_syncTail->inShutdown()) {
+ if (_oplogApplier->inShutdown()) {
ops.setMustShutdownFlag();
} else {
// Block up to 1 second.
diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/opqueue_batcher.h
index 0cdad053ddd..48432377dd5 100644
--- a/src/mongo/db/repl/opqueue_batcher.h
+++ b/src/mongo/db/repl/opqueue_batcher.h
@@ -30,7 +30,7 @@
#pragma once
#include "mongo/db/repl/initial_syncer.h"
-#include "mongo/db/repl/sync_tail.h"
+#include "mongo/db/repl/oplog_applier_impl.h"
namespace mongo {
namespace repl {
@@ -122,7 +122,7 @@ public:
/**
* Constructs an OpQueueBatcher
*/
- OpQueueBatcher(SyncTail* syncTail,
+ OpQueueBatcher(OplogApplier* oplogApplier,
StorageInterface* storageInterface,
OplogBuffer* oplogBuffer,
OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn);
@@ -143,7 +143,7 @@ private:
void run();
- SyncTail* const _syncTail;
+ OplogApplier* _oplogApplier;
StorageInterface* const _storageInterface;
OplogBuffer* const _oplogBuffer;
OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d1975168f77..49be20493d8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -223,6 +223,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
replCoord,
_replicationProcess->getConsistencyMarkers(),
_storageInterface,
+ multiSyncApply,
OplogApplier::Options(OplogApplication::Mode::kSecondary),
_writerPool.get());
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 42b379c832e..4ef5b7bcc75 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -360,9 +360,10 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
OplogApplierImpl oplogApplier(nullptr,
&oplogBuffer,
&stats,
- nullptr,
+ ReplicationCoordinator::get(opCtx),
_consistencyMarkers,
_storageInterface,
+ multiSyncApply,
OplogApplier::Options(OplogApplication::Mode::kRecovering),
writerPool.get());
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 18b3d7e0495..a5402fadfa7 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -85,23 +85,12 @@ namespace mongo {
namespace repl {
namespace {
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries);
MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime);
// The oplog entries applied
Counter64 opsAppliedStats;
ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats);
-// Tracks the oplog application batch size.
-Counter64 oplogApplicationBatchSize;
-ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize",
- &oplogApplicationBatchSize);
-
-// Number and time of each ApplyOps worker pool round
-TimerStats applyBatchStats;
-ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
-
NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
if (!optionalUuid) {
@@ -165,17 +154,9 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod
} // namespace
SyncTail::SyncTail(OplogApplier::Observer* observer,
- ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
- MultiSyncApplyFunc func,
- ThreadPool* writerPool,
const OplogApplier::Options& options)
- : _observer(observer),
- _consistencyMarkers(consistencyMarkers),
- _storageInterface(storageInterface),
- _applyFunc(func),
- _writerPool(writerPool),
- _options(options) {}
+ : _observer(observer), _storageInterface(storageInterface), _options(options) {}
SyncTail::~SyncTail() {}
@@ -185,74 +166,6 @@ const OplogApplier::Options& SyncTail::getOptions() const {
namespace {
-// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops'
-// stays valid until all scheduled work in the thread pool completes.
-void scheduleWritesToOplog(OperationContext* opCtx,
- StorageInterface* storageInterface,
- ThreadPool* threadPool,
- const MultiApplier::Operations& ops) {
-
- auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
- // The returned function will be run in a separate thread after this returns. Therefore all
- // captures other than 'ops' must be by value since they will not be available. The caller
- // guarantees that 'ops' will stay in scope until the spawned threads complete.
- return [storageInterface, &ops, begin, end](auto status) {
- invariant(status);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes, so it is
- // safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- UnreplicatedWritesBlock uwb(opCtx.get());
- ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
- opCtx->lockState());
-
- std::vector<InsertStatement> docs;
- docs.reserve(end - begin);
- for (size_t i = begin; i < end; i++) {
- // Add as unowned BSON to avoid unnecessary ref-count bumps.
- // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
- docs.emplace_back(InsertStatement{ops[i].getRaw(),
- ops[i].getOpTime().getTimestamp(),
- ops[i].getOpTime().getTerm()});
- }
-
- fassert(40141,
- storageInterface->insertDocuments(
- opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
- };
- };
-
- // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it
- // would result too little work per thread. This also ensures that we can amortize the
- // setup/teardown overhead across many writes.
- const size_t kMinOplogEntriesPerThread = 16;
- const bool enoughToMultiThread =
- ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads;
-
- // Only doc-locking engines support parallel writes to the oplog because they are required to
- // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
- // there would be no way to take advantage of multiple threads if a storage engine doesn't
- // support document locking.
- if (!enoughToMultiThread ||
- !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
-
- threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
- return;
- }
-
-
- const size_t numOplogThreads = threadPool->getStats().numThreads;
- const size_t numOpsPerThread = ops.size() / numOplogThreads;
- for (size_t thread = 0; thread < numOplogThreads; thread++) {
- size_t begin = thread * numOpsPerThread;
- size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
- threadPool->schedule(makeOplogWriterForRange(begin, end));
- }
-}
-
/**
* Caches per-collection properties which are relevant for oplog application, so that they don't
* have to be retrieved repeatedly for each op.
@@ -366,16 +279,6 @@ void addDerivedOps(OperationContext* opCtx,
} // namespace
-void SyncTail::shutdown() {
- stdx::lock_guard<Latch> lock(_mutex);
- _inShutdown = true;
-}
-
-bool SyncTail::inShutdown() const {
- stdx::lock_guard<Latch> lock(_mutex);
- return _inShutdown;
-}
-
Status syncApply(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode) {
@@ -691,158 +594,5 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx,
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
- invariant(!ops.empty());
-
- LOG(2) << "replication batch size is " << ops.size();
-
- // Stop all readers until we're done. This also prevents doc-locking engines from deleting old
- // entries from the oplog until we finish writing.
- Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
-
- auto replCoord = ReplicationCoordinator::get(opCtx);
- if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
- severe() << "attempting to replicate ops while primary";
- return {ErrorCodes::CannotApplyOplogWhilePrimary,
- "attempting to replicate ops while primary"};
- }
-
- // Increment the batch size stat.
- oplogApplicationBatchSize.increment(ops.size());
-
- std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
- {
- // Each node records cumulative batch application stats for itself using this timer.
- TimerHolder timer(&applyBatchStats);
-
- // We must wait for the all work we've dispatched to complete before leaving this block
- // because the spawned threads refer to objects on the stack
- ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); });
-
- // Write batch of ops into oplog.
- if (!_options.skipWritesToOplog) {
- _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
- scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
- }
-
- // Holds 'pseudo operations' generated by secondaries to aid in replication.
- // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
- // Pseudo operations include:
- // - applyOps operations expanded to individual ops.
- // - ops to update config.transactions. Normal writes to config.transactions in the
- // primary don't create an oplog entry, so extract info from writes with transactions
- // and create a pseudo oplog.
- std::vector<MultiApplier::Operations> derivedOps;
-
- std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
-
- // Wait for writes to finish before applying ops.
- _writerPool->waitForIdle();
-
- // Use this fail point to hold the PBWM lock after we have written the oplog entries but
- // before we have applied them.
- if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) {
- log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking "
- "until fail point is disabled.";
- pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx);
- }
-
- // Reset consistency markers in case the node fails while applying ops.
- if (!_options.skipWritesToOplog) {
- _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
- _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
- }
-
- {
- std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
-
- // Doles out all the work to the writer pool threads. writerVectors is not modified,
- // but multiSyncApply will modify the vectors that it contains.
- invariant(writerVectors.size() == statusVector.size());
- for (size_t i = 0; i < writerVectors.size(); i++) {
- if (writerVectors[i].empty())
- continue;
-
- _writerPool->schedule(
- [this,
- &writer = writerVectors.at(i),
- &status = statusVector.at(i),
- &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
- invariant(scheduleStatus);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes,
- // so it is safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
- });
- });
- }
-
- _writerPool->waitForIdle();
-
- // If any of the statuses is not ok, return error.
- for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) {
- const auto& status = *it;
- if (!status.isOK()) {
- severe()
- << "Failed to apply batch of operations. Number of operations in batch: "
- << ops.size() << ". First operation: " << redact(ops.front().toBSON())
- << ". Last operation: " << redact(ops.back().toBSON())
- << ". Oplog application failed in writer thread "
- << std::distance(statusVector.cbegin(), it) << ": " << redact(status);
- return status;
- }
- }
- }
- }
-
- // Notify the storage engine that a replication batch has completed. This means that all the
- // writes associated with the oplog entries in the batch are finished and no new writes with
- // timestamps associated with those oplog entries will show up in the future.
- const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
- storageEngine->replicationBatchIsComplete();
-
- // Use this fail point to hold the PBWM lock and prevent the batch from completing.
- if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
- log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
- "point is disabled.";
- while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
- if (inShutdown()) {
- severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
- "clean shutdown";
- fassertFailedNoTrace(50798);
- }
- sleepmillis(100);
- }
- }
-
- Timestamp firstTimeInBatch = ops.front().getTimestamp();
- // Set any indexes to multikey that this batch ignored. This must be done while holding the
- // parallel batch writer mode lock.
- for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
- for (MultikeyPathInfo info : infoVector) {
- // We timestamp every multikey write with the first timestamp in the batch. It is always
- // safe to set an index as multikey too early, just not too late. We conservatively pick
- // the first timestamp in the batch since we do not have enough information to find out
- // the timestamp of the first write that set the given multikey path.
- fassert(50686,
- _storageInterface->setIndexIsMultikey(
- opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
- }
- }
-
- // Increment the counter for the number of ops applied during catchup if the node is in catchup
- // mode.
- replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size());
-
- // We have now written all database writes and updated the oplog to match.
- return ops.back().getOpTime();
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 8fd2d9aef31..c8f5353b262 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -66,12 +66,6 @@ class OpTime;
*/
class SyncTail {
public:
- using MultiSyncApplyFunc =
- std::function<Status(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
-
/**
*
* Constructs a SyncTail.
@@ -82,10 +76,7 @@ public:
* of operations using 'func'. The writer thread pool is not owned by us.
*/
SyncTail(OplogApplier::Observer* observer,
- ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
- MultiSyncApplyFunc func,
- ThreadPool* writerPool,
const OplogApplier::Options& options);
virtual ~SyncTail();
@@ -94,35 +85,8 @@ public:
*/
const OplogApplier::Options& getOptions() const;
- /**
- * Shuts down oplogApplication() processing.
- */
- void shutdown();
-
- /**
- * Returns true if we are shutting down.
- */
- bool inShutdown() const;
-
-
using BatchLimits = OplogApplier::BatchLimits;
- /**
- * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then
- * using a set of threads to apply the operations. It will only apply (but will
- * still write to the oplog) oplog entries with a timestamp greater than or equal to the
- * beginApplyingTimestamp.
- *
- * If the batch application is successful, returns the optime of the last op applied, which
- * should be the last op in the batch.
- * Returns ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary.
- *
- * To provide crash resilience, this function will advance the persistent value of 'minValid'
- * to at least the last optime of the batch. If 'minValid' is already greater than or equal
- * to the last optime of this batch, it will not be updated.
- */
- StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
-
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
@@ -130,7 +94,6 @@ public:
private:
OplogApplier::Observer* const _observer;
- ReplicationConsistencyMarkers* const _consistencyMarkers;
StorageInterface* const _storageInterface;
void _deriveOpsAndFillWriterVectors(OperationContext* opCtx,
@@ -138,21 +101,12 @@ private:
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
SessionUpdateTracker* sessionUpdateTracker) noexcept;
- // Function to use during applyOps
- MultiSyncApplyFunc _applyFunc;
-
- // Pool of worker threads for writing ops to the databases.
- // Not owned by us.
- ThreadPool* const _writerPool;
// Used to configure multiApply() behavior.
const OplogApplier::Options _options;
// Protects member data of SyncTail.
mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncTail::_mutex");
-
- // Set to true if shutdown() has been called.
- bool _inShutdown = false;
};
/**
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index c2a18035ea7..5276f4142cc 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -109,10 +109,7 @@ public:
SyncTailForTest::SyncTailForTest()
: SyncTail(nullptr, // observer
- nullptr, // consistency markers
nullptr, // storage interface
- SyncTail::MultiSyncApplyFunc(),
- nullptr, // writer pool
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)) {}
/**
@@ -300,950 +297,22 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
ASSERT_TRUE(applyCmdCalled);
}
-DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") {
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- noopApplyOperationFn,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore();
-}
-
-bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
- ReplicationConsistencyMarkers* const consistencyMarkers,
- StorageInterface* const storageInterface,
- const NamespaceString& nss,
- const CollectionOptions& options) {
- auto writerPool = makeReplWriterPool();
- MultiApplier::Operations operationsApplied;
- auto applyOperationFn = [&operationsApplied](OperationContext* opCtx,
- MultiApplier::OperationPtrs* operationsToApply,
- SyncTail* st,
- WorkerMultikeyPathInfo*) -> Status {
- for (auto&& opPtr : *operationsToApply) {
- operationsApplied.push_back(*opPtr);
- }
- return Status::OK();
- };
- createCollection(opCtx, nss, options);
-
- auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1));
- ASSERT_FALSE(op.isForCappedCollection);
-
- SyncTail syncTail(nullptr,
- consistencyMarkers,
- storageInterface,
- applyOperationFn,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}));
- ASSERT_EQUALS(op.getOpTime(), lastOpTime);
-
- ASSERT_EQUALS(1U, operationsApplied.size());
- const auto& opApplied = operationsApplied.front();
- ASSERT_EQUALS(op, opApplied);
- // "isForCappedCollection" is not parsed from raw oplog entry document.
- return opApplied.isForCappedCollection;
-}
-
-TEST_F(
- SyncTailTest,
- MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) {
- NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- ASSERT_FALSE(_testOplogEntryIsForCappedCollection(
- _opCtx.get(), getConsistencyMarkers(), getStorageInterface(), nss, CollectionOptions()));
-}
-
-TEST_F(SyncTailTest,
- MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) {
- NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- ASSERT_TRUE(_testOplogEntryIsForCappedCollection(_opCtx.get(),
- getConsistencyMarkers(),
- getStorageInterface(),
- nss,
- createOplogCollectionOptions()));
-}
-
TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- SyncTail syncTail(nullptr,
- nullptr,
- nullptr,
- {},
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
// Collection should be created after syncApply() processes operation.
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
-class MultiOplogEntrySyncTailTest : public SyncTailTest {
-public:
- MultiOplogEntrySyncTailTest() : _nss1("test.preptxn1"), _nss2("test.preptxn2"), _txnNum(1) {}
-
-protected:
- void setUp() override {
- SyncTailTest::setUp();
- const NamespaceString cmdNss{"admin", "$cmd"};
-
- _uuid1 = createCollectionWithUuid(_opCtx.get(), _nss1);
- _uuid2 = createCollectionWithUuid(_opCtx.get(), _nss2);
- createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
-
- _lsid = makeLogicalSessionId(_opCtx.get());
-
- _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 1), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 1)))
- << "partialTxn" << true),
- _lsid,
- _txnNum,
- StmtId(0),
- OpTime());
- _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 2), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
- << BSON("_id" << 2)))
- << "partialTxn" << true),
- _lsid,
- _txnNum,
- StmtId(1),
- _insertOp1->getOpTime());
- _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 3), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
- << BSON("_id" << 3)))),
- _lsid,
- _txnNum,
- StmtId(2),
- _insertOp2->getOpTime());
- _opObserver->onInsertsFn =
- [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- stdx::lock_guard<Latch> lock(_insertMutex);
- if (nss.isOplog() || nss == _nss1 || nss == _nss2 ||
- nss == NamespaceString::kSessionTransactionsTableNamespace) {
- _insertedDocs[nss].insert(_insertedDocs[nss].end(), docs.begin(), docs.end());
- } else
- FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front();
- };
-
- _writerPool = makeReplWriterPool();
- }
-
- void tearDown() override {
- SyncTailTest::tearDown();
- }
-
- void checkTxnTable(const LogicalSessionId& lsid,
- const TxnNumber& txnNum,
- const repl::OpTime& expectedOpTime,
- Date_t expectedWallClock,
- boost::optional<repl::OpTime> expectedStartOpTime,
- DurableTxnStateEnum expectedState) {
- repl::checkTxnTable(_opCtx.get(),
- lsid,
- txnNum,
- expectedOpTime,
- expectedWallClock,
- expectedStartOpTime,
- expectedState);
- }
-
- std::vector<BSONObj>& oplogDocs() {
- return _insertedDocs[NamespaceString::kRsOplogNamespace];
- }
-
-protected:
- NamespaceString _nss1;
- NamespaceString _nss2;
- boost::optional<UUID> _uuid1;
- boost::optional<UUID> _uuid2;
- LogicalSessionId _lsid;
- TxnNumber _txnNum;
- boost::optional<OplogEntry> _insertOp1, _insertOp2;
- boost::optional<OplogEntry> _commitOp;
- std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs;
- std::unique_ptr<ThreadPool> _writerPool;
-
-private:
- Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntrySyncTailTest::_insertMutex");
-};
-
-TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- // Apply a batch with only the first operation. This should result in the first oplog entry
- // being put in the oplog and updating the transaction table, but not actually being applied
- // because it is part of a pending transaction.
- const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}));
- ASSERT_EQ(1U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the second operation. This should result in the second oplog entry
- // being put in the oplog, but with no effect because the operation is part of a pending
- // transaction.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}));
- ASSERT_EQ(2U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- // The transaction table should not have been updated for partialTxn operations that are not the
- // first in a transaction.
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and the two previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}));
- ASSERT_EQ(3U, oplogDocs().size());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getRaw());
- checkTxnTable(_lsid,
- _txnNum,
- _commitOp->getOpTime(),
- _commitOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
- // Skipping writes to oplog proves we're testing the code path which does not rely on reading
- // the oplog.
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kRecovering));
-
- // Apply both inserts and the commit in a single batch. We expect no oplog entries to
- // be inserted (because we've set skipWritesToOplog), and both entries to be committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}));
- ASSERT_EQ(0U, oplogDocs().size());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _commitOp->getOpTime(),
- _commitOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
- // Tests an unprepared transaction with ops both in the batch containing the commit and in
- // prior batches.
- // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1.
- std::vector<OplogEntry> insertOps;
- std::vector<BSONObj> insertDocs;
-
- const NamespaceString cmdNss{"admin", "$cmd"};
- for (int i = 0; i < 4; i++) {
- insertDocs.push_back(BSON("_id" << i));
- insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), i + 1), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << (i == 1 ? _nss2.ns() : _nss1.ns()) << "ui"
- << (i == 1 ? *_uuid2 : *_uuid1) << "o"
- << insertDocs.back()))
- << "partialTxn" << true),
- _lsid,
- _txnNum,
- StmtId(i),
- i == 0 ? OpTime() : insertOps.back().getOpTime()));
- }
- auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 5), 1LL},
- cmdNss,
- BSON("applyOps" << BSONArray()),
- _lsid,
- _txnNum,
- StmtId(4),
- insertOps.back().getOpTime());
-
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- // Insert the first entry in its own batch. This should result in the oplog entry being written
- // but the entry should not be applied as it is part of a pending transaction.
- const auto expectedStartOpTime = insertOps[0].getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
- ASSERT_EQ(1U, oplogDocs().size());
- ASSERT_EQ(0U, _insertedDocs[_nss1].size());
- ASSERT_EQ(0U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- insertOps[0].getOpTime(),
- insertOps[0].getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Insert the rest of the entries, including the commit. These entries should be added to the
- // oplog, and all the entries including the first should be applied.
- ASSERT_OK(
- syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}));
- ASSERT_EQ(5U, oplogDocs().size());
- ASSERT_EQ(3U, _insertedDocs[_nss1].size());
- ASSERT_EQ(1U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- commitOp.getOpTime(),
- commitOp.getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-
- // Check docs and ordering of docs in nss1.
- // The insert into nss2 is unordered with respect to those.
- ASSERT_BSONOBJ_EQ(insertDocs[0], _insertedDocs[_nss1][0]);
- ASSERT_BSONOBJ_EQ(insertDocs[1], _insertedDocs[_nss2].front());
- ASSERT_BSONOBJ_EQ(insertDocs[2], _insertedDocs[_nss1][1]);
- ASSERT_BSONOBJ_EQ(insertDocs[3], _insertedDocs[_nss1][2]);
-}
-
-TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
- // Tests that two transactions on the same session ID in the same batch both
- // apply correctly.
- TxnNumber txnNum1(1);
- TxnNumber txnNum2(2);
-
-
- std::vector<OplogEntry> insertOps1, insertOps2;
- const NamespaceString cmdNss{"admin", "$cmd"};
- insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 1), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 1)))
- << "partialTxn" << true),
- _lsid,
- txnNum1,
- StmtId(0),
- OpTime()));
- insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 2), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 2)))
- << "partialTxn" << true),
-
- _lsid,
- txnNum1,
- StmtId(1),
- insertOps1.back().getOpTime()));
- insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(2), 1), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 3)))
- << "partialTxn" << true),
- _lsid,
- txnNum2,
- StmtId(0),
- OpTime()));
- insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(2), 2), 1LL},
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 4)))
- << "partialTxn" << true),
- _lsid,
- txnNum2,
- StmtId(1),
- insertOps2.back().getOpTime()));
- auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSONArray()),
- _lsid,
- txnNum1,
- StmtId(2),
- insertOps1.back().getOpTime());
- auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSONArray()),
- _lsid,
- txnNum2,
- StmtId(2),
- insertOps2.back().getOpTime());
-
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- // Note the insert counter so we can check it later. It is necessary to use opCounters as
- // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in
- // the opObserver.
- int insertsBefore = replOpCounters.getInsert()->load();
- // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly
- // once.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(),
- {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
- ASSERT_EQ(6U, oplogDocs().size());
- ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
- ASSERT_EQ(4U, _insertedDocs[_nss1].size());
- checkTxnTable(_lsid,
- txnNum2,
- commitOp2.getOpTime(),
- commitOp2.getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-
- // Check docs and ordering of docs in nss1.
- ASSERT_BSONOBJ_EQ(BSON("_id" << 1), _insertedDocs[_nss1][0]);
- ASSERT_BSONOBJ_EQ(BSON("_id" << 2), _insertedDocs[_nss1][1]);
- ASSERT_BSONOBJ_EQ(BSON("_id" << 3), _insertedDocs[_nss1][2]);
- ASSERT_BSONOBJ_EQ(BSON("_id" << 4), _insertedDocs[_nss1][3]);
-}
-
-
-class MultiOplogEntryPreparedTransactionTest : public MultiOplogEntrySyncTailTest {
-protected:
- void setUp() override {
- MultiOplogEntrySyncTailTest::setUp();
-
- _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o"
- << BSON("_id" << 3)))
- << "prepare" << true),
- _lsid,
- _txnNum,
- StmtId(2),
- _insertOp2->getOpTime());
- _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o"
- << BSON("_id" << 0)))
- << "prepare" << true),
- _lsid,
- _txnNum,
- StmtId(0),
- OpTime());
- _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
- _lsid,
- _txnNum,
- StmtId(3),
- _prepareWithPrevOp->getOpTime());
- _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
- _lsid,
- _txnNum,
- StmtId(1),
- _prepareWithPrevOp->getOpTime());
- _abortPrepareWithPrevOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("abortTransaction" << 1),
- _lsid,
- _txnNum,
- StmtId(3),
- _prepareWithPrevOp->getOpTime());
- _abortSinglePrepareApplyOp = _abortPrepareWithPrevOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("abortTransaction" << 1),
- _lsid,
- _txnNum,
- StmtId(1),
- _singlePrepareApplyOp->getOpTime());
- }
-
-protected:
- boost::optional<OplogEntry> _commitPrepareWithPrevOp, _abortPrepareWithPrevOp,
- _singlePrepareApplyOp, _prepareWithPrevOp, _commitSinglePrepareApplyOp,
- _abortSinglePrepareApplyOp;
-
-private:
- Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryPreparedTransactionTest::_insertMutex");
-};
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- // Apply a batch with the insert operations. This should result in the oplog entries
- // being put in the oplog and updating the transaction table, but not actually being applied
- // because they are part of a pending transaction.
- const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
- ASSERT_EQ(2U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
- ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the prepare. This should result in the prepare being put in the
- // oplog, and the two previous entries being applied (but in a transaction) along with the
- // nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
- ASSERT_EQ(3U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _prepareWithPrevOp->getOpTime(),
- _prepareWithPrevOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and the three previous entries being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
- ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _commitPrepareWithPrevOp->getOpTime(),
- _commitPrepareWithPrevOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- // Apply a batch with the insert operations. This should result in the oplog entries
- // being put in the oplog and updating the transaction table, but not actually being applied
- // because they are part of a pending transaction.
- const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the prepare. This should result in the prepare being put in the
- // oplog, and the two previous entries being applied (but in a transaction) along with the
- // nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
- checkTxnTable(_lsid,
- _txnNum,
- _prepareWithPrevOp->getOpTime(),
- _prepareWithPrevOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the abort. This should result in the abort being put in the
- // oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}));
- ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _abortPrepareWithPrevOp->getOpTime(),
- _abortPrepareWithPrevOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kAborted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync));
-
- // Apply a batch with the insert operations. This should result in the oplog entries
- // being put in the oplog and updating the transaction table, but not actually being applied
- // because they are part of a pending transaction.
- const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
- ASSERT_EQ(2U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
- ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
- // the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
- ASSERT_EQ(3U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _prepareWithPrevOp->getOpTime(),
- _prepareWithPrevOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and the three previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
- ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _commitPrepareWithPrevOp->getOpTime(),
- _commitPrepareWithPrevOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) {
- // For recovery, the oplog must contain the operations before starting.
- for (auto&& entry :
- {*_insertOp1, *_insertOp2, *_prepareWithPrevOp, *_commitPrepareWithPrevOp}) {
- ASSERT_OK(getStorageInterface()->insertDocument(
- _opCtx.get(),
- NamespaceString::kRsOplogNamespace,
- {entry.toBSON(), entry.getOpTime().getTimestamp()},
- entry.getOpTime().getTerm()));
- }
- // Ignore docs inserted into oplog in setup.
- oplogDocs().clear();
-
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kRecovering));
-
- // Apply a batch with the insert operations. This should have no effect, because this is
- // recovery.
- const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
- ASSERT_TRUE(oplogDocs().empty());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _insertOp1->getOpTime(),
- _insertOp1->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kInProgress);
-
- // Apply a batch with only the prepare applyOps. This should have no effect, since this is
- // recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
- ASSERT_TRUE(oplogDocs().empty());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _prepareWithPrevOp->getOpTime(),
- _prepareWithPrevOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the three previous entries
- // being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
- ASSERT_TRUE(oplogDocs().empty());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_EQ(2U, _insertedDocs[_nss2].size());
- checkTxnTable(_lsid,
- _txnNum,
- _commitPrepareWithPrevOp->getOpTime(),
- _commitPrepareWithPrevOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
-
- // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
- // the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
- ASSERT_EQ(1U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- checkTxnTable(_lsid,
- _txnNum,
- _singlePrepareApplyOp->getOpTime(),
- _singlePrepareApplyOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
- ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _commitSinglePrepareApplyOp->getOpTime(),
- _commitSinglePrepareApplyOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSONArray() << "prepare" << true),
- _lsid,
- _txnNum,
- StmtId(0),
- OpTime());
- const auto expectedStartOpTime = emptyPrepareApplyOp.getOpTime();
-
- // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
- // the oplog; since the applyOps array is empty, there is no nested insert to apply.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}));
- ASSERT_EQ(1U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- checkTxnTable(_lsid,
- _txnNum,
- emptyPrepareApplyOp.getOpTime(),
- emptyPrepareApplyOp.getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
- ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _commitSinglePrepareApplyOp->getOpTime(),
- _commitSinglePrepareApplyOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
- // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
- // the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
- checkTxnTable(_lsid,
- _txnNum,
- _singlePrepareApplyOp->getOpTime(),
- _singlePrepareApplyOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the abort. This should result in the abort being put in the
- // oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}));
- ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _abortSinglePrepareApplyOp->getOpTime(),
- _abortSinglePrepareApplyOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kAborted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest,
- MultiApplySingleApplyOpsPreparedTransactionInitialSync) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync));
-
- const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
-
- // Apply a batch with only the prepare applyOps. This should result in the prepare being put in
- // the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
- ASSERT_EQ(1U, oplogDocs().size());
- ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _singlePrepareApplyOp->getOpTime(),
- _singlePrepareApplyOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the commit being put in the
- // oplog, and the previous entry being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
- ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _commitSinglePrepareApplyOp->getOpTime(),
- _commitSinglePrepareApplyOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(MultiOplogEntryPreparedTransactionTest,
- MultiApplySingleApplyOpsPreparedTransactionRecovery) {
- // For recovery, the oplog must contain the operations before starting.
- for (auto&& entry : {*_singlePrepareApplyOp, *_commitPrepareWithPrevOp}) {
- ASSERT_OK(getStorageInterface()->insertDocument(
- _opCtx.get(),
- NamespaceString::kRsOplogNamespace,
- {entry.toBSON(), entry.getOpTime().getTimestamp()},
- entry.getOpTime().getTerm()));
- }
- // Ignore docs inserted into oplog in setup.
- oplogDocs().clear();
-
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- _writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kRecovering));
-
- const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
-
- // Apply a batch with only the prepare applyOps. This should have no effect, since this is
- // recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
- ASSERT_TRUE(oplogDocs().empty());
- ASSERT_TRUE(_insertedDocs[_nss1].empty());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _singlePrepareApplyOp->getOpTime(),
- _singlePrepareApplyOp->getWallClockTime(),
- expectedStartOpTime,
- DurableTxnStateEnum::kPrepared);
-
- // Apply a batch with only the commit. This should result in the previous entry being
- // applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
- ASSERT_TRUE(oplogDocs().empty());
- ASSERT_EQ(1U, _insertedDocs[_nss1].size());
- ASSERT_TRUE(_insertedDocs[_nss2].empty());
- checkTxnTable(_lsid,
- _txnNum,
- _commitSinglePrepareApplyOp->getOpTime(),
- _commitSinglePrepareApplyOp->getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
void testWorkerMultikeyPaths(OperationContext* opCtx,
const OplogEntry& op,
unsigned long numPaths) {
- SyncTail syncTail(nullptr,
- nullptr,
- nullptr,
- {},
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&op};
ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo));
@@ -1299,12 +368,8 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) {
auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA);
auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7));
auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB);
- SyncTail syncTail(nullptr,
- nullptr,
- nullptr,
- {},
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ SyncTail syncTail(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&opA, &opB};
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
@@ -1347,12 +412,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName());
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
- SyncTail syncTail(nullptr,
- nullptr,
- nullptr,
- {},
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs ops = {&op};
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
@@ -2252,574 +1312,6 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
ASSERT_EQUALS(0, countLogLinesContaining(expected.str()));
}
-class SyncTailTxnTableTest : public SyncTailTest {
-public:
- void setUp() override {
- SyncTailTest::setUp();
-
- MongoDSessionCatalog::onStepUp(_opCtx.get());
-
- DBDirectClient client(_opCtx.get());
- BSONObj result;
- ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result));
- }
-
- /**
- * Creates an OplogEntry with given parameters and preset defaults for this test suite.
- */
- repl::OplogEntry makeOplogEntry(const NamespaceString& ns,
- repl::OpTime opTime,
- repl::OpTypeEnum opType,
- BSONObj object,
- boost::optional<BSONObj> object2,
- const OperationSessionInfo& sessionInfo,
- Date_t wallClockTime) {
- return repl::OplogEntry(opTime, // optime
- boost::none, // hash
- opType, // opType
- ns, // namespace
- boost::none, // uuid
- boost::none, // fromMigrate
- 0, // version
- object, // o
- object2, // o2
- sessionInfo, // sessionInfo
- boost::none, // false
- wallClockTime, // wall clock time
- boost::none, // statement id
- boost::none, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none); // post-image optime
- }
-
- /**
- * Creates an OplogEntry with given parameters and preset defaults for this test suite.
- */
- repl::OplogEntry makeOplogEntryForMigrate(const NamespaceString& ns,
- repl::OpTime opTime,
- repl::OpTypeEnum opType,
- BSONObj object,
- boost::optional<BSONObj> object2,
- const OperationSessionInfo& sessionInfo,
- Date_t wallClockTime) {
- return repl::OplogEntry(opTime, // optime
- boost::none, // hash
- opType, // opType
- ns, // namespace
- boost::none, // uuid
- true, // fromMigrate
- 0, // version
- object, // o
- object2, // o2
- sessionInfo, // sessionInfo
- boost::none, // false
- wallClockTime, // wall clock time
- boost::none, // statement id
- boost::none, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none); // post-image optime
- }
-
- void checkTxnTable(const OperationSessionInfo& sessionInfo,
- const repl::OpTime& expectedOpTime,
- Date_t expectedWallClock) {
- invariant(sessionInfo.getSessionId());
- invariant(sessionInfo.getTxnNumber());
-
- repl::checkTxnTable(_opCtx.get(),
- *sessionInfo.getSessionId(),
- *sessionInfo.getTxnNumber(),
- expectedOpTime,
- expectedWallClock,
- {},
- {});
- }
-
- static const NamespaceString& nss() {
- return kNs;
- }
-
-private:
- static const NamespaceString kNs;
-};
-
-const NamespaceString SyncTailTxnTableTest::kNs("test.foo");
-
-TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- const auto date = Date_t::now();
-
- auto insertOp = makeOplogEntry(nss(),
- {Timestamp(1, 0), 1},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
-
- checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
-}
-
-TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- const auto date = Date_t::now();
-
- auto insertOp = makeOplogEntry(nss(),
- {Timestamp(1, 0), 1},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
- {Timestamp(2, 0), 1},
- repl::OpTypeEnum::kDelete,
- BSON("_id" << sessionInfo.getSessionId()->toBSON()),
- boost::none,
- {},
- Date_t::now());
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
-
- ASSERT_FALSE(docExists(
- _opCtx.get(),
- NamespaceString::kSessionTransactionsTableNamespace,
- BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
-}
-
-TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) {
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- auto date = Date_t::now();
-
- auto insertOp = makeOplogEntry(nss(),
- {Timestamp(1, 0), 1},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
- {Timestamp(2, 0), 1},
- repl::OpTypeEnum::kDelete,
- BSON("_id" << sessionInfo.getSessionId()->toBSON()),
- boost::none,
- {},
- Date_t::now());
-
- date = Date_t::now();
- sessionInfo.setTxnNumber(7);
- auto insertOp2 = makeOplogEntry(nss(),
- {Timestamp(3, 0), 2},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 6),
- boost::none,
- sessionInfo,
- date);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
-
- checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
-}
-
-TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTable) {
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- auto date = Date_t::now();
-
- auto insertOp = makeOplogEntry(nss(),
- {Timestamp(1, 0), 1},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- repl::OpTime newWriteOpTime(Timestamp(2, 0), 1);
- auto updateOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace,
- {Timestamp(4, 0), 1},
- repl::OpTypeEnum::kUpdate,
- BSON("$set" << BSON("lastWriteOpTime" << newWriteOpTime)),
- BSON("_id" << sessionInfo.getSessionId()->toBSON()),
- {},
- Date_t::now());
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}));
-
- checkTxnTable(sessionInfo, newWriteOpTime, date);
-}
-
-TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSession) {
- const NamespaceString cmdNss{"admin", "$cmd"};
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- auto date = Date_t::now();
- auto uuid = [&] {
- return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid();
- }();
-
- repl::OpTime retryableInsertOpTime(Timestamp(1, 0), 1);
-
- auto retryableInsertOp = makeOplogEntry(nss(),
- retryableInsertOpTime,
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- repl::OpTime txnInsertOpTime(Timestamp(2, 0), 1);
- sessionInfo.setTxnNumber(4);
-
- auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- txnInsertOpTime,
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << nss().ns() << "ui" << uuid << "o"
- << BSON("_id" << 2)))
- << "partialTxn" << true),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(0),
- OpTime());
-
- repl::OpTime txnCommitOpTime(Timestamp(3, 0), 1);
- auto txnCommitOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
- cmdNss,
- BSON("applyOps" << BSONArray()),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(1),
- txnInsertOpTime);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}));
-
- repl::checkTxnTable(_opCtx.get(),
- *sessionInfo.getSessionId(),
- *sessionInfo.getTxnNumber(),
- txnCommitOpTime,
- txnCommitOp.getWallClockTime(),
- boost::none,
- DurableTxnStateEnum::kCommitted);
-}
-
-TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSession) {
- const NamespaceString cmdNss{"admin", "$cmd"};
- const auto sessionId = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(3);
- auto date = Date_t::now();
- auto uuid = [&] {
- return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid();
- }();
-
- repl::OpTime txnInsertOpTime(Timestamp(1, 0), 1);
- auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
- txnInsertOpTime,
- cmdNss,
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << nss().ns() << "ui" << uuid << "o"
- << BSON("_id" << 2)))
- << "partialTxn" << true),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(0),
- OpTime());
-
- repl::OpTime txnCommitOpTime(Timestamp(2, 0), 1);
- auto txnCommitOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
- cmdNss,
- BSON("applyOps" << BSONArray()),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(1),
- txnInsertOpTime);
-
- repl::OpTime retryableInsertOpTime(Timestamp(3, 0), 1);
- sessionInfo.setTxnNumber(4);
-
- auto retryableInsertOp = makeOplogEntry(nss(),
- retryableInsertOpTime,
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- date);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
-
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}));
-
- repl::checkTxnTable(_opCtx.get(),
- *sessionInfo.getSessionId(),
- *sessionInfo.getTxnNumber(),
- retryableInsertOpTime,
- retryableInsertOp.getWallClockTime(),
- boost::none,
- boost::none);
-}
-
-
-TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
- NamespaceString ns0("test.0");
- NamespaceString ns1("test.1");
- NamespaceString ns2("test.2");
- NamespaceString ns3("test.3");
-
- DBDirectClient client(_opCtx.get());
- BSONObj result;
- ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result));
- ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result));
- ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result));
- ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result));
- auto uuid0 = [&] {
- return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid();
- }();
- auto uuid1 = [&] {
- return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid();
- }();
- auto uuid2 = [&] {
- return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid();
- }();
-
- // Entries with a session id and a txnNumber update the transaction table.
- auto lsidSingle = makeLogicalSessionIdForTest();
- auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0);
-
- // For entries with the same session, the entry with a larger txnNumber is saved.
- auto lsidDiffTxn = makeLogicalSessionIdForTest();
- auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
- auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
-
- // For entries with the same session and txnNumber, the later optime is saved.
- auto lsidSameTxn = makeLogicalSessionIdForTest();
- auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
- auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
-
- // Entries with a session id but no txnNumber do not lead to updates.
- auto lsidNoTxn = makeLogicalSessionIdForTest();
- OperationSessionInfo info;
- info.setSessionId(lsidNoTxn);
- auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo(
- {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(),
- {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
-
- // The txnNum and optime of the only write were saved.
- auto resultSingleDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON()));
- ASSERT_TRUE(!resultSingleDoc.isEmpty());
-
- auto resultSingle =
- SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc);
-
- ASSERT_EQ(resultSingle.getTxnNum(), 5LL);
- ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1));
-
- // The txnNum and optime of the write with the larger txnNum were saved.
- auto resultDiffTxnDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON()));
- ASSERT_TRUE(!resultDiffTxnDoc.isEmpty());
-
- auto resultDiffTxn =
- SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc);
-
- ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL);
- ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1));
-
- // The txnNum and optime of the write with the later optime were saved.
- auto resultSameTxnDoc =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON()));
- ASSERT_TRUE(!resultSameTxnDoc.isEmpty());
-
- auto resultSameTxn =
- SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc);
-
- ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL);
- ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1));
-
- // There is no entry for the write with no txnNumber.
- auto resultNoTxn =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON()));
- ASSERT_TRUE(resultNoTxn.isEmpty());
-}
-
-TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) {
- const auto insertLsid = makeLogicalSessionIdForTest();
- OperationSessionInfo insertSessionInfo;
- insertSessionInfo.setSessionId(insertLsid);
- insertSessionInfo.setTxnNumber(3);
- auto date = Date_t::now();
-
- auto innerOplog = makeOplogEntry(nss(),
- {Timestamp(10, 10), 1},
- repl::OpTypeEnum::kInsert,
- BSON("_id" << 1),
- boost::none,
- insertSessionInfo,
- date);
-
- auto outerInsertDate = Date_t::now();
- auto insertOplog = makeOplogEntryForMigrate(nss(),
- {Timestamp(40, 0), 1},
- repl::OpTypeEnum::kNoop,
- BSON("$sessionMigrateInfo" << 1),
- innerOplog.toBSON(),
- insertSessionInfo,
- outerInsertDate);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}));
-
- checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate);
-}
-
-TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
- const auto preImageLsid = makeLogicalSessionIdForTest();
- OperationSessionInfo preImageSessionInfo;
- preImageSessionInfo.setSessionId(preImageLsid);
- preImageSessionInfo.setTxnNumber(3);
- auto preImageDate = Date_t::now();
-
- auto preImageOplog = makeOplogEntryForMigrate(nss(),
- {Timestamp(30, 0), 1},
- repl::OpTypeEnum::kNoop,
- BSON("_id" << 1),
- boost::none,
- preImageSessionInfo,
- preImageDate);
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}));
-
- ASSERT_FALSE(docExists(_opCtx.get(),
- NamespaceString::kSessionTransactionsTableNamespace,
- BSON(SessionTxnRecord::kSessionIdFieldName
- << preImageSessionInfo.getSessionId()->toBSON())));
-}
-
-TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
- const auto lsid = makeLogicalSessionIdForTest();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(lsid);
- sessionInfo.setTxnNumber(3);
-
- auto oplog = makeOplogEntry(nss(),
- {Timestamp(30, 0), 1},
- repl::OpTypeEnum::kNoop,
- BSON("_id" << 1),
- boost::none,
- sessionInfo,
- Date_t::now());
-
- auto writerPool = makeReplWriterPool();
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- multiSyncApply,
- writerPool.get(),
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}));
-
- ASSERT_FALSE(docExists(
- _opCtx.get(),
- NamespaceString::kSessionTransactionsTableNamespace,
- BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
-}
-
TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) {
// Create a BSON "emptycapped" command.
auto emptyCappedCmd = BSON("emptycapped" << nss.coll());
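The transaction-table and multi-entry transaction scenarios removed above now run against OplogApplierImpl::multiApply() rather than SyncTail::multiApply(). A rough sketch of the replacement construction, following the argument order visible in the storage_timestamp_tests.cpp hunks further down; 'replCoordMock', 'consistencyMarkers', 'storageInterface', 'opCtx', and 'insertOp' stand in for whatever the hosting fixture supplies:

    DoNothingOplogApplierObserver observer;
    auto writerPool = repl::makeReplWriterPool();
    repl::OplogApplierImpl oplogApplier(
        nullptr,  // task executor, not required for multiApply()
        nullptr,  // oplog buffer, not required for multiApply()
        &observer,
        replCoordMock,         // replication coordinator (mock)
        consistencyMarkers,    // ReplicationConsistencyMarkers*
        storageInterface,      // StorageInterface*
        repl::multiSyncApply,  // application function run by each writer
        repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
        writerPool.get());
    // multiApply() writes the batch to the oplog (unless the mode skips oplog writes) and
    // returns the optime of the last op applied.
    auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(opCtx, {insertOp}));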
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 4bca63586f3..bd5fe5c9d13 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -204,12 +204,8 @@ Status SyncTailTest::runOpSteadyState(const OplogEntry& op) {
}
Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) {
- SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
- getStorageInterface(),
- SyncTail::MultiSyncApplyFunc(),
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ SyncTail syncTail(
+ nullptr, getStorageInterface(), OplogApplier::Options(OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs opsPtrs;
for (auto& op : ops) {
opsPtrs.push_back(&op);
@@ -224,10 +220,7 @@ Status SyncTailTest::runOpInitialSync(const OplogEntry& op) {
Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
getStorageInterface(),
- SyncTail::MultiSyncApplyFunc(),
- nullptr,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync));
// Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD
// operations provided by idempotency tests.
@@ -245,10 +238,7 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
Status SyncTailTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) {
SyncTail syncTail(nullptr,
- getConsistencyMarkers(),
getStorageInterface(),
- SyncTail::MultiSyncApplyFunc(),
- nullptr,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync));
// Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD
// operations provided by idempotency tests.
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 597b84fde43..45b72b54edc 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1308,9 +1308,10 @@ public:
nullptr, // task executor. not required for multiApply().
nullptr, // oplog buffer. not required for multiApply().
&observer,
- nullptr, // replication coordinator. not required for multiApply().
+ _coordinatorMock,
_consistencyMarkers,
storageInterface,
+ repl::multiSyncApply,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
@@ -1392,9 +1393,10 @@ public:
nullptr, // task executor. not required for multiApply().
nullptr, // oplog buffer. not required for multiApply().
&observer,
- nullptr, // replication coordinator. not required for multiApply().
+ _coordinatorMock,
_consistencyMarkers,
storageInterface,
+ repl::multiSyncApply,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
writerPool.get());
auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
@@ -2440,17 +2442,21 @@ public:
<< "ns" << ns.ns() << "ui" << uuid << "wall"
<< Date_t() << "o" << doc0));
+ DoNothingOplogApplierObserver observer;
// Apply the operation.
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::makeReplWriterPool(1);
- repl::SyncTail syncTail(
- nullptr,
+ repl::OplogApplierImpl oplogApplier(
+ nullptr, // task executor. not required for multiApply().
+ nullptr, // oplog buffer. not required for multiApply().
+ &observer,
+ _coordinatorMock,
_consistencyMarkers,
storageInterface,
applyOperationFn,
- writerPool.get(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary));
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
+ writerPool.get());
+ auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, {insertOp}));
ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
joinGuard.dismiss();