diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2019-09-30 17:18:10 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-30 17:18:10 +0000 |
commit | 9237e4d66a592d30385f9496b8fda1590e9ff9ca (patch) | |
tree | 4dad80baba13ecf504d104f875e1281d50d0a6fe | |
parent | 194361c6eafdbda1ccd272b6a1e1a887817f476a (diff) | |
download | mongo-9237e4d66a592d30385f9496b8fda1590e9ff9ca.tar.gz |
SERVER-43344 Move shutdown, multiApply, and scheduleWritesToOplog from SyncTail to OplogApplierImpl
21 files changed, 2352 insertions, 1867 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index cd06cba18bc..5da88d7a47b 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -500,6 +500,23 @@ env.Library( ], ) + +env.Library( + target='oplog_applier_impl_test_fixture', + source=[ + 'oplog_applier_impl_test_fixture.cpp', + ], + LIBDEPS=[ + 'drop_pending_collection_reaper', + 'oplog_application', + 'replmocks', + 'storage_interface_impl', + '$BUILD_DIR/mongo/db/catalog/document_validation', + '$BUILD_DIR/mongo/db/s/op_observer_sharding_impl', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + ], +) + env.Library( target='oplog_application_interface', source=[ @@ -1231,6 +1248,7 @@ env.CppUnitTest( 'isself_test.cpp', 'member_config_test.cpp', 'multiapplier_test.cpp', + 'oplog_applier_impl_test.cpp', 'oplog_applier_test.cpp', 'oplog_buffer_collection_test.cpp', 'oplog_buffer_proxy_test.cpp', @@ -1304,6 +1322,7 @@ env.CppUnitTest( 'multiapplier', 'oplog', 'oplog_application_interface', + 'oplog_applier_impl_test_fixture', 'oplog_buffer_collection', 'oplog_buffer_proxy', 'oplog_entry', diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 33ea1ea0d16..e7cb221c92b 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -143,6 +143,7 @@ std::unique_ptr<OplogApplier> DataReplicatorExternalStateImpl::makeOplogApplier( _replicationCoordinator, consistencyMarkers, storageInterface, + multiSyncApply, options, writerPool); } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index ae615085319..3e38368a026 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -58,7 +58,6 @@ public: private: void _run(OplogBuffer* oplogBuffer) final {} - void _shutdown() final {} StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final { return _externalState->multiApplyFn(opCtx, ops, _observer); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 6f67f840635..adae4b62607 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -382,10 +382,7 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence } SyncTail syncTail(nullptr, // observer - nullptr, // consistency markers nullptr, // storage interface - SyncTail::MultiSyncApplyFunc(), - nullptr, // writer pool repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); std::vector<MultiApplier::OperationPtrs> writerVectors(1); std::vector<MultiApplier::Operations> derivedOps; diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 5242a9c917b..0bb82669ddc 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -73,8 +73,6 @@ Future<void> OplogApplier::startup() { } void OplogApplier::shutdown() { - _shutdown(); - stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index f6049c71943..63be125b2bd 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -196,13 +196,6 @@ private: virtual void _run(OplogBuffer* oplogBuffer) = 0; /** - * Called from shutdown to signals oplog application loop to stop running. - * Currently applicable to steady state replication only. - * Implemented in subclasses but not visible otherwise. - */ - virtual void _shutdown() = 0; - - /** * Called from multiApply() to apply a batch of operations in parallel. * Implemented in subclasses but not visible otherwise. */ diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 1c094f6d077..f0317f59b79 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -29,7 +29,9 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication +#include "mongo/db/stats/timer_stats.h" #include "mongo/platform/basic.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/db/repl/oplog_applier_impl.h" @@ -37,6 +39,16 @@ namespace mongo { namespace repl { +MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion); +MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries); + +// Tracks the oplog application batch size. +Counter64 oplogApplicationBatchSize; +ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize", + &oplogApplicationBatchSize); +// Number and time of each ApplyOps worker pool round +TimerStats applyBatchStats; +ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); namespace { @@ -151,14 +163,16 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, + MultiSyncApplyFunc func, const OplogApplier::Options& options, ThreadPool* writerPool) : OplogApplier(executor, oplogBuffer, observer, options), _replCoord(replCoord), + _writerPool(writerPool), _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers), - _syncTail( - observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options), + _applyFunc(func), + _syncTail(observer, storageInterface, options), _beginApplyingOpTime(options.beginApplyingOpTime) {} void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { @@ -168,17 +182,13 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { _runLoop(oplogBuffer, getNextApplierBatchFn); } -void OplogApplierImpl::_shutdown() { - _syncTail.shutdown(); -} - void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) { // We don't start data replication for arbiters at all and it's not allowed to reconfig // arbiterOnly field for any member. invariant(!_replCoord->getMemberState().arbiter()); - OpQueueBatcher batcher(&_syncTail, _storageInterface, oplogBuffer, getNextApplierBatchFn); + OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn); std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getStorageEngine()->isDurable() @@ -203,7 +213,7 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { // Tests should not trigger clean shutdown while that failpoint is active. If we // think we need this, we need to think hard about what the behavior should be. - if (_syncTail.inShutdown()) { + if (inShutdown()) { severe() << "Turn off rsSyncApplyStop before attempting clean shutdown"; fassertFailedNoTrace(40304); } @@ -256,10 +266,10 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, // Don't allow the fsync+lock thread to see intermediate states of batch application. stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); - // Apply the operations in this batch. 'multiApply' returns the optime of the last op that + // Apply the operations in this batch. '_multiApply' returns the optime of the last op that // was applied, which should be the last optime in the batch. auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, _syncTail.multiApply(&opCtx, ops.releaseBatch())); + fassertNoTrace(34437, _multiApply(&opCtx, ops.releaseBatch())); invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); // Update various things that care about our last applied optime. Tests rely on 1 happening @@ -294,8 +304,230 @@ void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, } } -StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) { - return _syncTail.multiApply(opCtx, std::move(ops)); + +// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that +// 'ops' stays valid until all scheduled work in the thread pool completes. +void scheduleWritesToOplog(OperationContext* opCtx, + StorageInterface* storageInterface, + ThreadPool* writerPool, + const MultiApplier::Operations& ops) { + auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) { + // The returned function will be run in a separate thread after this returns. Therefore all + // captures other than 'ops' must be by value since they will not be available. The caller + // guarantees that 'ops' will stay in scope until the spawned threads complete. + return [storageInterface, &ops, begin, end](auto status) { + invariant(status); + + auto opCtx = cc().makeOperationContext(); + + // This code path is only executed on secondaries and initial syncing nodes, so it is + // safe to exclude any writes from Flow Control. + opCtx->setShouldParticipateInFlowControl(false); + + UnreplicatedWritesBlock uwb(opCtx.get()); + ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( + opCtx->lockState()); + + std::vector<InsertStatement> docs; + docs.reserve(end - begin); + for (size_t i = begin; i < end; i++) { + // Add as unowned BSON to avoid unnecessary ref-count bumps. + // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed. + docs.emplace_back(InsertStatement{ops[i].getRaw(), + ops[i].getOpTime().getTimestamp(), + ops[i].getOpTime().getTerm()}); + } + + fassert(40141, + storageInterface->insertDocuments( + opCtx.get(), NamespaceString::kRsOplogNamespace, docs)); + }; + }; + + // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it + // would result too little work per thread. This also ensures that we can amortize the + // setup/teardown overhead across many writes. + const size_t kMinOplogEntriesPerThread = 16; + const bool enoughToMultiThread = + ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().numThreads; + + // Only doc-locking engines support parallel writes to the oplog because they are required to + // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally, + // there would be no way to take advantage of multiple threads if a storage engine doesn't + // support document locking. + if (!enoughToMultiThread || + !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { + + writerPool->schedule(makeOplogWriterForRange(0, ops.size())); + return; + } + + + const size_t numOplogThreads = writerPool->getStats().numThreads; + const size_t numOpsPerThread = ops.size() / numOplogThreads; + for (size_t thread = 0; thread < numOplogThreads; thread++) { + size_t begin = thread * numOpsPerThread; + size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread; + writerPool->schedule(makeOplogWriterForRange(begin, end)); + } +} + +StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, + MultiApplier::Operations ops) { + invariant(!ops.empty()); + + LOG(2) << "replication batch size is " << ops.size(); + + // Stop all readers until we're done. This also prevents doc-locking engines from deleting old + // entries from the oplog until we finish writing. + Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); + + // TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will + // be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier. + invariant(_replCoord); + if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) { + severe() << "attempting to replicate ops while primary"; + return {ErrorCodes::CannotApplyOplogWhilePrimary, + "attempting to replicate ops while primary"}; + } + + // Increment the batch size stat. + oplogApplicationBatchSize.increment(ops.size()); + + std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads); + { + // Each node records cumulative batch application stats for itself using this timer. + TimerHolder timer(&applyBatchStats); + + // We must wait for the all work we've dispatched to complete before leaving this block + // because the spawned threads refer to objects on the stack + ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); }); + + // Write batch of ops into oplog. + // TODO (SERVER-43651): _options currently belongs to SyncTail for use in the free + // function multiSyncApply; move this field to OplogApplierImpl. + if (!_syncTail.getOptions().skipWritesToOplog) { + _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp()); + scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops); + } + + // Holds 'pseudo operations' generated by secondaries to aid in replication. + // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied. + // Pseudo operations include: + // - applyOps operations expanded to individual ops. + // - ops to update config.transactions. Normal writes to config.transactions in the + // primary don't create an oplog entry, so extract info from writes with transactions + // and create a pseudo oplog. + std::vector<MultiApplier::Operations> derivedOps; + + std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); + _syncTail.fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + + // Wait for writes to finish before applying ops. + _writerPool->waitForIdle(); + + // Use this fail point to hold the PBWM lock after we have written the oplog entries but + // before we have applied them. + if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) { + log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking " + "until fail point is disabled."; + pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx); + } + + // Reset consistency markers in case the node fails while applying ops. + if (!_syncTail.getOptions().skipWritesToOplog) { + _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); + _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); + } + + { + std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK()); + + // Doles out all the work to the writer pool threads. writerVectors is not modified, + // but multiSyncApply will modify the vectors that it contains. + invariant(writerVectors.size() == statusVector.size()); + for (size_t i = 0; i < writerVectors.size(); i++) { + if (writerVectors[i].empty()) + continue; + + _writerPool->schedule( + [this, + &writer = writerVectors.at(i), + &status = statusVector.at(i), + &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) { + invariant(scheduleStatus); + + auto opCtx = cc().makeOperationContext(); + + // This code path is only executed on secondaries and initial syncing nodes, + // so it is safe to exclude any writes from Flow Control. + opCtx->setShouldParticipateInFlowControl(false); + + status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { + return _applyFunc(opCtx.get(), &writer, &_syncTail, &multikeyVector); + }); + }); + } + + _writerPool->waitForIdle(); + + // If any of the statuses is not ok, return error. + for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) { + const auto& status = *it; + if (!status.isOK()) { + severe() + << "Failed to apply batch of operations. Number of operations in batch: " + << ops.size() << ". First operation: " << redact(ops.front().toBSON()) + << ". Last operation: " << redact(ops.back().toBSON()) + << ". Oplog application failed in writer thread " + << std::distance(statusVector.cbegin(), it) << ": " << redact(status); + return status; + } + } + } + } + + // Notify the storage engine that a replication batch has completed. This means that all the + // writes associated with the oplog entries in the batch are finished and no new writes with + // timestamps associated with those oplog entries will show up in the future. + const auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); + storageEngine->replicationBatchIsComplete(); + + // Use this fail point to hold the PBWM lock and prevent the batch from completing. + if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { + log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail " + "point is disabled."; + while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { + if (inShutdown()) { + severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting " + "clean shutdown"; + fassertFailedNoTrace(50798); + } + sleepmillis(100); + } + } + + Timestamp firstTimeInBatch = ops.front().getTimestamp(); + // Set any indexes to multikey that this batch ignored. This must be done while holding the + // parallel batch writer mode lock. + for (WorkerMultikeyPathInfo infoVector : multikeyVector) { + for (MultikeyPathInfo info : infoVector) { + // We timestamp every multikey write with the first timestamp in the batch. It is always + // safe to set an index as multikey too early, just not too late. We conservatively pick + // the first timestamp in the batch since we do not have enough information to find out + // the timestamp of the first write that set the given multikey path. + fassert(50686, + _storageInterface->setIndexIsMultikey( + opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch)); + } + } + + // Increment the counter for the number of ops applied during catchup if the node is in catchup + // mode. + _replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size()); + + // We have now written all database writes and updated the oplog to match. + return ops.back().getOpTime(); } } // namespace repl diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index 087e5870e56..efd31736a6b 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -53,6 +53,11 @@ class OplogApplierImpl : public OplogApplier { OplogApplierImpl& operator=(const OplogApplierImpl&) = delete; public: + using MultiSyncApplyFunc = + std::function<Status(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + SyncTail* st, + WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; /** * Constructs this OplogApplier with specific options. * Obtains batches of operations from the OplogBuffer to apply. @@ -64,14 +69,13 @@ public: ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, + MultiSyncApplyFunc func, const Options& options, ThreadPool* writerPool); private: void _run(OplogBuffer* oplogBuffer) override; - void _shutdown() override; - /** * Runs oplog application in a loop until shutdown() is called. * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using @@ -80,16 +84,37 @@ private: void _runLoop(OplogBuffer* oplogBuffer, OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn); - StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override; + /** + * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then + * using a set of threads to apply the operations. It writes all entries to the oplog, but only + * applies entries with timestamp >= beginApplyingTimestamp. + * + * If the batch application is successful, returns the optime of the last op applied, which + * should be the last op in the batch. + * Returns ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary. + * + * To provide crash resilience, this function will advance the persistent value of 'minValid' + * to at least the last optime of the batch. If 'minValid' is already greater than or equal + * to the last optime of this batch, it will not be updated. + */ + StatusWith<OpTime> _multiApply(OperationContext* opCtx, MultiApplier::Operations ops); // Not owned by us. ReplicationCoordinator* const _replCoord; + // Pool of worker threads for writing ops to the databases. + // Not owned by us. + ThreadPool* const _writerPool; + StorageInterface* _storageInterface; ReplicationConsistencyMarkers* const _consistencyMarkers; + // Function to use during _multiApply + MultiSyncApplyFunc _applyFunc; + // Used to run oplog application loop. + // TODO (SERVER-43651): Remove this member once sync_tail.cpp is fully merged in. SyncTail _syncTail; // Used to determine which operations should be applied during initial sync. If this is null, diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp new file mode 100644 index 00000000000..59268e41483 --- /dev/null +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -0,0 +1,1733 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" +// TODO SERVER-41882 This is a temporary inclusion to avoid duplicate definitions of free +// functions. Remove once sync_tail_test.cpp is fully merged in. +#include "mongo/db/repl/sync_tail_test_fixture.h" + +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/repl/idempotency_test_fixture.h" +#include "mongo/db/repl/sync_tail.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/stats/counters.h" +#include "mongo/unittest/death_test.h" + +namespace mongo { +namespace repl { +namespace { + +/** + * Creates collection options suitable for oplog. + */ +CollectionOptions createOplogCollectionOptions() { + CollectionOptions options; + options.capped = true; + options.cappedSize = 64 * 1024 * 1024LL; + options.autoIndexId = CollectionOptions::NO; + return options; +} + +/** + * Create test collection. + * Returns collection. + */ +void createCollection(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptions& options) { + writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { + Lock::DBLock dblk(opCtx, nss.db(), MODE_X); + OldClientContext ctx(opCtx, nss.ns()); + auto db = ctx.db(); + ASSERT_TRUE(db); + mongo::WriteUnitOfWork wuow(opCtx); + auto coll = db->createCollection(opCtx, nss, options); + ASSERT_TRUE(coll); + wuow.commit(); + }); +} + + +/** + * Create test collection with UUID. + */ +auto createCollectionWithUuid(OperationContext* opCtx, const NamespaceString& nss) { + CollectionOptions options; + options.uuid = UUID::gen(); + createCollection(opCtx, nss, options); + return options.uuid.get(); +} + +/** + * Create test database. + */ +void createDatabase(OperationContext* opCtx, StringData dbName) { + Lock::GlobalWrite globalLock(opCtx); + bool justCreated; + auto databaseHolder = DatabaseHolder::get(opCtx); + auto db = databaseHolder->openDb(opCtx, dbName, &justCreated); + ASSERT_TRUE(db); + ASSERT_TRUE(justCreated); +} + +DEATH_TEST_F(OplogApplierImplTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { + // TODO SERVER-43344 Currently, this file only tests multiApply, so several parameters have been + // set to nullptr during OplogAppierImpl construction as they are not needed in multiApply. + // Update constructors as needed once the rest of sync_tail_test.cpp has been merged in. + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + noopApplyOperationFn, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + oplogApplier.multiApply(_opCtx.get(), {}).getStatus().ignore(); +} + +bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, + ReplicationCoordinator* const replCoord, + ReplicationConsistencyMarkers* const consistencyMarkers, + StorageInterface* const storageInterface, + const NamespaceString& nss, + const CollectionOptions& options) { + auto writerPool = makeReplWriterPool(); + MultiApplier::Operations operationsApplied; + auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, + MultiApplier::OperationPtrs* operationsToApply, + SyncTail* st, + WorkerMultikeyPathInfo*) -> Status { + for (auto&& opPtr : *operationsToApply) { + operationsApplied.push_back(*opPtr); + } + return Status::OK(); + }; + createCollection(opCtx, nss, options); + + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); + ASSERT_FALSE(op.isForCappedCollection); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + replCoord, + consistencyMarkers, + storageInterface, + applyOperationFn, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(opCtx, {op})); + ASSERT_EQUALS(op.getOpTime(), lastOpTime); + + ASSERT_EQUALS(1U, operationsApplied.size()); + const auto& opApplied = operationsApplied.front(); + ASSERT_EQUALS(op, opApplied); + // "isForCappedCollection" is not parsed from raw oplog entry document. + return opApplied.isForCappedCollection; +} + +TEST_F( + OplogApplierImplTest, + MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_opCtx.get(), + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + nss, + CollectionOptions())); +} + +TEST_F(OplogApplierImplTest, + MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + ASSERT_TRUE(_testOplogEntryIsForCappedCollection(_opCtx.get(), + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + nss, + createOplogCollectionOptions())); +} + +class MultiOplogEntryOplogApplierImplTest : public OplogApplierImplTest { +public: + MultiOplogEntryOplogApplierImplTest() + : _nss1("test.preptxn1"), _nss2("test.preptxn2"), _txnNum(1) {} + +protected: + void setUp() override { + OplogApplierImplTest::setUp(); + const NamespaceString cmdNss{"admin", "$cmd"}; + + _uuid1 = createCollectionWithUuid(_opCtx.get(), _nss1); + _uuid2 = createCollectionWithUuid(_opCtx.get(), _nss2); + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + + _lsid = makeLogicalSessionId(_opCtx.get()); + + _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 1), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 1))) + << "partialTxn" << true), + _lsid, + _txnNum, + StmtId(0), + OpTime()); + _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 2), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" + << BSON("_id" << 2))) + << "partialTxn" << true), + _lsid, + _txnNum, + StmtId(1), + _insertOp1->getOpTime()); + _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" + << BSON("_id" << 3)))), + _lsid, + _txnNum, + StmtId(2), + _insertOp2->getOpTime()); + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + stdx::lock_guard<Latch> lock(_insertMutex); + if (nss.isOplog() || nss == _nss1 || nss == _nss2 || + nss == NamespaceString::kSessionTransactionsTableNamespace) { + _insertedDocs[nss].insert(_insertedDocs[nss].end(), docs.begin(), docs.end()); + } else + FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); + }; + + _writerPool = makeReplWriterPool(); + } + + void tearDown() override { + OplogApplierImplTest::tearDown(); + } + + void checkTxnTable(const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + DurableTxnStateEnum expectedState) { + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + expectedOpTime, + expectedWallClock, + expectedStartOpTime, + expectedState); + } + + std::vector<BSONObj>& oplogDocs() { + return _insertedDocs[NamespaceString::kRsOplogNamespace]; + } + +protected: + NamespaceString _nss1; + NamespaceString _nss2; + boost::optional<UUID> _uuid1; + boost::optional<UUID> _uuid2; + LogicalSessionId _lsid; + TxnNumber _txnNum; + boost::optional<OplogEntry> _insertOp1, _insertOp2; + boost::optional<OplogEntry> _commitOp; + std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs; + std::unique_ptr<ThreadPool> _writerPool; + +private: + Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryOplogApplierImplTest::_insertMutex"); +}; + +TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSeparate) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + // Apply a batch with only the first operation. This should result in the first oplog entry + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1})); + ASSERT_EQ(1U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the second operation. This should result in the second oplog entry + // being put in the oplog, but with no effect because the operation is part of a pending + // transaction. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp2})); + ASSERT_EQ(2U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + // The transaction table should not have been updated for partialTxn operations that are not the + // first in a transaction. + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the two previous entries being applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitOp})); + ASSERT_EQ(3U, oplogDocs().size()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getRaw()); + checkTxnTable(_lsid, + _txnNum, + _commitOp->getOpTime(), + _commitOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionAllAtOnce) { + // Skipping writes to oplog proves we're testing the code path which does not rely on reading + // the oplog. + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), + _writerPool.get()); + + // Apply both inserts and the commit in a single batch. We expect no oplog entries to + // be inserted (because we've set skipWritesToOplog), and both entries to be committed. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp})); + ASSERT_EQ(0U, oplogDocs().size()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitOp->getOpTime(), + _commitOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBatches) { + // Tests an unprepared transaction with ops both in the batch with the commit and prior + // batches. + // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1. + std::vector<OplogEntry> insertOps; + std::vector<BSONObj> insertDocs; + + const NamespaceString cmdNss{"admin", "$cmd"}; + for (int i = 0; i < 4; i++) { + insertDocs.push_back(BSON("_id" << i)); + insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), i + 1), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << (i == 1 ? _nss2.ns() : _nss1.ns()) << "ui" + << (i == 1 ? *_uuid2 : *_uuid1) << "o" + << insertDocs.back())) + << "partialTxn" << true), + _lsid, + _txnNum, + StmtId(i), + i == 0 ? OpTime() : insertOps.back().getOpTime())); + } + auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 5), 1LL}, + cmdNss, + BSON("applyOps" << BSONArray()), + _lsid, + _txnNum, + StmtId(4), + insertOps.back().getOpTime()); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + // Insert the first entry in its own batch. This should result in the oplog entry being written + // but the entry should not be applied as it is part of a pending transaction. + const auto expectedStartOpTime = insertOps[0].getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOps[0]})); + ASSERT_EQ(1U, oplogDocs().size()); + ASSERT_EQ(0U, _insertedDocs[_nss1].size()); + ASSERT_EQ(0U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + insertOps[0].getOpTime(), + insertOps[0].getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Insert the rest of the entries, including the commit. These entries should be added to the + // oplog, and all the entries including the first should be applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), + {insertOps[1], insertOps[2], insertOps[3], commitOp})); + ASSERT_EQ(5U, oplogDocs().size()); + ASSERT_EQ(3U, _insertedDocs[_nss1].size()); + ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + + // Check docs and ordering of docs in nss1. + // The insert into nss2 is unordered with respect to those. + ASSERT_BSONOBJ_EQ(insertDocs[0], _insertedDocs[_nss1][0]); + ASSERT_BSONOBJ_EQ(insertDocs[1], _insertedDocs[_nss2].front()); + ASSERT_BSONOBJ_EQ(insertDocs[2], _insertedDocs[_nss1][1]); + ASSERT_BSONOBJ_EQ(insertDocs[3], _insertedDocs[_nss1][2]); +} + +TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) { + // Tests that two transactions on the same session ID in the same batch both + // apply correctly. + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + + std::vector<OplogEntry> insertOps1, insertOps2; + const NamespaceString cmdNss{"admin", "$cmd"}; + insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 1), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 1))) + << "partialTxn" << true), + _lsid, + txnNum1, + StmtId(0), + OpTime())); + insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 2), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 2))) + << "partialTxn" << true), + + _lsid, + txnNum1, + StmtId(1), + insertOps1.back().getOpTime())); + insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(2), 1), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 3))) + << "partialTxn" << true), + _lsid, + txnNum2, + StmtId(0), + OpTime())); + insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(2), 2), 1LL}, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 4))) + << "partialTxn" << true), + _lsid, + txnNum2, + StmtId(1), + insertOps2.back().getOpTime())); + auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 3), 1LL}, + _nss1, + BSON("applyOps" << BSONArray()), + _lsid, + txnNum1, + StmtId(2), + insertOps1.back().getOpTime()); + auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 3), 1LL}, + _nss1, + BSON("applyOps" << BSONArray()), + _lsid, + txnNum2, + StmtId(2), + insertOps2.back().getOpTime()); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + // Note the insert counter so we can check it later. It is necessary to use opCounters as + // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in + // the opObserver. + int insertsBefore = replOpCounters.getInsert()->load(); + // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly + // once. + ASSERT_OK(oplogApplier.multiApply( + _opCtx.get(), + {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); + ASSERT_EQ(6U, oplogDocs().size()); + ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); + ASSERT_EQ(4U, _insertedDocs[_nss1].size()); + checkTxnTable(_lsid, + txnNum2, + commitOp2.getOpTime(), + commitOp2.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + + // Check docs and ordering of docs in nss1. + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), _insertedDocs[_nss1][0]); + ASSERT_BSONOBJ_EQ(BSON("_id" << 2), _insertedDocs[_nss1][1]); + ASSERT_BSONOBJ_EQ(BSON("_id" << 3), _insertedDocs[_nss1][2]); + ASSERT_BSONOBJ_EQ(BSON("_id" << 4), _insertedDocs[_nss1][3]); +} + + +class MultiOplogEntryPreparedTransactionTest : public MultiOplogEntryOplogApplierImplTest { +protected: + void setUp() override { + MultiOplogEntryOplogApplierImplTest::setUp(); + + _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + _nss1, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" + << BSON("_id" << 3))) + << "prepare" << true), + _lsid, + _txnNum, + StmtId(2), + _insertOp2->getOpTime()); + _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + _nss1, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" + << BSON("_id" << 0))) + << "prepare" << true), + _lsid, + _txnNum, + StmtId(0), + OpTime()); + _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 4), 1LL}, + _nss1, + BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)), + _lsid, + _txnNum, + StmtId(3), + _prepareWithPrevOp->getOpTime()); + _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 4), 1LL}, + _nss1, + BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)), + _lsid, + _txnNum, + StmtId(1), + _prepareWithPrevOp->getOpTime()); + _abortPrepareWithPrevOp = + makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL}, + _nss1, + BSON("abortTransaction" << 1), + _lsid, + _txnNum, + StmtId(3), + _prepareWithPrevOp->getOpTime()); + _abortSinglePrepareApplyOp = _abortPrepareWithPrevOp = + makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL}, + _nss1, + BSON("abortTransaction" << 1), + _lsid, + _txnNum, + StmtId(1), + _singlePrepareApplyOp->getOpTime()); + } + +protected: + boost::optional<OplogEntry> _commitPrepareWithPrevOp, _abortPrepareWithPrevOp, + _singlePrepareApplyOp, _prepareWithPrevOp, _commitSinglePrepareApplyOp, + _abortSinglePrepareApplyOp; + +private: + Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryPreparedTransactionTest::_insertMutex"); +}; + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + // Apply a batch with the insert operations. This should result in the oplog entries + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_EQ(2U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); + ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the prepare. This should result in the prepare being put in the + // oplog, and the two previous entries being applied (but in a transaction) along with the + // nested insert in the prepare oplog entry. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_EQ(3U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _prepareWithPrevOp->getOpTime(), + _prepareWithPrevOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the three previous entries being committed. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitPrepareWithPrevOp->getOpTime(), + _commitPrepareWithPrevOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + // Apply a batch with the insert operations. This should result in the oplog entries + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the prepare. This should result in the prepare being put in the + // oplog, and the two previous entries being applied (but in a transaction) along with the + // nested insert in the prepare oplog entry. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + checkTxnTable(_lsid, + _txnNum, + _prepareWithPrevOp->getOpTime(), + _prepareWithPrevOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the abort. This should result in the abort being put in the + // oplog and the transaction table being updated accordingly. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp})); + ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _abortPrepareWithPrevOp->getOpTime(), + _abortPrepareWithPrevOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), + _writerPool.get()); + // Apply a batch with the insert operations. This should result in the oplog entries + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_EQ(2U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); + ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the prepare applyOps. This should result in the prepare being put in + // the oplog, but, since this is initial sync, nothing else. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_EQ(3U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _prepareWithPrevOp->getOpTime(), + _prepareWithPrevOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the three previous entries being applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitPrepareWithPrevOp->getOpTime(), + _commitPrepareWithPrevOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) { + // For recovery, the oplog must contain the operations before starting. + for (auto&& entry : + {*_insertOp1, *_insertOp2, *_prepareWithPrevOp, *_commitPrepareWithPrevOp}) { + ASSERT_OK(getStorageInterface()->insertDocument( + _opCtx.get(), + NamespaceString::kRsOplogNamespace, + {entry.toBSON(), entry.getOpTime().getTimestamp()}, + entry.getOpTime().getTerm())); + } + // Ignore docs inserted into oplog in setup. + oplogDocs().clear(); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), + _writerPool.get()); + + // Apply a batch with the insert operations. This should have no effect, because this is + // recovery. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_TRUE(oplogDocs().empty()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + _insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the prepare applyOps. This should have no effect, since this is + // recovery. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_TRUE(oplogDocs().empty()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _prepareWithPrevOp->getOpTime(), + _prepareWithPrevOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the the three previous entries + // being applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_TRUE(oplogDocs().empty()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(2U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitPrepareWithPrevOp->getOpTime(), + _commitPrepareWithPrevOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); + + // Apply a batch with only the prepare applyOps. This should result in the prepare being put in + // the oplog, and the nested insert being applied (but in a transaction). + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_EQ(1U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + checkTxnTable(_lsid, + _txnNum, + _singlePrepareApplyOp->getOpTime(), + _singlePrepareApplyOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and prepared insert being committed. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _commitSinglePrepareApplyOp->getOpTime(), + _commitSinglePrepareApplyOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 3), 1LL}, + _nss1, + BSON("applyOps" << BSONArray() << "prepare" << true), + _lsid, + _txnNum, + StmtId(0), + OpTime()); + const auto expectedStartOpTime = emptyPrepareApplyOp.getOpTime(); + + // Apply a batch with only the prepare applyOps. This should result in the prepare being put in + // the oplog, and the nested insert being applied (but in a transaction). + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {emptyPrepareApplyOp})); + ASSERT_EQ(1U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + checkTxnTable(_lsid, + _txnNum, + emptyPrepareApplyOp.getOpTime(), + emptyPrepareApplyOp.getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and prepared insert being committed. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _commitSinglePrepareApplyOp->getOpTime(), + _commitSinglePrepareApplyOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + _writerPool.get()); + + const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); + // Apply a batch with only the prepare applyOps. This should result in the prepare being put in + // the oplog, and the nested insert being applied (but in a transaction). + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + checkTxnTable(_lsid, + _txnNum, + _singlePrepareApplyOp->getOpTime(), + _singlePrepareApplyOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the abort. This should result in the abort being put in the + // oplog and the transaction table being updated accordingly. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp})); + ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _abortSinglePrepareApplyOp->getOpTime(), + _abortSinglePrepareApplyOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, + MultiApplySingleApplyOpsPreparedTransactionInitialSync) { + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), + _writerPool.get()); + + const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); + + // Apply a batch with only the prepare applyOps. This should result in the prepare being put in + // the oplog, but, since this is initial sync, nothing else. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_EQ(1U, oplogDocs().size()); + ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _singlePrepareApplyOp->getOpTime(), + _singlePrepareApplyOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the commit being put in the + // oplog, and the previous entry being applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _commitSinglePrepareApplyOp->getOpTime(), + _commitSinglePrepareApplyOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, + MultiApplySingleApplyOpsPreparedTransactionRecovery) { + // For recovery, the oplog must contain the operations before starting. + for (auto&& entry : {*_singlePrepareApplyOp, *_commitPrepareWithPrevOp}) { + ASSERT_OK(getStorageInterface()->insertDocument( + _opCtx.get(), + NamespaceString::kRsOplogNamespace, + {entry.toBSON(), entry.getOpTime().getTimestamp()}, + entry.getOpTime().getTerm())); + } + // Ignore docs inserted into oplog in setup. + oplogDocs().clear(); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), + _writerPool.get()); + + const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); + + // Apply a batch with only the prepare applyOps. This should have no effect, since this is + // recovery. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_TRUE(oplogDocs().empty()); + ASSERT_TRUE(_insertedDocs[_nss1].empty()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _singlePrepareApplyOp->getOpTime(), + _singlePrepareApplyOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the commit. This should result in the previous entry being + // applied. + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_TRUE(oplogDocs().empty()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _commitSinglePrepareApplyOp->getOpTime(), + _commitSinglePrepareApplyOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +class OplogApplierImplTxnTableTest : public OplogApplierImplTest { +public: + void setUp() override { + OplogApplierImplTest::setUp(); + + MongoDSessionCatalog::onStepUp(_opCtx.get()); + + DBDirectClient client(_opCtx.get()); + BSONObj result; + ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result)); + } + + /** + * Creates an OplogEntry with given parameters and preset defaults for this test suite. + */ + repl::OplogEntry makeOplogEntry(const NamespaceString& ns, + repl::OpTime opTime, + repl::OpTypeEnum opType, + BSONObj object, + boost::optional<BSONObj> object2, + const OperationSessionInfo& sessionInfo, + Date_t wallClockTime) { + return repl::OplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + ns, // namespace + boost::none, // uuid + boost::none, // fromMigrate + 0, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // false + wallClockTime, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + } + + /** + * Creates an OplogEntry with given parameters and preset defaults for this test suite. + */ + repl::OplogEntry makeOplogEntryForMigrate(const NamespaceString& ns, + repl::OpTime opTime, + repl::OpTypeEnum opType, + BSONObj object, + boost::optional<BSONObj> object2, + const OperationSessionInfo& sessionInfo, + Date_t wallClockTime) { + return repl::OplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + ns, // namespace + boost::none, // uuid + true, // fromMigrate + 0, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // false + wallClockTime, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + } + + void checkTxnTable(const OperationSessionInfo& sessionInfo, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock) { + invariant(sessionInfo.getSessionId()); + invariant(sessionInfo.getTxnNumber()); + + repl::checkTxnTable(_opCtx.get(), + *sessionInfo.getSessionId(), + *sessionInfo.getTxnNumber(), + expectedOpTime, + expectedWallClock, + {}, + {}); + } + + static const NamespaceString& nss() { + return kNs; + } + +private: + static const NamespaceString kNs; +}; + +const NamespaceString OplogApplierImplTxnTableTest::kNs("test.foo"); + +TEST_F(OplogApplierImplTxnTableTest, SimpleWriteWithTxn) { + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + const auto date = Date_t::now(); + + auto insertOp = makeOplogEntry(nss(), + {Timestamp(1, 0), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + auto writerPool = makeReplWriterPool(); + + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp})); + + checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); +} + +TEST_F(OplogApplierImplTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + const auto date = Date_t::now(); + + auto insertOp = makeOplogEntry(nss(), + {Timestamp(1, 0), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, + {Timestamp(2, 0), 1}, + repl::OpTypeEnum::kDelete, + BSON("_id" << sessionInfo.getSessionId()->toBSON()), + boost::none, + {}, + Date_t::now()); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, deleteOp})); + + ASSERT_FALSE(docExists( + _opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); +} + +TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) { + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + auto date = Date_t::now(); + + auto insertOp = makeOplogEntry(nss(), + {Timestamp(1, 0), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, + {Timestamp(2, 0), 1}, + repl::OpTypeEnum::kDelete, + BSON("_id" << sessionInfo.getSessionId()->toBSON()), + boost::none, + {}, + Date_t::now()); + + date = Date_t::now(); + sessionInfo.setTxnNumber(7); + auto insertOp2 = makeOplogEntry(nss(), + {Timestamp(3, 0), 2}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 6), + boost::none, + sessionInfo, + date); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); + + checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); +} + +TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTable) { + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + auto date = Date_t::now(); + + auto insertOp = makeOplogEntry(nss(), + {Timestamp(1, 0), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + repl::OpTime newWriteOpTime(Timestamp(2, 0), 1); + auto updateOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, + {Timestamp(4, 0), 1}, + repl::OpTypeEnum::kUpdate, + BSON("$set" << BSON("lastWriteOpTime" << newWriteOpTime)), + BSON("_id" << sessionInfo.getSessionId()->toBSON()), + {}, + Date_t::now()); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOp, updateOp})); + + checkTxnTable(sessionInfo, newWriteOpTime, date); +} + +TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSession) { + const NamespaceString cmdNss{"admin", "$cmd"}; + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + auto date = Date_t::now(); + auto uuid = [&] { + return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid(); + }(); + + repl::OpTime retryableInsertOpTime(Timestamp(1, 0), 1); + + auto retryableInsertOp = makeOplogEntry(nss(), + retryableInsertOpTime, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + repl::OpTime txnInsertOpTime(Timestamp(2, 0), 1); + sessionInfo.setTxnNumber(4); + + auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + txnInsertOpTime, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << nss().ns() << "ui" << uuid << "o" + << BSON("_id" << 2))) + << "partialTxn" << true), + sessionId, + *sessionInfo.getTxnNumber(), + StmtId(0), + OpTime()); + + repl::OpTime txnCommitOpTime(Timestamp(3, 0), 1); + auto txnCommitOp = + makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime, + cmdNss, + BSON("applyOps" << BSONArray()), + sessionId, + *sessionInfo.getTxnNumber(), + StmtId(1), + txnInsertOpTime); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp})); + + repl::checkTxnTable(_opCtx.get(), + *sessionInfo.getSessionId(), + *sessionInfo.getTxnNumber(), + txnCommitOpTime, + txnCommitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSession) { + const NamespaceString cmdNss{"admin", "$cmd"}; + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(3); + auto date = Date_t::now(); + auto uuid = [&] { + return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid(); + }(); + + repl::OpTime txnInsertOpTime(Timestamp(1, 0), 1); + auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId( + txnInsertOpTime, + cmdNss, + BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" << nss().ns() << "ui" << uuid << "o" + << BSON("_id" << 2))) + << "partialTxn" << true), + sessionId, + *sessionInfo.getTxnNumber(), + StmtId(0), + OpTime()); + + repl::OpTime txnCommitOpTime(Timestamp(2, 0), 1); + auto txnCommitOp = + makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime, + cmdNss, + BSON("applyOps" << BSONArray()), + sessionId, + *sessionInfo.getTxnNumber(), + StmtId(1), + txnInsertOpTime); + + repl::OpTime retryableInsertOpTime(Timestamp(3, 0), 1); + sessionInfo.setTxnNumber(4); + + auto retryableInsertOp = makeOplogEntry(nss(), + retryableInsertOpTime, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + sessionInfo, + date); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp})); + + repl::checkTxnTable(_opCtx.get(), + *sessionInfo.getSessionId(), + *sessionInfo.getTxnNumber(), + retryableInsertOpTime, + retryableInsertOp.getWallClockTime(), + boost::none, + boost::none); +} + + +TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) { + NamespaceString ns0("test.0"); + NamespaceString ns1("test.1"); + NamespaceString ns2("test.2"); + NamespaceString ns3("test.3"); + + DBDirectClient client(_opCtx.get()); + BSONObj result; + ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result)); + ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result)); + ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result)); + ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result)); + auto uuid0 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid(); + }(); + auto uuid1 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid(); + }(); + auto uuid2 = [&] { + return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid(); + }(); + + // Entries with a session id and a txnNumber update the transaction table. + auto lsidSingle = makeLogicalSessionIdForTest(); + auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0); + + // For entries with the same session, the entry with a larger txnNumber is saved. + auto lsidDiffTxn = makeLogicalSessionIdForTest(); + auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); + auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); + + // For entries with the same session and txnNumber, the later optime is saved. + auto lsidSameTxn = makeLogicalSessionIdForTest(); + auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); + auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( + {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); + + // Entries with a session id but no txnNumber do not lead to updates. + auto lsidNoTxn = makeLogicalSessionIdForTest(); + OperationSessionInfo info; + info.setSessionId(lsidNoTxn); + auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo( + {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply( + _opCtx.get(), + {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); + + // The txnNum and optime of the only write were saved. + auto resultSingleDoc = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON())); + ASSERT_TRUE(!resultSingleDoc.isEmpty()); + + auto resultSingle = + SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc); + + ASSERT_EQ(resultSingle.getTxnNum(), 5LL); + ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1)); + + // The txnNum and optime of the write with the larger txnNum were saved. + auto resultDiffTxnDoc = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON())); + ASSERT_TRUE(!resultDiffTxnDoc.isEmpty()); + + auto resultDiffTxn = + SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc); + + ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL); + ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1)); + + // The txnNum and optime of the write with the later optime were saved. + auto resultSameTxnDoc = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON())); + ASSERT_TRUE(!resultSameTxnDoc.isEmpty()); + + auto resultSameTxn = + SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc); + + ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL); + ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1)); + + // There is no entry for the write with no txnNumber. + auto resultNoTxn = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON())); + ASSERT_TRUE(resultNoTxn.isEmpty()); +} + +TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) { + const auto insertLsid = makeLogicalSessionIdForTest(); + OperationSessionInfo insertSessionInfo; + insertSessionInfo.setSessionId(insertLsid); + insertSessionInfo.setTxnNumber(3); + auto date = Date_t::now(); + + auto innerOplog = makeOplogEntry(nss(), + {Timestamp(10, 10), 1}, + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + insertSessionInfo, + date); + + auto outerInsertDate = Date_t::now(); + auto insertOplog = makeOplogEntryForMigrate(nss(), + {Timestamp(40, 0), 1}, + repl::OpTypeEnum::kNoop, + BSON("$sessionMigrateInfo" << 1), + innerOplog.toBSON(), + insertSessionInfo, + outerInsertDate); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {insertOplog})); + + checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate); +} + +TEST_F(OplogApplierImplTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) { + const auto preImageLsid = makeLogicalSessionIdForTest(); + OperationSessionInfo preImageSessionInfo; + preImageSessionInfo.setSessionId(preImageLsid); + preImageSessionInfo.setTxnNumber(3); + auto preImageDate = Date_t::now(); + + auto preImageOplog = makeOplogEntryForMigrate(nss(), + {Timestamp(30, 0), 1}, + repl::OpTypeEnum::kNoop, + BSON("_id" << 1), + boost::none, + preImageSessionInfo, + preImageDate); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {preImageOplog})); + + ASSERT_FALSE(docExists(_opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName + << preImageSessionInfo.getSessionId()->toBSON()))); +} + +TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { + const auto lsid = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(3); + + auto oplog = makeOplogEntry(nss(), + {Timestamp(30, 0), 1}, + repl::OpTypeEnum::kNoop, + BSON("_id" << 1), + boost::none, + sessionInfo, + Date_t::now()); + + auto writerPool = makeReplWriterPool(); + NoopOplogApplierObserver observer; + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + &observer, + ReplicationCoordinator::get(_opCtx.get()), + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + + ASSERT_OK(oplogApplier.multiApply(_opCtx.get(), {oplog})); + + ASSERT_FALSE(docExists( + _opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); +} + +} // namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp new file mode 100644 index 00000000000..1e85b6eba26 --- /dev/null +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" + +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/drop_pending_collection_reaper.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/replication_consistency_markers_mock.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/storage_interface_impl.h" + +namespace mongo { +namespace repl { + +void OplogApplierImplOpObserver::onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) { + if (!onInsertsFn) { + return; + } + std::vector<BSONObj> docs; + for (auto it = begin; it != end; ++it) { + const InsertStatement& insertStatement = *it; + docs.push_back(insertStatement.doc.getOwned()); + } + onInsertsFn(opCtx, nss, docs); +} + +void OplogApplierImplOpObserver::onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + if (!onDeleteFn) { + return; + } + onDeleteFn(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); +} + +void OplogApplierImplOpObserver::onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime) { + if (!onCreateCollectionFn) { + return; + } + onCreateCollectionFn(opCtx, coll, collectionName, options, idIndex); +} + +void OplogApplierImplTest::setUp() { + ServiceContextMongoDTest::setUp(); + + serviceContext = getServiceContext(); + _opCtx = cc().makeOperationContext(); + + ReplicationCoordinator::set(serviceContext, + std::make_unique<ReplicationCoordinatorMock>(serviceContext)); + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + + StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>()); + + DropPendingCollectionReaper::set( + serviceContext, std::make_unique<DropPendingCollectionReaper>(getStorageInterface())); + repl::setOplogCollectionName(serviceContext); + repl::createOplog(_opCtx.get()); + + _consistencyMarkers = std::make_unique<ReplicationConsistencyMarkersMock>(); + + // Set up an OpObserver to track the documents OplogApplierImpl inserts. + auto opObserver = std::make_unique<OplogApplierImplOpObserver>(); + _opObserver = opObserver.get(); + auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver()); + opObserverRegistry->addObserver(std::move(opObserver)); + + // Initialize the featureCompatibilityVersion server parameter. This is necessary because this + // test fixture does not create a featureCompatibilityVersion document from which to initialize + // the server parameter. + serverGlobalParams.featureCompatibility.setVersion( + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44); +} + +void OplogApplierImplTest::tearDown() { + _opCtx.reset(); + _consistencyMarkers = {}; + DropPendingCollectionReaper::set(serviceContext, {}); + StorageInterface::set(serviceContext, {}); + ServiceContextMongoDTest::tearDown(); +} + +ReplicationConsistencyMarkers* OplogApplierImplTest::getConsistencyMarkers() const { + return _consistencyMarkers.get(); +} + +StorageInterface* OplogApplierImplTest::getStorageInterface() const { + return StorageInterface::get(serviceContext); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h new file mode 100644 index 00000000000..31c703bc97e --- /dev/null +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/db/concurrency/lock_manager_defs.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/op_observer_noop.h" +#include "mongo/db/repl/oplog_applier_impl.h" +#include "mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_txn_record_gen.h" + +namespace mongo { + +class BSONObj; +class OperationContext; + +namespace repl { + +/** + * OpObserver for OplogApplierImpl test fixture. + */ +class OplogApplierImplOpObserver : public OpObserverNoop { +public: + /** + * This function is called whenever OplogApplierImpl inserts documents into a collection. + */ + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override; + + /** + * This function is called whenever OplogApplierImpl deletes a document from a collection. + */ + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) override; + + /** + * Called when OplogApplierImpl creates a collection. + */ + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime) override; + + // Hooks for OpObserver functions. Defaults to a no-op function but may be overridden to check + // actual documents mutated. + std::function<void(OperationContext*, const NamespaceString&, const std::vector<BSONObj>&)> + onInsertsFn; + + std::function<void(OperationContext*, + const NamespaceString&, + OptionalCollectionUUID, + StmtId, + bool, + const boost::optional<BSONObj>&)> + onDeleteFn; + + std::function<void(OperationContext*, + Collection*, + const NamespaceString&, + const CollectionOptions&, + const BSONObj&)> + onCreateCollectionFn; +}; + +class OplogApplierImplTest : public ServiceContextMongoDTest { +protected: + void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError, + const OplogEntry& op, + bool expectedApplyOpCalled); + + Status _syncApplyWrapper(OperationContext* opCtx, + const OplogEntryBatch& batch, + OplogApplication::Mode oplogApplicationMode); + + ServiceContext::UniqueOperationContext _opCtx; + std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; + ServiceContext* serviceContext; + OplogApplierImplOpObserver* _opObserver = nullptr; + + // Implements the OplogApplierImpl::MultiSyncApplyFn interface and does nothing. + static Status noopApplyOperationFn(OperationContext*, + MultiApplier::OperationPtrs*, + SyncTail* st, + WorkerMultikeyPathInfo*) { + return Status::OK(); + } + + OpTime nextOpTime() { + static long long lastSecond = 1; + return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL); + } + + void setUp() override; + void tearDown() override; + + + ReplicationCoordinator* getReplCoord() const; + ReplicationConsistencyMarkers* getConsistencyMarkers() const; + StorageInterface* getStorageInterface() const; + + Status runOpSteadyState(const OplogEntry& op); + Status runOpsSteadyState(std::vector<OplogEntry> ops); + Status runOpInitialSync(const OplogEntry& entry); + Status runOpsInitialSync(std::vector<OplogEntry> ops); + Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops); + + UUID kUuid{UUID::gen()}; +}; + +Status failedApplyCommand(OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 5b086263114..ca2795a6b39 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -55,7 +55,6 @@ public: explicit OplogApplierMock(OplogBuffer* oplogBuffer); void _run(OplogBuffer* oplogBuffer) final; - void _shutdown() final; StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final; }; @@ -67,8 +66,6 @@ OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer) void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {} -void OplogApplierMock::_shutdown() {} - StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) { return OpTime(); } diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp index 8c3802c1e3f..a7b41404193 100644 --- a/src/mongo/db/repl/opqueue_batcher.cpp +++ b/src/mongo/db/repl/opqueue_batcher.cpp @@ -36,11 +36,11 @@ namespace mongo { namespace repl { -OpQueueBatcher::OpQueueBatcher(SyncTail* syncTail, +OpQueueBatcher::OpQueueBatcher(OplogApplier* oplogApplier, StorageInterface* storageInterface, OplogBuffer* oplogBuffer, OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) - : _syncTail(syncTail), + : _oplogApplier(oplogApplier), _storageInterface(storageInterface), _oplogBuffer(oplogBuffer), _getNextApplierBatchFn(getNextApplierBatchFn), @@ -126,7 +126,7 @@ void OpQueueBatcher::run() { // If we don't have anything in the queue, wait a bit for something to appear. if (oplogEntries.empty()) { - if (_syncTail->inShutdown()) { + if (_oplogApplier->inShutdown()) { ops.setMustShutdownFlag(); } else { // Block up to 1 second. diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/opqueue_batcher.h index 0cdad053ddd..48432377dd5 100644 --- a/src/mongo/db/repl/opqueue_batcher.h +++ b/src/mongo/db/repl/opqueue_batcher.h @@ -30,7 +30,7 @@ #pragma once #include "mongo/db/repl/initial_syncer.h" -#include "mongo/db/repl/sync_tail.h" +#include "mongo/db/repl/oplog_applier_impl.h" namespace mongo { namespace repl { @@ -122,7 +122,7 @@ public: /** * Constructs an OpQueueBatcher */ - OpQueueBatcher(SyncTail* syncTail, + OpQueueBatcher(OplogApplier* oplogApplier, StorageInterface* storageInterface, OplogBuffer* oplogBuffer, OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn); @@ -143,7 +143,7 @@ private: void run(); - SyncTail* const _syncTail; + OplogApplier* _oplogApplier; StorageInterface* const _storageInterface; OplogBuffer* const _oplogBuffer; OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index d1975168f77..49be20493d8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -223,6 +223,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( replCoord, _replicationProcess->getConsistencyMarkers(), _storageInterface, + multiSyncApply, OplogApplier::Options(OplogApplication::Mode::kSecondary), _writerPool.get()); diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 42b379c832e..4ef5b7bcc75 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -360,9 +360,10 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, OplogApplierImpl oplogApplier(nullptr, &oplogBuffer, &stats, - nullptr, + ReplicationCoordinator::get(opCtx), _consistencyMarkers, _storageInterface, + multiSyncApply, OplogApplier::Options(OplogApplication::Mode::kRecovering), writerPool.get()); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 18b3d7e0495..a5402fadfa7 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -85,23 +85,12 @@ namespace mongo { namespace repl { namespace { -MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion); -MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries); MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime); // The oplog entries applied Counter64 opsAppliedStats; ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats); -// Tracks the oplog application batch size. -Counter64 oplogApplicationBatchSize; -ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize", - &oplogApplicationBatchSize); - -// Number and time of each ApplyOps worker pool round -TimerStats applyBatchStats; -ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); - NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { auto optionalUuid = oplogEntry.getUuid(); if (!optionalUuid) { @@ -165,17 +154,9 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod } // namespace SyncTail::SyncTail(OplogApplier::Observer* observer, - ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - MultiSyncApplyFunc func, - ThreadPool* writerPool, const OplogApplier::Options& options) - : _observer(observer), - _consistencyMarkers(consistencyMarkers), - _storageInterface(storageInterface), - _applyFunc(func), - _writerPool(writerPool), - _options(options) {} + : _observer(observer), _storageInterface(storageInterface), _options(options) {} SyncTail::~SyncTail() {} @@ -185,74 +166,6 @@ const OplogApplier::Options& SyncTail::getOptions() const { namespace { -// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops' -// stays valid until all scheduled work in the thread pool completes. -void scheduleWritesToOplog(OperationContext* opCtx, - StorageInterface* storageInterface, - ThreadPool* threadPool, - const MultiApplier::Operations& ops) { - - auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) { - // The returned function will be run in a separate thread after this returns. Therefore all - // captures other than 'ops' must be by value since they will not be available. The caller - // guarantees that 'ops' will stay in scope until the spawned threads complete. - return [storageInterface, &ops, begin, end](auto status) { - invariant(status); - - auto opCtx = cc().makeOperationContext(); - - // This code path is only executed on secondaries and initial syncing nodes, so it is - // safe to exclude any writes from Flow Control. - opCtx->setShouldParticipateInFlowControl(false); - - UnreplicatedWritesBlock uwb(opCtx.get()); - ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( - opCtx->lockState()); - - std::vector<InsertStatement> docs; - docs.reserve(end - begin); - for (size_t i = begin; i < end; i++) { - // Add as unowned BSON to avoid unnecessary ref-count bumps. - // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed. - docs.emplace_back(InsertStatement{ops[i].getRaw(), - ops[i].getOpTime().getTimestamp(), - ops[i].getOpTime().getTerm()}); - } - - fassert(40141, - storageInterface->insertDocuments( - opCtx.get(), NamespaceString::kRsOplogNamespace, docs)); - }; - }; - - // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it - // would result too little work per thread. This also ensures that we can amortize the - // setup/teardown overhead across many writes. - const size_t kMinOplogEntriesPerThread = 16; - const bool enoughToMultiThread = - ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads; - - // Only doc-locking engines support parallel writes to the oplog because they are required to - // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally, - // there would be no way to take advantage of multiple threads if a storage engine doesn't - // support document locking. - if (!enoughToMultiThread || - !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { - - threadPool->schedule(makeOplogWriterForRange(0, ops.size())); - return; - } - - - const size_t numOplogThreads = threadPool->getStats().numThreads; - const size_t numOpsPerThread = ops.size() / numOplogThreads; - for (size_t thread = 0; thread < numOplogThreads; thread++) { - size_t begin = thread * numOpsPerThread; - size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread; - threadPool->schedule(makeOplogWriterForRange(begin, end)); - } -} - /** * Caches per-collection properties which are relevant for oplog application, so that they don't * have to be retrieved repeatedly for each op. @@ -366,16 +279,6 @@ void addDerivedOps(OperationContext* opCtx, } // namespace -void SyncTail::shutdown() { - stdx::lock_guard<Latch> lock(_mutex); - _inShutdown = true; -} - -bool SyncTail::inShutdown() const { - stdx::lock_guard<Latch> lock(_mutex); - return _inShutdown; -} - Status syncApply(OperationContext* opCtx, const OplogEntryBatch& batch, OplogApplication::Mode oplogApplicationMode) { @@ -691,158 +594,5 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx, } } -StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { - invariant(!ops.empty()); - - LOG(2) << "replication batch size is " << ops.size(); - - // Stop all readers until we're done. This also prevents doc-locking engines from deleting old - // entries from the oplog until we finish writing. - Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); - - auto replCoord = ReplicationCoordinator::get(opCtx); - if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) { - severe() << "attempting to replicate ops while primary"; - return {ErrorCodes::CannotApplyOplogWhilePrimary, - "attempting to replicate ops while primary"}; - } - - // Increment the batch size stat. - oplogApplicationBatchSize.increment(ops.size()); - - std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads); - { - // Each node records cumulative batch application stats for itself using this timer. - TimerHolder timer(&applyBatchStats); - - // We must wait for the all work we've dispatched to complete before leaving this block - // because the spawned threads refer to objects on the stack - ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); }); - - // Write batch of ops into oplog. - if (!_options.skipWritesToOplog) { - _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp()); - scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops); - } - - // Holds 'pseudo operations' generated by secondaries to aid in replication. - // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied. - // Pseudo operations include: - // - applyOps operations expanded to individual ops. - // - ops to update config.transactions. Normal writes to config.transactions in the - // primary don't create an oplog entry, so extract info from writes with transactions - // and create a pseudo oplog. - std::vector<MultiApplier::Operations> derivedOps; - - std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); - - // Wait for writes to finish before applying ops. - _writerPool->waitForIdle(); - - // Use this fail point to hold the PBWM lock after we have written the oplog entries but - // before we have applied them. - if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) { - log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking " - "until fail point is disabled."; - pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx); - } - - // Reset consistency markers in case the node fails while applying ops. - if (!_options.skipWritesToOplog) { - _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); - _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); - } - - { - std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK()); - - // Doles out all the work to the writer pool threads. writerVectors is not modified, - // but multiSyncApply will modify the vectors that it contains. - invariant(writerVectors.size() == statusVector.size()); - for (size_t i = 0; i < writerVectors.size(); i++) { - if (writerVectors[i].empty()) - continue; - - _writerPool->schedule( - [this, - &writer = writerVectors.at(i), - &status = statusVector.at(i), - &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) { - invariant(scheduleStatus); - - auto opCtx = cc().makeOperationContext(); - - // This code path is only executed on secondaries and initial syncing nodes, - // so it is safe to exclude any writes from Flow Control. - opCtx->setShouldParticipateInFlowControl(false); - - status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { - return _applyFunc(opCtx.get(), &writer, this, &multikeyVector); - }); - }); - } - - _writerPool->waitForIdle(); - - // If any of the statuses is not ok, return error. - for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) { - const auto& status = *it; - if (!status.isOK()) { - severe() - << "Failed to apply batch of operations. Number of operations in batch: " - << ops.size() << ". First operation: " << redact(ops.front().toBSON()) - << ". Last operation: " << redact(ops.back().toBSON()) - << ". Oplog application failed in writer thread " - << std::distance(statusVector.cbegin(), it) << ": " << redact(status); - return status; - } - } - } - } - - // Notify the storage engine that a replication batch has completed. This means that all the - // writes associated with the oplog entries in the batch are finished and no new writes with - // timestamps associated with those oplog entries will show up in the future. - const auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); - storageEngine->replicationBatchIsComplete(); - - // Use this fail point to hold the PBWM lock and prevent the batch from completing. - if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { - log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail " - "point is disabled."; - while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { - if (inShutdown()) { - severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting " - "clean shutdown"; - fassertFailedNoTrace(50798); - } - sleepmillis(100); - } - } - - Timestamp firstTimeInBatch = ops.front().getTimestamp(); - // Set any indexes to multikey that this batch ignored. This must be done while holding the - // parallel batch writer mode lock. - for (WorkerMultikeyPathInfo infoVector : multikeyVector) { - for (MultikeyPathInfo info : infoVector) { - // We timestamp every multikey write with the first timestamp in the batch. It is always - // safe to set an index as multikey too early, just not too late. We conservatively pick - // the first timestamp in the batch since we do not have enough information to find out - // the timestamp of the first write that set the given multikey path. - fassert(50686, - _storageInterface->setIndexIsMultikey( - opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch)); - } - } - - // Increment the counter for the number of ops applied during catchup if the node is in catchup - // mode. - replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size()); - - // We have now written all database writes and updated the oplog to match. - return ops.back().getOpTime(); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 8fd2d9aef31..c8f5353b262 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -66,12 +66,6 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = - std::function<Status(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; - /** * * Constructs a SyncTail. @@ -82,10 +76,7 @@ public: * of operations using 'func'. The writer thread pool is not owned by us. */ SyncTail(OplogApplier::Observer* observer, - ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - MultiSyncApplyFunc func, - ThreadPool* writerPool, const OplogApplier::Options& options); virtual ~SyncTail(); @@ -94,35 +85,8 @@ public: */ const OplogApplier::Options& getOptions() const; - /** - * Shuts down oplogApplication() processing. - */ - void shutdown(); - - /** - * Returns true if we are shutting down. - */ - bool inShutdown() const; - - using BatchLimits = OplogApplier::BatchLimits; - /** - * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then - * using a set of threads to apply the operations. It will only apply (but will - * still write to the oplog) oplog entries with a timestamp greater than or equal to the - * beginApplyingTimestamp. - * - * If the batch application is successful, returns the optime of the last op applied, which - * should be the last op in the batch. - * Returns ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary. - * - * To provide crash resilience, this function will advance the persistent value of 'minValid' - * to at least the last optime of the batch. If 'minValid' is already greater than or equal - * to the last optime of this batch, it will not be updated. - */ - StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops); - void fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, @@ -130,7 +94,6 @@ public: private: OplogApplier::Observer* const _observer; - ReplicationConsistencyMarkers* const _consistencyMarkers; StorageInterface* const _storageInterface; void _deriveOpsAndFillWriterVectors(OperationContext* opCtx, @@ -138,21 +101,12 @@ private: std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, SessionUpdateTracker* sessionUpdateTracker) noexcept; - // Function to use during applyOps - MultiSyncApplyFunc _applyFunc; - - // Pool of worker threads for writing ops to the databases. - // Not owned by us. - ThreadPool* const _writerPool; // Used to configure multiApply() behavior. const OplogApplier::Options _options; // Protects member data of SyncTail. mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncTail::_mutex"); - - // Set to true if shutdown() has been called. - bool _inShutdown = false; }; /** diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index c2a18035ea7..5276f4142cc 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -109,10 +109,7 @@ public: SyncTailForTest::SyncTailForTest() : SyncTail(nullptr, // observer - nullptr, // consistency markers nullptr, // storage interface - SyncTail::MultiSyncApplyFunc(), - nullptr, // writer pool repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)) {} /** @@ -300,950 +297,22 @@ TEST_F(SyncTailTest, SyncApplyCommand) { ASSERT_TRUE(applyCmdCalled); } -DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - noopApplyOperationFn, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore(); -} - -bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, - ReplicationConsistencyMarkers* const consistencyMarkers, - StorageInterface* const storageInterface, - const NamespaceString& nss, - const CollectionOptions& options) { - auto writerPool = makeReplWriterPool(); - MultiApplier::Operations operationsApplied; - auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, - MultiApplier::OperationPtrs* operationsToApply, - SyncTail* st, - WorkerMultikeyPathInfo*) -> Status { - for (auto&& opPtr : *operationsToApply) { - operationsApplied.push_back(*opPtr); - } - return Status::OK(); - }; - createCollection(opCtx, nss, options); - - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); - ASSERT_FALSE(op.isForCappedCollection); - - SyncTail syncTail(nullptr, - consistencyMarkers, - storageInterface, - applyOperationFn, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op})); - ASSERT_EQUALS(op.getOpTime(), lastOpTime); - - ASSERT_EQUALS(1U, operationsApplied.size()); - const auto& opApplied = operationsApplied.front(); - ASSERT_EQUALS(op, opApplied); - // "isForCappedCollection" is not parsed from raw oplog entry document. - return opApplied.isForCappedCollection; -} - -TEST_F( - SyncTailTest, - MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_FALSE(_testOplogEntryIsForCappedCollection( - _opCtx.get(), getConsistencyMarkers(), getStorageInterface(), nss, CollectionOptions())); -} - -TEST_F(SyncTailTest, - MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_TRUE(_testOplogEntryIsForCappedCollection(_opCtx.get(), - getConsistencyMarkers(), - getStorageInterface(), - nss, - createOplogCollectionOptions())); -} - TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - SyncTail syncTail(nullptr, - nullptr, - nullptr, - {}, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary)); + SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); // Collection should be created after syncApply() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } -class MultiOplogEntrySyncTailTest : public SyncTailTest { -public: - MultiOplogEntrySyncTailTest() : _nss1("test.preptxn1"), _nss2("test.preptxn2"), _txnNum(1) {} - -protected: - void setUp() override { - SyncTailTest::setUp(); - const NamespaceString cmdNss{"admin", "$cmd"}; - - _uuid1 = createCollectionWithUuid(_opCtx.get(), _nss1); - _uuid2 = createCollectionWithUuid(_opCtx.get(), _nss2); - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - - _lsid = makeLogicalSessionId(_opCtx.get()); - - _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 1), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 1))) - << "partialTxn" << true), - _lsid, - _txnNum, - StmtId(0), - OpTime()); - _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 2), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" - << BSON("_id" << 2))) - << "partialTxn" << true), - _lsid, - _txnNum, - StmtId(1), - _insertOp1->getOpTime()); - _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 3), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" - << BSON("_id" << 3)))), - _lsid, - _txnNum, - StmtId(2), - _insertOp2->getOpTime()); - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - stdx::lock_guard<Latch> lock(_insertMutex); - if (nss.isOplog() || nss == _nss1 || nss == _nss2 || - nss == NamespaceString::kSessionTransactionsTableNamespace) { - _insertedDocs[nss].insert(_insertedDocs[nss].end(), docs.begin(), docs.end()); - } else - FAIL("Unexpected insert") << " into " << nss << " first doc: " << docs.front(); - }; - - _writerPool = makeReplWriterPool(); - } - - void tearDown() override { - SyncTailTest::tearDown(); - } - - void checkTxnTable(const LogicalSessionId& lsid, - const TxnNumber& txnNum, - const repl::OpTime& expectedOpTime, - Date_t expectedWallClock, - boost::optional<repl::OpTime> expectedStartOpTime, - DurableTxnStateEnum expectedState) { - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - expectedOpTime, - expectedWallClock, - expectedStartOpTime, - expectedState); - } - - std::vector<BSONObj>& oplogDocs() { - return _insertedDocs[NamespaceString::kRsOplogNamespace]; - } - -protected: - NamespaceString _nss1; - NamespaceString _nss2; - boost::optional<UUID> _uuid1; - boost::optional<UUID> _uuid2; - LogicalSessionId _lsid; - TxnNumber _txnNum; - boost::optional<OplogEntry> _insertOp1, _insertOp2; - boost::optional<OplogEntry> _commitOp; - std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs; - std::unique_ptr<ThreadPool> _writerPool; - -private: - Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntrySyncTailTest::_insertMutex"); -}; - -TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - // Apply a batch with only the first operation. This should result in the first oplog entry - // being put in the oplog and updating the transaction table, but not actually being applied - // because they are part of a pending transaction. - const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1})); - ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the second operation. This should result in the second oplog entry - // being put in the oplog, but with no effect because the operation is part of a pending - // transaction. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2})); - ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - // The transaction table should not have been updated for partialTxn operations that are not the - // first in a transaction. - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and the two previous entries being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp})); - ASSERT_EQ(3U, oplogDocs().size()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getRaw()); - checkTxnTable(_lsid, - _txnNum, - _commitOp->getOpTime(), - _commitOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { - // Skipping writes to oplog proves we're testing the code path which does not rely on reading - // the oplog. - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kRecovering)); - - // Apply both inserts and the commit in a single batch. We expect no oplog entries to - // be inserted (because we've set skipWritesToOplog), and both entries to be committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp})); - ASSERT_EQ(0U, oplogDocs().size()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _commitOp->getOpTime(), - _commitOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { - // Tests an unprepared transaction with ops both in the batch with the commit and prior - // batches. - // Populate transaction with 4 linked inserts, one in nss2 and the others in nss1. - std::vector<OplogEntry> insertOps; - std::vector<BSONObj> insertDocs; - - const NamespaceString cmdNss{"admin", "$cmd"}; - for (int i = 0; i < 4; i++) { - insertDocs.push_back(BSON("_id" << i)); - insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), i + 1), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << (i == 1 ? _nss2.ns() : _nss1.ns()) << "ui" - << (i == 1 ? *_uuid2 : *_uuid1) << "o" - << insertDocs.back())) - << "partialTxn" << true), - _lsid, - _txnNum, - StmtId(i), - i == 0 ? OpTime() : insertOps.back().getOpTime())); - } - auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 5), 1LL}, - cmdNss, - BSON("applyOps" << BSONArray()), - _lsid, - _txnNum, - StmtId(4), - insertOps.back().getOpTime()); - - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - // Insert the first entry in its own batch. This should result in the oplog entry being written - // but the entry should not be applied as it is part of a pending transaction. - const auto expectedStartOpTime = insertOps[0].getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); - ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_EQ(0U, _insertedDocs[_nss1].size()); - ASSERT_EQ(0U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - insertOps[0].getOpTime(), - insertOps[0].getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Insert the rest of the entries, including the commit. These entries should be added to the - // oplog, and all the entries including the first should be applied. - ASSERT_OK( - syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp})); - ASSERT_EQ(5U, oplogDocs().size()); - ASSERT_EQ(3U, _insertedDocs[_nss1].size()); - ASSERT_EQ(1U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - - // Check docs and ordering of docs in nss1. - // The insert into nss2 is unordered with respect to those. - ASSERT_BSONOBJ_EQ(insertDocs[0], _insertedDocs[_nss1][0]); - ASSERT_BSONOBJ_EQ(insertDocs[1], _insertedDocs[_nss2].front()); - ASSERT_BSONOBJ_EQ(insertDocs[2], _insertedDocs[_nss1][1]); - ASSERT_BSONOBJ_EQ(insertDocs[3], _insertedDocs[_nss1][2]); -} - -TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { - // Tests that two transactions on the same session ID in the same batch both - // apply correctly. - TxnNumber txnNum1(1); - TxnNumber txnNum2(2); - - - std::vector<OplogEntry> insertOps1, insertOps2; - const NamespaceString cmdNss{"admin", "$cmd"}; - insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 1), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 1))) - << "partialTxn" << true), - _lsid, - txnNum1, - StmtId(0), - OpTime())); - insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 2), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 2))) - << "partialTxn" << true), - - _lsid, - txnNum1, - StmtId(1), - insertOps1.back().getOpTime())); - insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(2), 1), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 3))) - << "partialTxn" << true), - _lsid, - txnNum2, - StmtId(0), - OpTime())); - insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(2), 2), 1LL}, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 4))) - << "partialTxn" << true), - _lsid, - txnNum2, - StmtId(1), - insertOps2.back().getOpTime())); - auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 3), 1LL}, - _nss1, - BSON("applyOps" << BSONArray()), - _lsid, - txnNum1, - StmtId(2), - insertOps1.back().getOpTime()); - auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 3), 1LL}, - _nss1, - BSON("applyOps" << BSONArray()), - _lsid, - txnNum2, - StmtId(2), - insertOps2.back().getOpTime()); - - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - // Note the insert counter so we can check it later. It is necessary to use opCounters as - // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in - // the opObserver. - int insertsBefore = replOpCounters.getInsert()->load(); - // Insert all the oplog entries in one batch. All inserts should be executed, in order, exactly - // once. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), - {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); - ASSERT_EQ(6U, oplogDocs().size()); - ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); - ASSERT_EQ(4U, _insertedDocs[_nss1].size()); - checkTxnTable(_lsid, - txnNum2, - commitOp2.getOpTime(), - commitOp2.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - - // Check docs and ordering of docs in nss1. - ASSERT_BSONOBJ_EQ(BSON("_id" << 1), _insertedDocs[_nss1][0]); - ASSERT_BSONOBJ_EQ(BSON("_id" << 2), _insertedDocs[_nss1][1]); - ASSERT_BSONOBJ_EQ(BSON("_id" << 3), _insertedDocs[_nss1][2]); - ASSERT_BSONOBJ_EQ(BSON("_id" << 4), _insertedDocs[_nss1][3]); -} - - -class MultiOplogEntryPreparedTransactionTest : public MultiOplogEntrySyncTailTest { -protected: - void setUp() override { - MultiOplogEntrySyncTailTest::setUp(); - - _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 3), 1LL}, - _nss1, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss2.ns() << "ui" << *_uuid2 << "o" - << BSON("_id" << 3))) - << "prepare" << true), - _lsid, - _txnNum, - StmtId(2), - _insertOp2->getOpTime()); - _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 3), 1LL}, - _nss1, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << _nss1.ns() << "ui" << *_uuid1 << "o" - << BSON("_id" << 0))) - << "prepare" << true), - _lsid, - _txnNum, - StmtId(0), - OpTime()); - _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 4), 1LL}, - _nss1, - BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)), - _lsid, - _txnNum, - StmtId(3), - _prepareWithPrevOp->getOpTime()); - _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 4), 1LL}, - _nss1, - BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)), - _lsid, - _txnNum, - StmtId(1), - _prepareWithPrevOp->getOpTime()); - _abortPrepareWithPrevOp = - makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL}, - _nss1, - BSON("abortTransaction" << 1), - _lsid, - _txnNum, - StmtId(3), - _prepareWithPrevOp->getOpTime()); - _abortSinglePrepareApplyOp = _abortPrepareWithPrevOp = - makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL}, - _nss1, - BSON("abortTransaction" << 1), - _lsid, - _txnNum, - StmtId(1), - _singlePrepareApplyOp->getOpTime()); - } - -protected: - boost::optional<OplogEntry> _commitPrepareWithPrevOp, _abortPrepareWithPrevOp, - _singlePrepareApplyOp, _prepareWithPrevOp, _commitSinglePrepareApplyOp, - _abortSinglePrepareApplyOp; - -private: - Mutex _insertMutex = MONGO_MAKE_LATCH("MultiOplogEntryPreparedTransactionTest::_insertMutex"); -}; - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - // Apply a batch with the insert operations. This should result in the oplog entries - // being put in the oplog and updating the transaction table, but not actually being applied - // because they are part of a pending transaction. - const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); - ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); - ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the prepare. This should result in the prepare being put in the - // oplog, and the two previous entries being applied (but in a transaction) along with the - // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); - ASSERT_EQ(3U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _prepareWithPrevOp->getOpTime(), - _prepareWithPrevOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and the three previous entries being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _commitPrepareWithPrevOp->getOpTime(), - _commitPrepareWithPrevOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - // Apply a batch with the insert operations. This should result in the oplog entries - // being put in the oplog and updating the transaction table, but not actually being applied - // because they are part of a pending transaction. - const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the prepare. This should result in the prepare being put in the - // oplog, and the two previous entries being applied (but in a transaction) along with the - // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); - checkTxnTable(_lsid, - _txnNum, - _prepareWithPrevOp->getOpTime(), - _prepareWithPrevOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the abort. This should result in the abort being put in the - // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _abortPrepareWithPrevOp->getOpTime(), - _abortPrepareWithPrevOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kAborted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); - - // Apply a batch with the insert operations. This should result in the oplog entries - // being put in the oplog and updating the transaction table, but not actually being applied - // because they are part of a pending transaction. - const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); - ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); - ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the prepare applyOps. This should result in the prepare being put in - // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); - ASSERT_EQ(3U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _prepareWithPrevOp->getOpTime(), - _prepareWithPrevOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and the three previous entries being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _commitPrepareWithPrevOp->getOpTime(), - _commitPrepareWithPrevOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) { - // For recovery, the oplog must contain the operations before starting. - for (auto&& entry : - {*_insertOp1, *_insertOp2, *_prepareWithPrevOp, *_commitPrepareWithPrevOp}) { - ASSERT_OK(getStorageInterface()->insertDocument( - _opCtx.get(), - NamespaceString::kRsOplogNamespace, - {entry.toBSON(), entry.getOpTime().getTimestamp()}, - entry.getOpTime().getTerm())); - } - // Ignore docs inserted into oplog in setup. - oplogDocs().clear(); - - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kRecovering)); - - // Apply a batch with the insert operations. This should have no effect, because this is - // recovery. - const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); - ASSERT_TRUE(oplogDocs().empty()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _insertOp1->getOpTime(), - _insertOp1->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Apply a batch with only the prepare applyOps. This should have no effect, since this is - // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); - ASSERT_TRUE(oplogDocs().empty()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _prepareWithPrevOp->getOpTime(), - _prepareWithPrevOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the the three previous entries - // being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); - ASSERT_TRUE(oplogDocs().empty()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - checkTxnTable(_lsid, - _txnNum, - _commitPrepareWithPrevOp->getOpTime(), - _commitPrepareWithPrevOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); - - // Apply a batch with only the prepare applyOps. This should result in the prepare being put in - // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); - ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - checkTxnTable(_lsid, - _txnNum, - _singlePrepareApplyOp->getOpTime(), - _singlePrepareApplyOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _commitSinglePrepareApplyOp->getOpTime(), - _commitSinglePrepareApplyOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 3), 1LL}, - _nss1, - BSON("applyOps" << BSONArray() << "prepare" << true), - _lsid, - _txnNum, - StmtId(0), - OpTime()); - const auto expectedStartOpTime = emptyPrepareApplyOp.getOpTime(); - - // Apply a batch with only the prepare applyOps. This should result in the prepare being put in - // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp})); - ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - checkTxnTable(_lsid, - _txnNum, - emptyPrepareApplyOp.getOpTime(), - emptyPrepareApplyOp.getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _commitSinglePrepareApplyOp->getOpTime(), - _commitSinglePrepareApplyOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); - // Apply a batch with only the prepare applyOps. This should result in the prepare being put in - // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); - checkTxnTable(_lsid, - _txnNum, - _singlePrepareApplyOp->getOpTime(), - _singlePrepareApplyOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the abort. This should result in the abort being put in the - // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _abortSinglePrepareApplyOp->getOpTime(), - _abortSinglePrepareApplyOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kAborted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, - MultiApplySingleApplyOpsPreparedTransactionInitialSync) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); - - const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); - - // Apply a batch with only the prepare applyOps. This should result in the prepare being put in - // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); - ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _singlePrepareApplyOp->getOpTime(), - _singlePrepareApplyOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the commit being put in the - // oplog, and the previous entry being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _commitSinglePrepareApplyOp->getOpTime(), - _commitSinglePrepareApplyOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(MultiOplogEntryPreparedTransactionTest, - MultiApplySingleApplyOpsPreparedTransactionRecovery) { - // For recovery, the oplog must contain the operations before starting. - for (auto&& entry : {*_singlePrepareApplyOp, *_commitPrepareWithPrevOp}) { - ASSERT_OK(getStorageInterface()->insertDocument( - _opCtx.get(), - NamespaceString::kRsOplogNamespace, - {entry.toBSON(), entry.getOpTime().getTimestamp()}, - entry.getOpTime().getTerm())); - } - // Ignore docs inserted into oplog in setup. - oplogDocs().clear(); - - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - _writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kRecovering)); - - const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); - - // Apply a batch with only the prepare applyOps. This should have no effect, since this is - // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); - ASSERT_TRUE(oplogDocs().empty()); - ASSERT_TRUE(_insertedDocs[_nss1].empty()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _singlePrepareApplyOp->getOpTime(), - _singlePrepareApplyOp->getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kPrepared); - - // Apply a batch with only the commit. This should result in the previous entry being - // applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_TRUE(oplogDocs().empty()); - ASSERT_EQ(1U, _insertedDocs[_nss1].size()); - ASSERT_TRUE(_insertedDocs[_nss2].empty()); - checkTxnTable(_lsid, - _txnNum, - _commitSinglePrepareApplyOp->getOpTime(), - _commitSinglePrepareApplyOp->getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { - SyncTail syncTail(nullptr, - nullptr, - nullptr, - {}, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary)); + SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo)); @@ -1299,12 +368,8 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); - SyncTail syncTail(nullptr, - nullptr, - nullptr, - {}, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary)); + SyncTail syncTail( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); @@ -1347,12 +412,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - SyncTail syncTail(nullptr, - nullptr, - nullptr, - {}, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary)); + SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs ops = {&op}; ASSERT_EQUALS(ErrorCodes::InvalidOptions, multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); @@ -2252,574 +1312,6 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { ASSERT_EQUALS(0, countLogLinesContaining(expected.str())); } -class SyncTailTxnTableTest : public SyncTailTest { -public: - void setUp() override { - SyncTailTest::setUp(); - - MongoDSessionCatalog::onStepUp(_opCtx.get()); - - DBDirectClient client(_opCtx.get()); - BSONObj result; - ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result)); - } - - /** - * Creates an OplogEntry with given parameters and preset defaults for this test suite. - */ - repl::OplogEntry makeOplogEntry(const NamespaceString& ns, - repl::OpTime opTime, - repl::OpTypeEnum opType, - BSONObj object, - boost::optional<BSONObj> object2, - const OperationSessionInfo& sessionInfo, - Date_t wallClockTime) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - ns, // namespace - boost::none, // uuid - boost::none, // fromMigrate - 0, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // false - wallClockTime, // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime - } - - /** - * Creates an OplogEntry with given parameters and preset defaults for this test suite. - */ - repl::OplogEntry makeOplogEntryForMigrate(const NamespaceString& ns, - repl::OpTime opTime, - repl::OpTypeEnum opType, - BSONObj object, - boost::optional<BSONObj> object2, - const OperationSessionInfo& sessionInfo, - Date_t wallClockTime) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - ns, // namespace - boost::none, // uuid - true, // fromMigrate - 0, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // false - wallClockTime, // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime - } - - void checkTxnTable(const OperationSessionInfo& sessionInfo, - const repl::OpTime& expectedOpTime, - Date_t expectedWallClock) { - invariant(sessionInfo.getSessionId()); - invariant(sessionInfo.getTxnNumber()); - - repl::checkTxnTable(_opCtx.get(), - *sessionInfo.getSessionId(), - *sessionInfo.getTxnNumber(), - expectedOpTime, - expectedWallClock, - {}, - {}); - } - - static const NamespaceString& nss() { - return kNs; - } - -private: - static const NamespaceString kNs; -}; - -const NamespaceString SyncTailTxnTableTest::kNs("test.foo"); - -TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - const auto date = Date_t::now(); - - auto insertOp = makeOplogEntry(nss(), - {Timestamp(1, 0), 1}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); - - checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); -} - -TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - const auto date = Date_t::now(); - - auto insertOp = makeOplogEntry(nss(), - {Timestamp(1, 0), 1}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, - {Timestamp(2, 0), 1}, - repl::OpTypeEnum::kDelete, - BSON("_id" << sessionInfo.getSessionId()->toBSON()), - boost::none, - {}, - Date_t::now()); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); - - ASSERT_FALSE(docExists( - _opCtx.get(), - NamespaceString::kSessionTransactionsTableNamespace, - BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); -} - -TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) { - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - auto date = Date_t::now(); - - auto insertOp = makeOplogEntry(nss(), - {Timestamp(1, 0), 1}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, - {Timestamp(2, 0), 1}, - repl::OpTypeEnum::kDelete, - BSON("_id" << sessionInfo.getSessionId()->toBSON()), - boost::none, - {}, - Date_t::now()); - - date = Date_t::now(); - sessionInfo.setTxnNumber(7); - auto insertOp2 = makeOplogEntry(nss(), - {Timestamp(3, 0), 2}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 6), - boost::none, - sessionInfo, - date); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); - - checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); -} - -TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTable) { - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - auto date = Date_t::now(); - - auto insertOp = makeOplogEntry(nss(), - {Timestamp(1, 0), 1}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - repl::OpTime newWriteOpTime(Timestamp(2, 0), 1); - auto updateOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, - {Timestamp(4, 0), 1}, - repl::OpTypeEnum::kUpdate, - BSON("$set" << BSON("lastWriteOpTime" << newWriteOpTime)), - BSON("_id" << sessionInfo.getSessionId()->toBSON()), - {}, - Date_t::now()); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp})); - - checkTxnTable(sessionInfo, newWriteOpTime, date); -} - -TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSession) { - const NamespaceString cmdNss{"admin", "$cmd"}; - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - auto date = Date_t::now(); - auto uuid = [&] { - return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid(); - }(); - - repl::OpTime retryableInsertOpTime(Timestamp(1, 0), 1); - - auto retryableInsertOp = makeOplogEntry(nss(), - retryableInsertOpTime, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - repl::OpTime txnInsertOpTime(Timestamp(2, 0), 1); - sessionInfo.setTxnNumber(4); - - auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - txnInsertOpTime, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << nss().ns() << "ui" << uuid << "o" - << BSON("_id" << 2))) - << "partialTxn" << true), - sessionId, - *sessionInfo.getTxnNumber(), - StmtId(0), - OpTime()); - - repl::OpTime txnCommitOpTime(Timestamp(3, 0), 1); - auto txnCommitOp = - makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime, - cmdNss, - BSON("applyOps" << BSONArray()), - sessionId, - *sessionInfo.getTxnNumber(), - StmtId(1), - txnInsertOpTime); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp})); - - repl::checkTxnTable(_opCtx.get(), - *sessionInfo.getSessionId(), - *sessionInfo.getTxnNumber(), - txnCommitOpTime, - txnCommitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); -} - -TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSession) { - const NamespaceString cmdNss{"admin", "$cmd"}; - const auto sessionId = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(3); - auto date = Date_t::now(); - auto uuid = [&] { - return AutoGetCollectionForRead(_opCtx.get(), nss()).getCollection()->uuid(); - }(); - - repl::OpTime txnInsertOpTime(Timestamp(1, 0), 1); - auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId( - txnInsertOpTime, - cmdNss, - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << nss().ns() << "ui" << uuid << "o" - << BSON("_id" << 2))) - << "partialTxn" << true), - sessionId, - *sessionInfo.getTxnNumber(), - StmtId(0), - OpTime()); - - repl::OpTime txnCommitOpTime(Timestamp(2, 0), 1); - auto txnCommitOp = - makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime, - cmdNss, - BSON("applyOps" << BSONArray()), - sessionId, - *sessionInfo.getTxnNumber(), - StmtId(1), - txnInsertOpTime); - - repl::OpTime retryableInsertOpTime(Timestamp(3, 0), 1); - sessionInfo.setTxnNumber(4); - - auto retryableInsertOp = makeOplogEntry(nss(), - retryableInsertOpTime, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - sessionInfo, - date); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp})); - - repl::checkTxnTable(_opCtx.get(), - *sessionInfo.getSessionId(), - *sessionInfo.getTxnNumber(), - retryableInsertOpTime, - retryableInsertOp.getWallClockTime(), - boost::none, - boost::none); -} - - -TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { - NamespaceString ns0("test.0"); - NamespaceString ns1("test.1"); - NamespaceString ns2("test.2"); - NamespaceString ns3("test.3"); - - DBDirectClient client(_opCtx.get()); - BSONObj result; - ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result)); - ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result)); - ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result)); - ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result)); - auto uuid0 = [&] { - return AutoGetCollectionForRead(_opCtx.get(), ns0).getCollection()->uuid(); - }(); - auto uuid1 = [&] { - return AutoGetCollectionForRead(_opCtx.get(), ns1).getCollection()->uuid(); - }(); - auto uuid2 = [&] { - return AutoGetCollectionForRead(_opCtx.get(), ns2).getCollection()->uuid(); - }(); - - // Entries with a session id and a txnNumber update the transaction table. - auto lsidSingle = makeLogicalSessionIdForTest(); - auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0); - - // For entries with the same session, the entry with a larger txnNumber is saved. - auto lsidDiffTxn = makeLogicalSessionIdForTest(); - auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); - auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); - - // For entries with the same session and txnNumber, the later optime is saved. - auto lsidSameTxn = makeLogicalSessionIdForTest(); - auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); - auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( - {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); - - // Entries with a session id but no txnNumber do not lead to updates. - auto lsidNoTxn = makeLogicalSessionIdForTest(); - OperationSessionInfo info; - info.setSessionId(lsidNoTxn); - auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo( - {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), - {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); - - // The txnNum and optime of the only write were saved. - auto resultSingleDoc = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON())); - ASSERT_TRUE(!resultSingleDoc.isEmpty()); - - auto resultSingle = - SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc); - - ASSERT_EQ(resultSingle.getTxnNum(), 5LL); - ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1)); - - // The txnNum and optime of the write with the larger txnNum were saved. - auto resultDiffTxnDoc = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON())); - ASSERT_TRUE(!resultDiffTxnDoc.isEmpty()); - - auto resultDiffTxn = - SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc); - - ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL); - ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1)); - - // The txnNum and optime of the write with the later optime were saved. - auto resultSameTxnDoc = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON())); - ASSERT_TRUE(!resultSameTxnDoc.isEmpty()); - - auto resultSameTxn = - SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc); - - ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL); - ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1)); - - // There is no entry for the write with no txnNumber. - auto resultNoTxn = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON())); - ASSERT_TRUE(resultNoTxn.isEmpty()); -} - -TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) { - const auto insertLsid = makeLogicalSessionIdForTest(); - OperationSessionInfo insertSessionInfo; - insertSessionInfo.setSessionId(insertLsid); - insertSessionInfo.setTxnNumber(3); - auto date = Date_t::now(); - - auto innerOplog = makeOplogEntry(nss(), - {Timestamp(10, 10), 1}, - repl::OpTypeEnum::kInsert, - BSON("_id" << 1), - boost::none, - insertSessionInfo, - date); - - auto outerInsertDate = Date_t::now(); - auto insertOplog = makeOplogEntryForMigrate(nss(), - {Timestamp(40, 0), 1}, - repl::OpTypeEnum::kNoop, - BSON("$sessionMigrateInfo" << 1), - innerOplog.toBSON(), - insertSessionInfo, - outerInsertDate); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog})); - - checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate); -} - -TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) { - const auto preImageLsid = makeLogicalSessionIdForTest(); - OperationSessionInfo preImageSessionInfo; - preImageSessionInfo.setSessionId(preImageLsid); - preImageSessionInfo.setTxnNumber(3); - auto preImageDate = Date_t::now(); - - auto preImageOplog = makeOplogEntryForMigrate(nss(), - {Timestamp(30, 0), 1}, - repl::OpTypeEnum::kNoop, - BSON("_id" << 1), - boost::none, - preImageSessionInfo, - preImageDate); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog})); - - ASSERT_FALSE(docExists(_opCtx.get(), - NamespaceString::kSessionTransactionsTableNamespace, - BSON(SessionTxnRecord::kSessionIdFieldName - << preImageSessionInfo.getSessionId()->toBSON()))); -} - -TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { - const auto lsid = makeLogicalSessionIdForTest(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(lsid); - sessionInfo.setTxnNumber(3); - - auto oplog = makeOplogEntry(nss(), - {Timestamp(30, 0), 1}, - repl::OpTypeEnum::kNoop, - BSON("_id" << 1), - boost::none, - sessionInfo, - Date_t::now()); - - auto writerPool = makeReplWriterPool(); - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - multiSyncApply, - writerPool.get(), - OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog})); - - ASSERT_FALSE(docExists( - _opCtx.get(), - NamespaceString::kSessionTransactionsTableNamespace, - BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); -} - TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) { // Create a BSON "emptycapped" command. auto emptyCappedCmd = BSON("emptycapped" << nss.coll()); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 4bca63586f3..bd5fe5c9d13 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -204,12 +204,8 @@ Status SyncTailTest::runOpSteadyState(const OplogEntry& op) { } Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) { - SyncTail syncTail(nullptr, - getConsistencyMarkers(), - getStorageInterface(), - SyncTail::MultiSyncApplyFunc(), - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary)); + SyncTail syncTail( + nullptr, getStorageInterface(), OplogApplier::Options(OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs opsPtrs; for (auto& op : ops) { opsPtrs.push_back(&op); @@ -224,10 +220,7 @@ Status SyncTailTest::runOpInitialSync(const OplogEntry& op) { Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { SyncTail syncTail(nullptr, - getConsistencyMarkers(), getStorageInterface(), - SyncTail::MultiSyncApplyFunc(), - nullptr, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD // operations provided by idempotency tests. @@ -245,10 +238,7 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { Status SyncTailTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) { SyncTail syncTail(nullptr, - getConsistencyMarkers(), getStorageInterface(), - SyncTail::MultiSyncApplyFunc(), - nullptr, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD // operations provided by idempotency tests. diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 597b84fde43..45b72b54edc 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1308,9 +1308,10 @@ public: nullptr, // task executor. not required for multiApply(). nullptr, // oplog buffer. not required for multiApply(). &observer, - nullptr, // replication coordinator. not required for multiApply(). + _coordinatorMock, _consistencyMarkers, storageInterface, + repl::multiSyncApply, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); @@ -1392,9 +1393,10 @@ public: nullptr, // task executor. not required for multiApply(). nullptr, // oplog buffer. not required for multiApply(). &observer, - nullptr, // replication coordinator. not required for multiApply(). + _coordinatorMock, _consistencyMarkers, storageInterface, + repl::multiSyncApply, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), writerPool.get()); auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); @@ -2440,17 +2442,21 @@ public: << "ns" << ns.ns() << "ui" << uuid << "wall" << Date_t() << "o" << doc0)); + DoNothingOplogApplierObserver observer; // Apply the operation. auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::makeReplWriterPool(1); - repl::SyncTail syncTail( - nullptr, + repl::OplogApplierImpl oplogApplier( + nullptr, // task executor. not required for multiApply(). + nullptr, // oplog buffer. not required for multiApply(). + &observer, + _coordinatorMock, _consistencyMarkers, storageInterface, applyOperationFn, - writerPool.get(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary)); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp})); + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + writerPool.get()); + auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, {insertOp})); ASSERT_EQ(insertOp.getOpTime(), lastOpTime); joinGuard.dismiss(); |