diff options
29 files changed, 2814 insertions, 3340 deletions
diff --git a/jstests/replsets/log_secondary_oplog_application.js b/jstests/replsets/log_secondary_oplog_application.js index 2e8647bf0c1..8c213bb494f 100644 --- a/jstests/replsets/log_secondary_oplog_application.js +++ b/jstests/replsets/log_secondary_oplog_application.js @@ -45,7 +45,7 @@ assert.throws(function() { /** * Part 2: Issue a slow op and make sure that we *do* log it. - * We use a failpoint in syncApply which blocks after we read the time at the start + * We use a failpoint in applyOplogEntryBatch which blocks after we read the time at the start * of the application of the op, and we wait there to simulate slowness. */ @@ -62,8 +62,9 @@ assert.commandWorked(secondary.adminCommand( // Issue a write and make sure we've hit the failpoint before moving on. assert.commandWorked(primary.getDB(name)["slowOp"].insert({"slow": "sloth"})); -checkLog.contains(secondary, - "syncApply - fail point hangAfterRecordingOpApplicationStartTime enabled"); +checkLog.contains( + secondary, + "applyOplogEntryBatch - fail point hangAfterRecordingOpApplicationStartTime enabled"); // Wait for an amount of time safely above the "slowMS" we set. sleep(0.5 * 1000); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index ae9b95ab902..a0283ce3ae5 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -500,23 +500,6 @@ env.Library( ], ) - -env.Library( - target='oplog_applier_impl_test_fixture', - source=[ - 'oplog_applier_impl_test_fixture.cpp', - ], - LIBDEPS=[ - 'drop_pending_collection_reaper', - 'oplog_application', - 'replmocks', - 'storage_interface_impl', - '$BUILD_DIR/mongo/db/catalog/document_validation', - '$BUILD_DIR/mongo/db/s/op_observer_sharding_impl', - '$BUILD_DIR/mongo/db/service_context_d_test_fixture', - ], -) - env.Library( target='oplog_application_interface', source=[ @@ -542,7 +525,6 @@ env.Library( 'oplog_applier_impl.cpp', 'opqueue_batcher.cpp', 'session_update_tracker.cpp', - 'sync_tail.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authorization_manager_global', @@ -575,7 +557,7 @@ env.Library( 'idempotency_test_fixture.cpp', ], LIBDEPS=[ - 'sync_tail_test_fixture', + 'oplog_applier_impl_test_fixture', '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/catalog/collection_validation', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', @@ -583,9 +565,9 @@ env.Library( ) env.Library( - target='sync_tail_test_fixture', + target='oplog_applier_impl_test_fixture', source=[ - 'sync_tail_test_fixture.cpp', + 'oplog_applier_impl_test_fixture.cpp', ], LIBDEPS=[ 'drop_pending_collection_reaper', @@ -1274,7 +1256,6 @@ env.CppUnitTest( 'split_horizon_test.cpp', 'storage_interface_impl_test.cpp', 'sync_source_resolver_test.cpp', - 'sync_tail_test.cpp', 'task_runner_test.cpp', 'task_runner_test_fixture.cpp', 'vote_requester_test.cpp', @@ -1349,7 +1330,6 @@ env.CppUnitTest( 'storage_interface_impl', 'sync_source_resolver', 'sync_source_selector_mock', - 'sync_tail_test_fixture', 'task_executor_mock', 'task_runner', ], diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index e7cb221c92b..357f5959aa2 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -143,7 +143,7 @@ std::unique_ptr<OplogApplier> DataReplicatorExternalStateImpl::makeOplogApplier( _replicationCoordinator, consistencyMarkers, storageInterface, - multiSyncApply, + applyOplogGroup, options, writerPool); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index adae4b62607..185e4f0dd4c 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -53,6 +53,8 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" @@ -96,6 +98,20 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none); // post-image optime } +/** + * Test only subclass of OplogApplierImpl for using fillWriterVectors in tests. + */ +class OplogApplierImplForTest : public OplogApplierImpl { +public: + OplogApplierImplForTest(const OplogApplier::Options& options); + using OplogApplierImpl::fillWriterVectors; +}; + +// Minimal constructor that takes options, the only member accessed in fillWriterVectors. +OplogApplierImplForTest::OplogApplierImplForTest(const OplogApplier::Options& options) + : OplogApplierImpl( + nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, options, nullptr) {} + } // namespace /** @@ -380,10 +396,8 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence {entry.toBSON(), entry.getOpTime().getTimestamp()}, entry.getOpTime().getTerm())); } - - SyncTail syncTail(nullptr, // observer - nullptr, // storage interface - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); + OplogApplier::Options option(OplogApplication::Mode::kInitialSync); + OplogApplierImplForTest oplogApplier(option); std::vector<MultiApplier::OperationPtrs> writerVectors(1); std::vector<MultiApplier::Operations> derivedOps; @@ -394,7 +408,7 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence std::vector<OplogEntry> op; op.push_back(entry); singleOpVectors.emplace_back(op); - syncTail.fillWriterVectors( + oplogApplier.fillWriterVectors( _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h index 5443e79ce4d..9e6d14cb228 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.h +++ b/src/mongo/db/repl/idempotency_test_fixture.h @@ -40,9 +40,9 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" -#include "mongo/db/repl/sync_tail_test_fixture.h" #include "mongo/util/duration.h" #include "mongo/util/uuid.h" @@ -87,7 +87,7 @@ std::ostream& operator<<(std::ostream& stream, const CollectionState& state); StringBuilderImpl<SharedBufferAllocator>& operator<<(StringBuilderImpl<SharedBufferAllocator>& sb, const CollectionState& state); -class IdempotencyTest : public SyncTailTest { +class IdempotencyTest : public OplogApplierImplTest { protected: enum class SequenceType : int { kEntireSequence, kAnyPrefix, kAnySuffix, kAnyPrefixOrSuffix }; OplogEntry createCollection(CollectionUUID uuid = UUID::gen()); diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index c83f4c134df..adc891bc5d1 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -96,7 +96,7 @@ struct InitialSyncerOptions { // InitialSyncer waits this long before retrying getApplierBatchCallback() if there are // currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active. - // This default value is based on the duration in SyncTail::OpQueueBatcher::run(). + // This default value is based on the duration in OpQueueBatcher::run(). Milliseconds getApplierBatchCallbackRetryWait{1000}; // Replication settings diff --git a/src/mongo/db/repl/insert_group.cpp b/src/mongo/db/repl/insert_group.cpp index 5c8f39be097..3c0bd54e25a 100644 --- a/src/mongo/db/repl/insert_group.cpp +++ b/src/mongo/db/repl/insert_group.cpp @@ -38,8 +38,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -127,7 +127,7 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIt OplogEntryBatch groupedInsertBatch(it, endOfGroupableOpsIterator); try { // Apply the group of inserts by passing in groupedInsertBatch. - uassertStatusOK(syncApply(_opCtx, groupedInsertBatch, _mode)); + uassertStatusOK(applyOplogEntryBatch(_opCtx, groupedInsertBatch, _mode)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. return endOfGroupableOpsIterator - 1; diff --git a/src/mongo/db/repl/insert_group.h b/src/mongo/db/repl/insert_group.h index 4aeffa3780e..e96aeece12d 100644 --- a/src/mongo/db/repl/insert_group.h +++ b/src/mongo/db/repl/insert_group.h @@ -32,7 +32,7 @@ #include "mongo/base/status_with.h" #include "mongo/db/repl/multiapplier.h" -#include "mongo/db/repl/sync_tail.h" +#include "mongo/db/repl/oplog_applier.h" namespace mongo { namespace repl { @@ -68,7 +68,7 @@ private: // Used for constructing search bounds when grouping inserts. ConstIterator _end; - // Passed to _syncApply when applying grouped inserts. + // Passed to applyOplogEntryBatch when applying grouped inserts. OperationContext* _opCtx; Mode _mode; }; diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 0bb82669ddc..6906a0897e7 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -37,7 +37,6 @@ #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/repl_server_parameters_gen.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/util/log.h" #include "mongo/util/time_support.h" @@ -51,7 +50,7 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs; OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, - const OplogApplier::Options& options) + const Options& options) : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {} OplogBuffer* OplogApplier::getBuffer() const { @@ -263,6 +262,10 @@ StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations return lastApplied; } +const OplogApplier::Options& OplogApplier::getOptions() const { + return _options; +} + void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { // This is just to get the op off the queue; it's been peeked at and queued for application // already. diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 63be125b2bd..ee51b146d27 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -104,7 +104,9 @@ public: using Operations = std::vector<OplogEntry>; - // Used by SyncTail to access batching logic. + // TODO (SERVER-43001): This potentially violates layering as OpQueueBatcher calls an + // OplogApplier method. + // Used to access batching logic. using GetNextApplierBatchFn = std::function<StatusWith<OplogApplier::Operations>( OperationContext* opCtx, const BatchLimits& batchLimits)>; @@ -182,6 +184,8 @@ public: */ StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops); + const Options& getOptions() const; + private: /** * Pops the operation at the front of the OplogBuffer. diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index f0317f59b79..3a36fb4dba9 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -29,18 +29,38 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication +#include "mongo/db/repl/oplog_applier_impl.h" + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/insert_group.h" +#include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/platform/basic.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" - -#include "mongo/db/repl/oplog_applier_impl.h" +#include "third_party/murmurhash3/MurmurHash3.h" namespace mongo { namespace repl { +namespace { MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion); MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries); +MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime); + +// The oplog entries applied +Counter64 opsAppliedStats; +ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats); // Tracks the oplog application batch size. Counter64 oplogApplicationBatchSize; @@ -50,6 +70,174 @@ ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply. TimerStats applyBatchStats; ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); +NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { + auto optionalUuid = oplogEntry.getUuid(); + if (!optionalUuid) { + return oplogEntry.getNss(); + } + + const auto& uuid = optionalUuid.get(); + auto& catalog = CollectionCatalog::get(opCtx); + auto nss = catalog.lookupNSSByUUID(uuid); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "No namespace with UUID " << uuid.toString(), + nss); + return *nss; +} + +NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const OplogEntry& op) { + if (auto ui = op.getUuid()) { + return {nss.db().toString(), ui.get()}; + } + return nss; +} + +/** + * Used for logging a report of ops that take longer than "slowMS" to apply. This is called + * right before returning from applyOplogEntryBatch, and it returns the same status. + */ +Status finishAndLogApply(ClockSource* clockSource, + Status finalStatus, + Date_t applyStartTime, + const OplogEntryBatch& batch) { + + if (finalStatus.isOK()) { + auto applyEndTime = clockSource->now(); + auto diffMS = durationCount<Milliseconds>(applyEndTime - applyStartTime); + + // This op was slow to apply, so we should log a report of it. + if (diffMS > serverGlobalParams.slowMS) { + + StringBuilder s; + s << "applied op: "; + + if (batch.getOp().getOpType() == OpTypeEnum::kCommand) { + s << "command "; + } else { + s << "CRUD "; + } + + s << redact(batch.toBSON()); + s << ", took " << diffMS << "ms"; + + log() << s.str(); + } + } + return finalStatus; +} + +LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMode mode) { + return nss.isSystemDotViews() ? MODE_X : mode; +} + +/** + * Caches per-collection properties which are relevant for oplog application, so that they don't + * have to be retrieved repeatedly for each op. + */ +class CachedCollectionProperties { +public: + struct CollectionProperties { + bool isCapped = false; + const CollatorInterface* collator = nullptr; + }; + + CollectionProperties getCollectionProperties(OperationContext* opCtx, + const StringMapHashedKey& ns) { + auto it = _cache.find(ns); + if (it != _cache.end()) { + return it->second; + } + + auto collProperties = getCollectionPropertiesImpl(opCtx, NamespaceString(ns.key())); + _cache[ns] = collProperties; + return collProperties; + } + +private: + CollectionProperties getCollectionPropertiesImpl(OperationContext* opCtx, + const NamespaceString& nss) { + CollectionProperties collProperties; + + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss); + + if (!collection) { + return collProperties; + } + + collProperties.isCapped = collection->isCapped(); + collProperties.collator = collection->getDefaultCollator(); + return collProperties; + } + + StringMap<CollectionProperties> _cache; +}; + +/** + * Updates a CRUD op's hash and isForCappedCollection field if necessary. + */ +void processCrudOp(OperationContext* opCtx, + OplogEntry* op, + uint32_t* hash, + StringMapHashedKey* hashedNs, + CachedCollectionProperties* collPropertiesCache) { + const bool supportsDocLocking = + opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking(); + auto collProperties = collPropertiesCache->getCollectionProperties(opCtx, *hashedNs); + + // For doc locking engines, include the _id of the document in the hash so we get + // parallelism even if all writes are to a single collection. + // + // For capped collections, this is illegal, since capped collections must preserve + // insertion order. + if (supportsDocLocking && !collProperties.isCapped) { + BSONElement id = op->getIdElement(); + BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore, + collProperties.collator); + const size_t idHash = elementHasher.hash(id); + MurmurHash3_x86_32(&idHash, sizeof(idHash), *hash, hash); + } + + if (op->getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) { + // Mark capped collection ops before storing them to ensure we do not attempt to + // bulk insert them. + op->isForCappedCollection = true; + } +} + +/** + * Adds a single oplog entry to the appropriate writer vector. + */ +void addToWriterVector(OplogEntry* op, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + uint32_t hash) { + const uint32_t numWriters = writerVectors->size(); + auto& writer = (*writerVectors)[hash % numWriters]; + if (writer.empty()) { + writer.reserve(8); // Skip a few growth rounds + } + writer.push_back(op); +} + +/** + * Adds a set of derivedOps to writerVectors. + */ +void addDerivedOps(OperationContext* opCtx, + MultiApplier::Operations* derivedOps, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + CachedCollectionProperties* collPropertiesCache) { + for (auto&& op : *derivedOps) { + auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); + uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); + if (op.isCrudOpType()) { + processCrudOp(opCtx, &op, &hash, &hashedNs, collPropertiesCache); + } + addToWriterVector(&op, writerVectors, hash); + } +} + +} // namespace + + namespace { class ApplyBatchFinalizer { @@ -163,7 +351,7 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - MultiSyncApplyFunc func, + ApplyGroupFunc func, const OplogApplier::Options& options, ThreadPool* writerPool) : OplogApplier(executor, oplogBuffer, observer, options), @@ -172,18 +360,13 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers), _applyFunc(func), - _syncTail(observer, storageInterface, options), _beginApplyingOpTime(options.beginApplyingOpTime) {} void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) { return getNextApplierBatch(opCtx, batchLimits); }; - _runLoop(oplogBuffer, getNextApplierBatchFn); -} -void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) { // We don't start data replication for arbiters at all and it's not allowed to reconfig // arbiterOnly field for any member. invariant(!_replCoord->getMemberState().arbiter()); @@ -404,9 +587,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); }); // Write batch of ops into oplog. - // TODO (SERVER-43651): _options currently belongs to SyncTail for use in the free - // function multiSyncApply; move this field to OplogApplierImpl. - if (!_syncTail.getOptions().skipWritesToOplog) { + if (!getOptions().skipWritesToOplog) { _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp()); scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops); } @@ -421,7 +602,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - _syncTail.fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); @@ -435,7 +616,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, } // Reset consistency markers in case the node fails while applying ops. - if (!_syncTail.getOptions().skipWritesToOplog) { + if (!getOptions().skipWritesToOplog) { _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); } @@ -444,7 +625,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK()); // Doles out all the work to the writer pool threads. writerVectors is not modified, - // but multiSyncApply will modify the vectors that it contains. + // but applyOplogGroup will modify the vectors that it contains. invariant(writerVectors.size() == statusVector.size()); for (size_t i = 0; i < writerVectors.size(); i++) { if (writerVectors[i].empty()) @@ -464,7 +645,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, opCtx->setShouldParticipateInFlowControl(false); status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { - return _applyFunc(opCtx.get(), &writer, &_syncTail, &multikeyVector); + return _applyFunc(opCtx.get(), &writer, this, &multikeyVector); }); }); } @@ -530,5 +711,329 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, return ops.back().getOpTime(); } +/** + * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops + * vector in any other way. + * writerVectors - Set of operations for each worker thread to apply. + * derivedOps - If provided, this function inserts a decomposition of applyOps operations + * and instructions for updating the transactions table. Required if processing oplogs + * with transactions. + * sessionUpdateTracker - if provided, keeps track of session info from ops. + */ +void OplogApplierImpl::_deriveOpsAndFillWriterVectors( + OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps, + SessionUpdateTracker* sessionUpdateTracker) noexcept { + + LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps; + CachedCollectionProperties collPropertiesCache; + for (auto&& op : *ops) { + // If the operation's optime is before or the same as the beginApplyingOpTime we don't want + // to apply it, so don't include it in writerVectors. + if (op.getOpTime() <= getOptions().beginApplyingOpTime) { + continue; + } + + auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); + // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later + // on. Bit depth not important, we end up just doing integer modulo with this in the end. + // The hash function should provide entropy in the lower bits as it's used in hash tables. + uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); + + // We need to track all types of ops, including type 'n' (these are generated from chunk + // migrations). + if (sessionUpdateTracker) { + if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { + derivedOps->emplace_back(std::move(*newOplogWrites)); + addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); + } + } + + + // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit. + // We must save it here because we are not guaranteed it has been written to the oplog + // yet. + // We also do this for prepare during initial sync. + if (op.isPartialTransaction() || + (op.shouldPrepare() && getOptions().mode == OplogApplication::Mode::kInitialSync)) { + auto& partialTxnList = partialTxnOps[*op.getSessionId()]; + // If this operation belongs to an existing partial transaction, partialTxnList + // must contain the previous operations of the transaction. + invariant(partialTxnList.empty() || + partialTxnList.front()->getTxnNumber() == op.getTxnNumber()); + partialTxnList.push_back(&op); + continue; + } + + if (op.getCommandType() == OplogEntry::CommandType::kAbortTransaction) { + auto& partialTxnList = partialTxnOps[*op.getSessionId()]; + partialTxnList.clear(); + } + + if (op.isCrudOpType()) + processCrudOp(opCtx, &op, &hash, &hashedNs, &collPropertiesCache); + // Extract applyOps operations and fill writers with extracted operations using this + // function. + if (op.isTerminalApplyOps()) { + auto logicalSessionId = op.getSessionId(); + // applyOps entries generated by a transaction must have a sessionId and a + // transaction number. + if (logicalSessionId && op.getTxnNumber()) { + // On commit of unprepared transactions, get transactional operations from the + // oplog and fill writers with those operations. + // Flush partialTxnList operations for current transaction. + auto& partialTxnList = partialTxnOps[*logicalSessionId]; + + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + partialTxnList.clear(); + + // Transaction entries cannot have different session updates. + addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); + } else { + // The applyOps entry was not generated as part of a transaction. + invariant(!op.getPrevWriteOpTimeInTransaction()); + + derivedOps->emplace_back(ApplyOps::extractOperations(op)); + + // Nested entries cannot have different session updates. + addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); + } + continue; + } + + // If we see a commitTransaction command that is a part of a prepared transaction during + // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers + // with the extracted operations. + if (op.isPreparedCommit() && (getOptions().mode == OplogApplication::Mode::kInitialSync)) { + auto logicalSessionId = op.getSessionId(); + auto& partialTxnList = partialTxnOps[*logicalSessionId]; + + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + partialTxnList.clear(); + + addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); + continue; + } + + addToWriterVector(&op, writerVectors, hash); + } +} + +void OplogApplierImpl::fillWriterVectors( + OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps) noexcept { + + SessionUpdateTracker sessionUpdateTracker; + _deriveOpsAndFillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); + + auto newOplogWrites = sessionUpdateTracker.flushAll(); + if (!newOplogWrites.empty()) { + derivedOps->emplace_back(std::move(newOplogWrites)); + _deriveOpsAndFillWriterVectors( + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } +} + +Status applyOplogEntryBatch(OperationContext* opCtx, + const OplogEntryBatch& batch, + OplogApplication::Mode oplogApplicationMode) { + // Guarantees that applyOplogEntryBatch's context matches that of its calling function, + // applyOplogGroup. + invariant(!opCtx->writesAreReplicated()); + invariant(documentValidationDisabled(opCtx)); + + auto op = batch.getOp(); + // Count each log op application as a separate operation, for reporting purposes + CurOp individualOp(opCtx); + + const NamespaceString nss(op.getNss()); + + auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); }; + + auto applyOp = [&](Database* db) { + // We convert updates to upserts when not in initial sync because after rollback and during + // startup we may replay an update after a delete and crash since we do not ignore + // errors. In initial sync we simply ignore these update errors so there is no reason to + // upsert. + // + // TODO (SERVER-21700): Never upsert during oplog application unless an external applyOps + // wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP + // mode (similar to initial sync) instead so we do not accidentally ignore real errors. + bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync); + Status status = applyOperation_inlock( + opCtx, db, batch, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats); + if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { + throw WriteConflictException(); + } + return status; + }; + + auto clockSource = opCtx->getServiceContext()->getFastClockSource(); + auto applyStartTime = clockSource->now(); + + if (MONGO_unlikely(hangAfterRecordingOpApplicationStartTime.shouldFail())) { + log() << "applyOplogEntryBatch - fail point hangAfterRecordingOpApplicationStartTime " + "enabled. " + << "Blocking until fail point is disabled. "; + hangAfterRecordingOpApplicationStartTime.pauseWhileSet(); + } + + auto opType = op.getOpType(); + + auto finishApply = [&](Status status) { + return finishAndLogApply(clockSource, status, applyStartTime, batch); + }; + + if (opType == OpTypeEnum::kNoop) { + incrementOpsAppliedStats(); + return Status::OK(); + } else if (OplogEntry::isCrudOpType(opType)) { + return finishApply(writeConflictRetry(opCtx, "applyOplogEntryBatch_CRUD", nss.ns(), [&] { + // Need to throw instead of returning a status for it to be properly ignored. + try { + AutoGetCollection autoColl( + opCtx, getNsOrUUID(nss, op), fixLockModeForSystemDotViewsChanges(nss, MODE_IX)); + auto db = autoColl.getDb(); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "missing database (" << nss.db() << ")", + db); + OldClientContext ctx(opCtx, autoColl.getNss().ns(), db); + return applyOp(ctx.db()); + } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { + // Delete operations on non-existent namespaces can be treated as successful for + // idempotency reasons. + // During RECOVERING mode, we ignore NamespaceNotFound for all CRUD ops since + // storage does not wait for drops to be checkpointed (SERVER-33161). + if (opType == OpTypeEnum::kDelete || + oplogApplicationMode == OplogApplication::Mode::kRecovering) { + return Status::OK(); + } + + ex.addContext(str::stream() + << "Failed to apply operation: " << redact(batch.toBSON())); + throw; + } + })); + } else if (opType == OpTypeEnum::kCommand) { + return finishApply(writeConflictRetry(opCtx, "applyOplogEntryBatch_command", nss.ns(), [&] { + // A special case apply for commands to avoid implicit database creation. + Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode); + incrementOpsAppliedStats(); + return status; + })); + } + + MONGO_UNREACHABLE; +} + +void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) { + if (oplogEntryPointers->size() < 1U) { + return; + } + auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) { + return l->getNss() < r->getNss(); + }; + std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator); +} + +/** + * This free function is used by the thread pool workers to write ops to the db. + * This consumes the passed in OperationPtrs and callers should not make any assumptions about the + * state of the container after calling. However, this function cannot modify the pointed-to + * operations because the OperationPtrs container contains const pointers. + */ +Status applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + OplogApplierImpl* oai, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + invariant(oai); + + UnreplicatedWritesBlock uwb(opCtx); + DisableDocumentValidation validationDisabler(opCtx); + // Since we swap the locker in stash / unstash transaction resources, + // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been + // destroyed by unstash in its destructor. Thus we set the flag explicitly. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + + // Explicitly start future read transactions without a timestamp. + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + + // When querying indexes, we return the record matching the key if it exists, or an adjacent + // document. This means that it is possible for us to hit a prepare conflict if we query for an + // incomplete key and an adjacent key is prepared. + // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that + // did not occur on the primary. + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflictsAllowWrites); + + stableSortByNamespace(ops); + + const auto oplogApplicationMode = oai->getOptions().mode; + + InsertGroup insertGroup(ops, opCtx, oplogApplicationMode); + + { // Ensure that the MultikeyPathTracker stops tracking paths. + ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); }); + MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo(); + + for (auto it = ops->cbegin(); it != ops->cend(); ++it) { + const OplogEntry& entry = **it; + + // If we are successful in grouping and applying inserts, advance the current iterator + // past the end of the inserted group of entries. + auto groupResult = insertGroup.groupAndApplyInserts(it); + if (groupResult.isOK()) { + it = groupResult.getValue(); + continue; + } + + // If we didn't create a group, try to apply the op individually. + try { + const Status status = applyOplogEntryBatch(opCtx, &entry, oplogApplicationMode); + + if (!status.isOK()) { + // Tried to apply an update operation but the document is missing, there must be + // a delete operation for the document later in the oplog. + if (status == ErrorCodes::UpdateOperationFailed && + oplogApplicationMode == OplogApplication::Mode::kInitialSync) { + continue; + } + + severe() << "Error applying operation (" << redact(entry.toBSON()) + << "): " << causedBy(redact(status)); + return status; + } + } catch (const DBException& e) { + // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be + // dropped before initial sync or recovery ends anyways and we should ignore it. + if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() && + oai->getOptions().allowNamespaceNotFoundErrorsOnCrudOps) { + continue; + } + + severe() << "writer worker caught exception: " << redact(e) + << " on: " << redact(entry.toBSON()); + return e.toStatus(); + } + } + } + + invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); + invariant(workerMultikeyPathInfo->empty()); + auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo(); + if (!newPaths.empty()) { + workerMultikeyPathInfo->swap(newPaths); + } + + return Status::OK(); +} + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index efd31736a6b..4d67f022d4d 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -38,30 +38,36 @@ #include "mongo/db/repl/opqueue_batcher.h" #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/session_update_tracker.h" #include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/sync_tail.h" namespace mongo { namespace repl { /** * Applies oplog entries. - * Reads from an OplogBuffer batches of operations that may be applied in parallel. + * Primarily used to apply batches of operations fetched from a sync source during steady state + * replication and initial sync. + * + * When used for steady state replication, runs a thread that reads batches of operations from + * an oplog buffer (through the BackgroundSync interface), writes them into the oplog collection, + * and applies the batch of operations. */ class OplogApplierImpl : public OplogApplier { OplogApplierImpl(const OplogApplierImpl&) = delete; OplogApplierImpl& operator=(const OplogApplierImpl&) = delete; public: - using MultiSyncApplyFunc = - std::function<Status(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; + using ApplyGroupFunc = std::function<Status(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + OplogApplierImpl* oai, + WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; /** * Constructs this OplogApplier with specific options. - * Obtains batches of operations from the OplogBuffer to apply. - * Reports oplog application progress using the Observer. + * During steady state replication, _run() obtains batches of operations to apply + * from the oplogBuffer. During the oplog application phase, the batch of operations is + * distributed across writer threads in 'writerPool'. Each writer thread applies its own vector + * of operations using 'func'. The writer thread pool is not owned by us. */ OplogApplierImpl(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, @@ -69,20 +75,18 @@ public: ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - MultiSyncApplyFunc func, + ApplyGroupFunc func, const Options& options, ThreadPool* writerPool); -private: - void _run(OplogBuffer* oplogBuffer) override; +private: /** * Runs oplog application in a loop until shutdown() is called. * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using * multiApply(). */ - void _runLoop(OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn); + void _run(OplogBuffer* oplogBuffer) override; /** * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then @@ -99,6 +103,12 @@ private: */ StatusWith<OpTime> _multiApply(OperationContext* opCtx, MultiApplier::Operations ops); + void _deriveOpsAndFillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps, + SessionUpdateTracker* sessionUpdateTracker) noexcept; + // Not owned by us. ReplicationCoordinator* const _replCoord; @@ -111,16 +121,36 @@ private: ReplicationConsistencyMarkers* const _consistencyMarkers; // Function to use during _multiApply - MultiSyncApplyFunc _applyFunc; - - // Used to run oplog application loop. - // TODO (SERVER-43651): Remove this member once sync_tail.cpp is fully merged in. - SyncTail _syncTail; + ApplyGroupFunc _applyFunc; // Used to determine which operations should be applied during initial sync. If this is null, // we will apply all operations that were fetched. OpTime _beginApplyingOpTime = OpTime(); + +protected: + // Marked as protected for use in unit tests. + void fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps) noexcept; }; +/** + * Applies a batch of operations. + */ +Status applyOplogEntryBatch(OperationContext* opCtx, + const OplogEntryBatch& batch, + OplogApplication::Mode oplogApplicationMode); + +/** + * This free function is used by the thread pool workers to write ops to the db. + * This consumes the passed in OperationPtrs and callers should not make any assumptions about the + * state of the container after calling. However, this function cannot modify the pointed-to + * operations because the OperationPtrs container contains const pointers. + */ +Status applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + OplogApplierImpl* oai, + WorkerMultikeyPathInfo* workerMultikeyPathInfo); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index 59268e41483..e403254ae15 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -27,26 +27,99 @@ * it in the license file. */ -#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" -// TODO SERVER-41882 This is a temporary inclusion to avoid duplicate definitions of free -// functions. Remove once sync_tail_test.cpp is fully merged in. -#include "mongo/db/repl/sync_tail_test_fixture.h" +#include "mongo/platform/basic.h" +#include <algorithm> +#include <memory> +#include <utility> +#include <vector> + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/client.h" +#include "mongo/db/commands/feature_compatibility_version_parser.h" +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/idempotency_test_fixture.h" -#include "mongo/db/repl/sync_tail.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/transaction_participant_gen.h" +#include "mongo/platform/mutex.h" #include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/md5.hpp" +#include "mongo/util/scopeguard.h" +#include "mongo/util/string_map.h" namespace mongo { namespace repl { namespace { /** + * Creates an OplogEntry with given parameters and preset defaults for this test suite. + */ +OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollectionUUID uuid) { + return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime + boost::none, // hash + opType, // opType + nss, // namespace + uuid, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + BSON("_id" << 0), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime +} + +/** + * Testing-only OplogApplierImpl + */ + +class OplogApplierImplForTest : public OplogApplierImpl { +public: + OplogApplierImplForTest(); +}; + +// Minimal constructor that takes options, the only member accessed in fillWriterVectors. +OplogApplierImplForTest::OplogApplierImplForTest() + : OplogApplierImpl(nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), + nullptr) {} + +/** * Creates collection options suitable for oplog. */ CollectionOptions createOplogCollectionOptions() { @@ -99,10 +172,143 @@ void createDatabase(OperationContext* opCtx, StringData dbName) { ASSERT_TRUE(justCreated); } +/** + * Returns true if collection exists. + */ +bool collectionExists(OperationContext* opCtx, const NamespaceString& nss) { + return AutoGetCollectionForRead(opCtx, nss).getCollection() != nullptr; +} + +auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { + BSONElement tsArray; + Status status = + bsonExtractTypedField(obj, OpTime::kTimestampFieldName, BSONType::Array, &tsArray); + ASSERT_OK(status); + + BSONElement termArray; + status = bsonExtractTypedField(obj, OpTime::kTermFieldName, BSONType::Array, &termArray); + ASSERT_OK(status); + + return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long()); +}; + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchInsertDocumentDatabaseMissing) { + NamespaceString nss("test.t"); + auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + ASSERT_THROWS( + _applyOplogEntryBatchWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), + ExceptionFor<ErrorCodes::NamespaceNotFound>); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchDeleteDocumentDatabaseMissing) { + NamespaceString otherNss("test.othername"); + auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, {}); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, false); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchInsertDocumentCollectionLookupByUUIDFails) { + const NamespaceString nss("test.t"); + createDatabase(_opCtx.get(), nss.db()); + NamespaceString otherNss(nss.getSisterNS("othername")); + auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, kUuid); + ASSERT_THROWS( + _applyOplogEntryBatchWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), + ExceptionFor<ErrorCodes::NamespaceNotFound>); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchDeleteDocumentCollectionLookupByUUIDFails) { + const NamespaceString nss("test.t"); + createDatabase(_opCtx.get(), nss.db()); + NamespaceString otherNss(nss.getSisterNS("othername")); + auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, kUuid); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, false); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchInsertDocumentCollectionMissing) { + const NamespaceString nss("test.t"); + createDatabase(_opCtx.get(), nss.db()); + // Even though the collection doesn't exist, this is handled in the actual application function, + // which in the case of this test just ignores such errors. This tests mostly that we don't + // implicitly create the collection and lock the database in MODE_X. + auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + ASSERT_THROWS( + _applyOplogEntryBatchWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), + ExceptionFor<ErrorCodes::NamespaceNotFound>); + ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchDeleteDocumentCollectionMissing) { + const NamespaceString nss("test.t"); + createDatabase(_opCtx.get(), nss.db()); + // Even though the collection doesn't exist, this is handled in the actual application function, + // which in the case of this test just ignores such errors. This tests mostly that we don't + // implicitly create the collection and lock the database in MODE_X. + auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, false); + ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchInsertDocumentCollectionExists) { + const NamespaceString nss("test.t"); + createCollection(_opCtx.get(), nss, {}); + auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, true); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchDeleteDocumentCollectionExists) { + const NamespaceString nss("test.t"); + createCollection(_opCtx.get(), nss, {}); + auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, false); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchInsertDocumentCollectionLockedByUUID) { + const NamespaceString nss("test.t"); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + // Test that the collection to lock is determined by the UUID and not the 'ns' field. + NamespaceString otherNss(nss.getSisterNS("othername")); + auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, uuid); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, true); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchDeleteDocumentCollectionLockedByUUID) { + const NamespaceString nss("test.t"); + CollectionOptions options; + options.uuid = kUuid; + createCollection(_opCtx.get(), nss, options); + + // Test that the collection to lock is determined by the UUID and not the 'ns' field. + NamespaceString otherNss(nss.getSisterNS("othername")); + auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, options.uuid); + _testApplyOplogEntryBatchCrudOperation(ErrorCodes::OK, op, false); +} + +TEST_F(OplogApplierImplTest, applyOplogEntryBatchCommand) { + NamespaceString nss("test.t"); + auto op = + BSON("op" + << "c" + << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" + << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen()); + bool applyCmdCalled = false; + _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, + Collection*, + const NamespaceString& collNss, + const CollectionOptions&, + const BSONObj&) { + applyCmdCalled = true; + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_X)); + ASSERT_EQUALS(nss, collNss); + return Status::OK(); + }; + auto entry = OplogEntry(op); + ASSERT_OK( + _applyOplogEntryBatchWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kInitialSync)); + ASSERT_TRUE(applyCmdCalled); +} + DEATH_TEST_F(OplogApplierImplTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { - // TODO SERVER-43344 Currently, this file only tests multiApply, so several parameters have been - // set to nullptr during OplogAppierImpl construction as they are not needed in multiApply. - // Update constructors as needed once the rest of sync_tail_test.cpp has been merged in. auto writerPool = makeReplWriterPool(); NoopOplogApplierObserver observer; OplogApplierImpl oplogApplier( @@ -128,7 +334,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, MultiApplier::Operations operationsApplied; auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsToApply, - SyncTail* st, + OplogApplierImpl* oai, WorkerMultikeyPathInfo*) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); @@ -184,6 +390,27 @@ TEST_F(OplogApplierImplTest, createOplogCollectionOptions())); } +TEST_F(OplogApplierImplTest, ApplyGroupUsesApplyOplogEntryBatchToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + + MultiApplier::OperationPtrs ops = {&op}; + WorkerMultikeyPathInfo pathInfo; + + OplogApplierImpl oplogApplier(nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary), + nullptr); + ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + // Collection should be created after applyOplogEntryBatch() processes operation. + ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); +} + class MultiOplogEntryOplogApplierImplTest : public OplogApplierImplTest { public: MultiOplogEntryOplogApplierImplTest() @@ -296,7 +523,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -359,7 +586,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionAllAt ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -417,7 +644,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBa ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -538,7 +765,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -650,7 +877,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -709,7 +936,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -759,7 +986,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), _writerPool.get()); // Apply a batch with the insert operations. This should result in the oplog entries @@ -828,7 +1055,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -882,7 +1109,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); @@ -923,7 +1150,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -973,7 +1200,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -1012,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), _writerPool.get()); @@ -1067,7 +1294,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -1100,6 +1327,1040 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, DurableTxnStateEnum::kCommitted); } +void testWorkerMultikeyPaths(OperationContext* opCtx, + const OplogEntry& op, + unsigned long numPaths) { + + OplogApplierImpl oplogApplier(nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary), + nullptr); + WorkerMultikeyPathInfo pathInfo; + MultiApplier::OperationPtrs ops = {&op}; + ASSERT_OK(applyOplogGroup(opCtx, &ops, &oplogApplier, &pathInfo)); + ASSERT_EQ(pathInfo.size(), numPaths); +} + +TEST_F(OplogApplierImplTest, ApplyGroupAddsWorkerMultikeyPathInfoOnInsert) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + + { + auto op = makeCreateCollectionOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + { + auto keyPattern = BSON("a" << 1); + auto op = makeCreateIndexOplogEntry( + {Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern, kUuid); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + { + auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(_opCtx.get(), op, 1UL); + } +} + +TEST_F(OplogApplierImplTest, ApplyGroupAddsMultipleWorkerMultikeyPathInfo) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + + { + auto op = makeCreateCollectionOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto keyPattern = BSON("a" << 1); + auto op = makeCreateIndexOplogEntry( + {Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern, kUuid); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto keyPattern = BSON("b" << 1); + auto op = makeCreateIndexOplogEntry( + {Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern, kUuid); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto docA = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); + auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); + auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); + + OplogApplierImpl oplogApplier(nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary), + nullptr); + WorkerMultikeyPathInfo pathInfo; + MultiApplier::OperationPtrs ops = {&opA, &opB}; + ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + ASSERT_EQ(pathInfo.size(), 2UL); + } +} + +TEST_F(OplogApplierImplTest, ApplyGroupDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + + { + auto op = makeCreateCollectionOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto keyPattern = BSON("a" << 1); + auto op = makeCreateIndexOplogEntry( + {Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern, kUuid); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } + + { + auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); + } +} + +TEST_F(OplogApplierImplTest, ApplyGroupFailsWhenCollectionCreationTriesToMakeUUID) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_SECONDARY)); + NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); + + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + + OplogApplierImpl oplogApplier(nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary), + nullptr); + MultiApplier::OperationPtrs ops = {&op}; + ASSERT_EQUALS(ErrorCodes::InvalidOptions, + applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, nullptr)); +} + +TEST_F(OplogApplierImplTest, ApplyGroupDisablesDocumentValidationWhileApplyingOperations) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + bool onInsertsCalled = false; + _opObserver->onInsertsFn = + [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { + onInsertsCalled = true; + ASSERT_FALSE(opCtx->writesAreReplicated()); + ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); + ASSERT_TRUE(documentValidationDisabled(opCtx)); + return Status::OK(); + }; + createCollectionWithUuid(_opCtx.get(), nss); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0)); + ASSERT_OK(runOpSteadyState(op)); + ASSERT(onInsertsCalled); +} + +TEST_F(OplogApplierImplTest, + ApplyGroupPassesThroughApplyOplogEntryBatchErrorAfterFailingToApplyOperation) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + // Delete operation without _id in 'o' field. + auto op = makeDeleteDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, {}); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, runOpSteadyState(op)); +} + +TEST_F(OplogApplierImplTest, ApplyGroupPassesThroughApplyOplogEntryBatchException) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + bool onInsertsCalled = false; + _opObserver->onInsertsFn = + [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { + onInsertsCalled = true; + uasserted(ErrorCodes::OperationFailed, ""); + MONGO_UNREACHABLE; + }; + createCollectionWithUuid(_opCtx.get(), nss); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0)); + ASSERT_EQUALS(ErrorCodes::OperationFailed, runOpSteadyState(op)); + ASSERT(onInsertsCalled); +} + +TEST_F(OplogApplierImplTest, ApplyGroupSortsOperationsStablyByNamespaceBeforeApplying) { + NamespaceString nss1("test.t1"); + NamespaceString nss2("test.t2"); + NamespaceString nss3("test.t3"); + + const Seconds s(1); + unsigned int i = 1; + auto op1 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss1, BSON("_id" << 1)); + auto op2 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss1, BSON("_id" << 2)); + auto op3 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss2, BSON("_id" << 3)); + auto op4 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss3, BSON("_id" << 4)); + + std::vector<NamespaceString> nssInserted; + std::vector<BSONObj> docsInserted; + bool onInsertsCalled = false; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + onInsertsCalled = true; + for (const auto& doc : docs) { + nssInserted.push_back(nss); + docsInserted.push_back(doc); + } + }; + + createCollectionWithUuid(_opCtx.get(), nss1); + createCollectionWithUuid(_opCtx.get(), nss2); + createCollectionWithUuid(_opCtx.get(), nss3); + + ASSERT_OK(runOpsSteadyState({op4, op1, op3, op2})); + + ASSERT_EQUALS(4U, nssInserted.size()); + ASSERT_EQUALS(nss1, nssInserted[0]); + ASSERT_EQUALS(nss1, nssInserted[1]); + ASSERT_EQUALS(nss2, nssInserted[2]); + ASSERT_EQUALS(nss3, nssInserted[3]); + + ASSERT_EQUALS(4U, docsInserted.size()); + ASSERT_BSONOBJ_EQ(op1.getObject(), docsInserted[0]); + ASSERT_BSONOBJ_EQ(op2.getObject(), docsInserted[1]); + ASSERT_BSONOBJ_EQ(op3.getObject(), docsInserted[2]); + ASSERT_BSONOBJ_EQ(op4.getObject(), docsInserted[3]); + + ASSERT(onInsertsCalled); +} + +TEST_F(OplogApplierImplTest, ApplyGroupGroupsInsertOperationByNamespaceBeforeApplying) { + int seconds = 1; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); + auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); + auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); + auto insertOp1a = makeOp(nss1); + auto insertOp1b = makeOp(nss1); + auto insertOp2a = makeOp(nss2); + auto insertOp2b = makeOp(nss2); + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + docsInserted.push_back(docs); + }; + + MultiApplier::Operations ops = { + createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b}; + ASSERT_OK(runOpsSteadyState(ops)); + + ASSERT_EQUALS(2U, docsInserted.size()); + + // Check grouped insert operations in namespace "nss1". + const auto& group1 = docsInserted[0]; + ASSERT_EQUALS(2U, group1.size()); + ASSERT_BSONOBJ_EQ(insertOp1a.getObject(), group1[0]); + ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1]); + + // Check grouped insert operations in namespace "nss2". + const auto& group2 = docsInserted[1]; + ASSERT_EQUALS(2U, group2.size()); + ASSERT_BSONOBJ_EQ(insertOp2a.getObject(), group2[0]); + ASSERT_BSONOBJ_EQ(insertOp2b.getObject(), group2[1]); +} + +TEST_F(OplogApplierImplTest, ApplyGroupLimitsBatchCountWhenGroupingInsertOperation) { + int seconds = 1; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + docsInserted.push_back(docs); + }; + + ASSERT_OK(runOpsSteadyState(operationsToApply)); + + // applyOplogGroup should combine operations as follows: + // {create}, {grouped_insert}, {insert_(limit+1)} + // Ignore {create} since we are only tracking inserts. + ASSERT_EQUALS(2U, docsInserted.size()); + + const auto& groupedInsertDocuments = docsInserted[0]; + ASSERT_EQUALS(limit, groupedInsertDocuments.size()); + for (std::size_t i = 0; i < limit; ++i) { + const auto& insertOp = insertOps[i]; + ASSERT_BSONOBJ_EQ(insertOp.getObject(), groupedInsertDocuments[i]); + } + + // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. + const auto& singleInsertDocumentGroup = docsInserted[1]; + ASSERT_EQUALS(1U, singleInsertDocumentGroup.size()); + ASSERT_BSONOBJ_EQ(insertOps.back().getObject(), singleInsertDocumentGroup[0]); +} + +// Create an 'insert' oplog operation of an approximate size in bytes. The '_id' of the oplog entry +// and its optime in seconds are given by the 'id' argument. +OplogEntry makeSizedInsertOp(const NamespaceString& nss, int size, int id) { + return makeInsertDocumentOplogEntry({Timestamp(Seconds(id), 0), 1LL}, + nss, + BSON("_id" << id << "data" << std::string(size, '*'))); +}; + +TEST_F(OplogApplierImplTest, ApplyGroupLimitsBatchSizeWhenGroupingInsertOperations) { + int seconds = 1; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName()); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Create a sequence of insert ops that are too large to fit in one group. + int maxBatchSize = write_ops::insertVectorMaxBytes; + int opsPerBatch = 3; + int opSize = maxBatchSize / opsPerBatch - 500; // Leave some room for other oplog fields. + + // Create the insert ops. + MultiApplier::Operations insertOps; + int numOps = 4; + for (int i = 0; i < numOps; i++) { + insertOps.push_back(makeSizedInsertOp(nss, opSize, seconds++)); + } + + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + docsInserted.push_back(docs); + }; + + // Apply the ops. + ASSERT_OK(runOpsSteadyState(operationsToApply)); + + // Applied ops should be as follows: + // [ {create}, INSERT_GROUP{insert 1, insert 2, insert 3}, {insert 4} ] + // Ignore {create} since we are only tracking inserts. + ASSERT_EQUALS(2U, docsInserted.size()); + + // Make sure the insert group was created correctly. + const auto& groupedInsertOpArray = docsInserted[0]; + ASSERT_EQUALS(std::size_t(opsPerBatch), groupedInsertOpArray.size()); + for (int i = 0; i < opsPerBatch; ++i) { + ASSERT_BSONOBJ_EQ(insertOps[i].getObject(), groupedInsertOpArray[i]); + } + + // Check that the last op was applied individually. + const auto& singleInsertDocumentGroup = docsInserted[1]; + ASSERT_EQUALS(1U, singleInsertDocumentGroup.size()); + ASSERT_BSONOBJ_EQ(insertOps[3].getObject(), singleInsertDocumentGroup[0]); +} + +TEST_F(OplogApplierImplTest, ApplyGroupAppliesOpIndividuallyWhenOpIndividuallyExceedsBatchSize) { + int seconds = 1; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName()); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + int maxBatchSize = write_ops::insertVectorMaxBytes; + // Create an insert op that exceeds the maximum batch size by itself. + auto insertOpLarge = makeSizedInsertOp(nss, maxBatchSize, seconds++); + auto insertOpSmall = makeSizedInsertOp(nss, 100, seconds++); + + MultiApplier::Operations operationsToApply = {createOp, insertOpLarge, insertOpSmall}; + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + docsInserted.push_back(docs); + }; + + // Apply the ops. + ASSERT_OK(runOpsSteadyState(operationsToApply)); + + // Applied ops should be as follows: + // [ {create}, {large insert} {small insert} ] + // Ignore {create} since we are only tracking inserts. + ASSERT_EQUALS(2U, docsInserted.size()); + + ASSERT_EQUALS(1U, docsInserted[0].size()); + ASSERT_BSONOBJ_EQ(insertOpLarge.getObject(), docsInserted[0][0]); + + ASSERT_EQUALS(1U, docsInserted[1].size()); + ASSERT_BSONOBJ_EQ(insertOpSmall.getObject(), docsInserted[1][0]); +} + +TEST_F(OplogApplierImplTest, + ApplyGroupAppliesInsertOpsIndividuallyWhenUnableToCreateGroupByNamespace) { + int seconds = 1; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + + auto testNs = "test." + _agent.getSuiteName() + "_" + _agent.getTestName(); + + // Create a sequence of 3 'insert' ops that can't be grouped because they are from different + // namespaces. + MultiApplier::Operations operationsToApply = {makeOp(NamespaceString(testNs + "_1")), + makeOp(NamespaceString(testNs + "_2")), + makeOp(NamespaceString(testNs + "_3"))}; + + for (const auto& oplogEntry : operationsToApply) { + createCollectionWithUuid(_opCtx.get(), oplogEntry.getNss()); + } + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + docsInserted.push_back(docs); + }; + + // Apply the ops. + ASSERT_OK(runOpsSteadyState(operationsToApply)); + + // Applied ops should be as follows i.e. no insert grouping: + // [{insert 1}, {insert 2}, {insert 3}] + ASSERT_EQ(operationsToApply.size(), docsInserted.size()); + for (std::size_t i = 0; i < operationsToApply.size(); i++) { + const auto& group = docsInserted[i]; + ASSERT_EQUALS(1U, group.size()) << i; + ASSERT_BSONOBJ_EQ(operationsToApply[i].getObject(), group[0]); + } +} + +TEST_F(OplogApplierImplTest, + ApplyGroupFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { + int seconds = 1; + auto makeOp = [&seconds](const NamespaceString& nss) { + return makeInsertDocumentOplogEntry( + {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); + }; + NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); + auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); + + // Generate operations to apply: + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + std::size_t limit = 64; + MultiApplier::Operations insertOps; + for (std::size_t i = 0; i < limit + 1; ++i) { + insertOps.push_back(makeOp(nss)); + } + MultiApplier::Operations operationsToApply; + operationsToApply.push_back(createOp); + std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); + + // Each element in 'docsInserted' is a grouped insert operation. + std::vector<std::vector<BSONObj>> docsInserted; + std::size_t numFailedGroupedInserts = 0; + _opObserver->onInsertsFn = + [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + // Reject grouped insert operations. + if (docs.size() > 1U) { + numFailedGroupedInserts++; + uasserted(ErrorCodes::OperationFailed, "grouped inserts not supported"); + } + docsInserted.push_back(docs); + }; + + ASSERT_OK(runOpsSteadyState(operationsToApply)); + + // On failing to apply the grouped insert operation, applyOplogGroup should apply the operations + // as given in "operationsToApply": + // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} + // Ignore {create} since we are only tracking inserts. + ASSERT_EQUALS(limit + 1, docsInserted.size()); + + for (std::size_t i = 0; i < limit + 1; ++i) { + const auto& insertOp = insertOps[i]; + const auto& group = docsInserted[i]; + ASSERT_EQUALS(1U, group.size()) << i; + ASSERT_BSONOBJ_EQ(insertOp.getObject(), group[0]); + } + + // Ensure that applyOplogGroup does not attempt to group remaining operations in first failed + // grouped insert operation. + ASSERT_EQUALS(1U, numFailedGroupedInserts); +} + +TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { + OplogApplierImplForTest oplogApplier; + NamespaceString nss("test.t"); + { + Lock::GlobalWrite globalLock(_opCtx.get()); + bool justCreated = false; + auto databaseHolder = DatabaseHolder::get(_opCtx.get()); + auto db = databaseHolder->openDb(_opCtx.get(), nss.db(), &justCreated); + ASSERT_TRUE(db); + ASSERT_TRUE(justCreated); + } + auto op = makeUpdateDocumentOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); + MultiApplier::OperationPtrs ops = {&op}; + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + + // Since the document was missing when we cloned data from the sync source, the collection + // referenced by the failed operation should not be automatically created. + ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); +} + +TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringInitialSync) { + BSONObj emptyDoc; + OplogApplierImplForTest oplogApplier; + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); + auto doc1 = BSON("_id" << 1); + auto doc2 = BSON("_id" << 2); + auto doc3 = BSON("_id" << 3); + auto op0 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1); + auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, badNss, doc2); + auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); + MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + + CollectionReader collectionReader(_opCtx.get(), nss); + ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); + ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(collectionReader.next())); + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus()); +} + +TEST_F(OplogApplierImplTest, ApplyGroupSkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) { + BSONObj emptyDoc; + OplogApplierImplForTest oplogApplier; + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); + auto doc1 = BSON("_id" << 1); + auto keyPattern = BSON("a" << 1); + auto doc3 = BSON("_id" << 3); + auto op0 = + makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); + auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1); + auto op2 = makeCreateIndexOplogEntry( + {Timestamp(Seconds(3), 0), 1LL}, badNss, "a_1", keyPattern, kUuid); + auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); + MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + + CollectionReader collectionReader(_opCtx.get(), nss); + ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); + ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(collectionReader.next())); + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus()); + + // 'badNss' collection should not be implicitly created while attempting to create an index. + ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection()); +} + +TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); + auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); + auto indexOp = + buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3), kUuid); + + auto ops = {insertOp, updateOp, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 16755); +} + +TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnIndexing) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto indexOp = + buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3), kUuid); + auto dropIndexOp = dropIndex("loc_index", kUuid); + auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); + + auto ops = {indexOp, dropIndexOp, insertOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 16755); +} + +TEST_F(IdempotencyTest, Geo2dIndex) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, loc: [1]}")); + auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); + auto indexOp = buildIndex(fromjson("{loc: '2d'}"), BSONObj(), kUuid); + + auto ops = {insertOp, updateOp, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 13068); +} + +TEST_F(IdempotencyTest, UniqueKeyIndex) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, x: 5}")); + auto updateOp = update(1, fromjson("{$set: {x: 6}}")); + auto insertOp2 = insert(fromjson("{_id: 2, x: 5}")); + auto indexOp = buildIndex(fromjson("{x: 1}"), fromjson("{unique: true}"), kUuid); + + auto ops = {insertOp, updateOp, insertOp2, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), ErrorCodes::DuplicateKey); +} + +TEST_F(IdempotencyTest, ParallelArrayError) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1}")))); + + auto updateOp1 = update(1, fromjson("{$set: {x: [1, 2]}}")); + auto updateOp2 = update(1, fromjson("{$set: {x: 1}}")); + auto updateOp3 = update(1, fromjson("{$set: {y: [3, 4]}}")); + auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"), BSONObj(), kUuid); + + auto ops = {updateOp1, updateOp2, updateOp3, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), ErrorCodes::CannotIndexParallelArrays); +} + +TEST_F(IdempotencyTest, IndexWithDifferentOptions) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1, x: 'hi'}")))); + + auto indexOp1 = + buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'spanish'}"), kUuid); + auto dropIndexOp = dropIndex("x_index", kUuid); + auto indexOp2 = + buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'english'}"), kUuid); + + auto ops = {indexOp1, dropIndexOp, indexOp2}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), ErrorCodes::IndexOptionsConflict); +} + +TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageField) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}")); + auto updateOp = update(1, fromjson("{$unset: {language: 1}}")); + auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); + + auto ops = {insertOp, updateOp, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 17261); +} + +TEST_F(IdempotencyTest, InsertDocumentWithNonStringLanguageFieldWhenTextIndexExists) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); + auto dropIndexOp = dropIndex("x_index", kUuid); + auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}")); + + auto ops = {indexOp, dropIndexOp, insertOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 17261); +} + +TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageOverrideField) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}")); + auto updateOp = update(1, fromjson("{$unset: {y: 1}}")); + auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"), kUuid); + + auto ops = {insertOp, updateOp, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 17261); +} + +TEST_F(IdempotencyTest, InsertDocumentWithNonStringLanguageOverrideFieldWhenTextIndexExists) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"), kUuid); + auto dropIndexOp = dropIndex("x_index", kUuid); + auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}")); + + auto ops = {indexOp, dropIndexOp, insertOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 17261); +} + +TEST_F(IdempotencyTest, TextIndexDocumentHasUnknownLanguage) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 'bad'}")); + auto updateOp = update(1, fromjson("{$unset: {language: 1}}")); + auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); + + auto ops = {insertOp, updateOp, indexOp}; + testOpsAreIdempotent(ops); + + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + auto status = runOpsInitialSync(ops); + ASSERT_EQ(status.code(), 17262); +} + +TEST_F(IdempotencyTest, CreateCollectionWithValidation) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + const BSONObj uuidObj = kUuid.toBSON(); + + auto runOpsAndValidate = [this, uuidObj]() { + auto options1 = fromjson("{'validator' : {'phone' : {'$type' : 'string' } } }"); + options1 = options1.addField(uuidObj.firstElement()); + auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1); + auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); + + auto options2 = fromjson("{'validator' : {'phone' : {'$type' : 'number' } } }"); + options2 = options2.addField(uuidObj.firstElement()); + auto createColl2 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options2); + + auto ops = {createColl1, dropColl, createColl2}; + ASSERT_OK(runOpsInitialSync(ops)); + auto state = validate(); + + return state; + }; + + auto state1 = runOpsAndValidate(); + auto state2 = runOpsAndValidate(); + ASSERT_EQUALS(state1, state2); +} + +TEST_F(IdempotencyTest, CreateCollectionWithCollation) { + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + CollectionUUID uuid = UUID::gen(); + + auto runOpsAndValidate = [this, uuid]() { + auto insertOp1 = insert(fromjson("{ _id: 'foo' }")); + auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }")); + auto updateOp = update("foo", BSON("$set" << BSON("x" << 2))); + auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); + auto options = BSON("collation" + << BSON("locale" + << "en" + << "caseLevel" << false << "caseFirst" + << "off" + << "strength" << 1 << "numericOrdering" << false << "alternate" + << "non-ignorable" + << "maxVariable" + << "punct" + << "normalization" << false << "backwards" << false << "version" + << "57.1") + << "uuid" << uuid); + auto createColl = makeCreateCollectionOplogEntry(nextOpTime(), nss, options); + + // We don't drop and re-create the collection since we don't have ways + // to wait until second-phase drop to completely finish. + auto ops = {createColl, insertOp1, insertOp2, updateOp}; + ASSERT_OK(runOpsInitialSync(ops)); + auto state = validate(); + + return state; + }; + + auto state1 = runOpsAndValidate(); + auto state2 = runOpsAndValidate(); + ASSERT_EQUALS(state1, state2); +} + +TEST_F(IdempotencyTest, CreateCollectionWithIdIndex) { + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + CollectionUUID uuid = kUuid; + + auto options1 = BSON("idIndex" << BSON("key" << fromjson("{_id: 1}") << "name" + << "_id_" + << "v" << 2) + << "uuid" << uuid); + auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1); + ASSERT_OK(runOpInitialSync(createColl1)); + + auto runOpsAndValidate = [this, uuid]() { + auto insertOp = insert(BSON("_id" << Decimal128(1))); + auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); + auto createColl2 = createCollection(uuid); + + auto ops = {insertOp, dropColl, createColl2}; + ASSERT_OK(runOpsInitialSync(ops)); + auto state = validate(); + + return state; + }; + + auto state1 = runOpsAndValidate(); + auto state2 = runOpsAndValidate(); + ASSERT_EQUALS(state1, state2); +} + +TEST_F(IdempotencyTest, CreateCollectionWithView) { + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + CollectionOptions options; + options.uuid = kUuid; + + // Create data collection + ASSERT_OK(runOpInitialSync(createCollection())); + // Create "system.views" collection + auto viewNss = NamespaceString(nss.db(), "system.views"); + ASSERT_OK( + runOpInitialSync(makeCreateCollectionOplogEntry(nextOpTime(), viewNss, options.toBSON()))); + + auto viewDoc = BSON("_id" << NamespaceString(nss.db(), "view").ns() << "viewOn" << nss.coll() + << "pipeline" << fromjson("[ { '$project' : { 'x' : 1 } } ]")); + auto insertViewOp = makeInsertDocumentOplogEntry(nextOpTime(), viewNss, viewDoc); + auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); + + auto ops = {insertViewOp, dropColl}; + testOpsAreIdempotent(ops); +} + +TEST_F(IdempotencyTest, CollModNamespaceNotFound) { + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + ASSERT_OK(runOpInitialSync( + buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600), kUuid))); + + auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); + auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); + auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd, kUuid); + auto dropCollOp = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()), kUuid); + + auto ops = {collModOp, dropCollOp}; + testOpsAreIdempotent(ops); +} + +TEST_F(IdempotencyTest, CollModIndexNotFound) { + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + ASSERT_OK(runOpInitialSync(createCollection(kUuid))); + ASSERT_OK(runOpInitialSync( + buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600), kUuid))); + + auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); + auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); + auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd, kUuid); + auto dropIndexOp = dropIndex("createdAt_index", kUuid); + + auto ops = {collModOp, dropIndexOp}; + testOpsAreIdempotent(ops); +} + +TEST_F(OplogApplierImplTest, FailOnDropFCVCollection) { + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + auto fcvNS(NamespaceString::kServerConfigurationNamespace); + auto cmd = BSON("drop" << fcvNS.coll()); + auto op = makeCommandOplogEntry(nextOpTime(), fcvNS, cmd); + ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported); +} + +TEST_F(OplogApplierImplTest, FailOnInsertFCVDocument) { + auto fcvNS(NamespaceString::kServerConfigurationNamespace); + ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions()); + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + auto op = makeInsertDocumentOplogEntry( + nextOpTime(), fcvNS, BSON("_id" << FeatureCompatibilityVersionParser::kParameterName)); + ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported); +} + +TEST_F(IdempotencyTest, InsertToFCVCollectionBesidesFCVDocumentSucceeds) { + auto fcvNS(NamespaceString::kServerConfigurationNamespace); + ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions()); + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + auto op = makeInsertDocumentOplogEntry(nextOpTime(), + fcvNS, + BSON("_id" + << "other")); + ASSERT_OK(runOpInitialSync(op)); +} + +TEST_F(IdempotencyTest, DropDatabaseSucceeds) { + // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped. + auto ns = NamespaceString("foo.system.profile"); + ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); + ASSERT_OK(runOpInitialSync(op)); +} + +TEST_F(OplogApplierImplTest, DropDatabaseSucceedsInRecovering) { + // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped. + auto ns = NamespaceString("foo.system.profile"); + ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); + + auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); + ASSERT_OK(runOpSteadyState(op)); +} + +TEST_F(OplogApplierImplTest, LogSlowOpApplicationWhenSuccessful) { + // This duration is greater than "slowMS", so the op would be considered slow. + auto applyDuration = serverGlobalParams.slowMS * 10; + getServiceContext()->setFastClockSource( + std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); + + // We are inserting into an existing collection. + const NamespaceString nss("test.t"); + createCollection(_opCtx.get(), nss, {}); + auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + + startCapturingLogMessages(); + ASSERT_OK( + _applyOplogEntryBatchWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary)); + + // Use a builder for easier escaping. We expect the operation to be logged. + StringBuilder expected; + expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, v: 2, op: \"i\", ns: \"test.t\", " + "wall: new Date(0), o: " + "{ _id: 0 } }, took " + << applyDuration << "ms"; + ASSERT_EQUALS(1, countLogLinesContaining(expected.str())); +} + +TEST_F(OplogApplierImplTest, DoNotLogSlowOpApplicationWhenFailed) { + // This duration is greater than "slowMS", so the op would be considered slow. + auto applyDuration = serverGlobalParams.slowMS * 10; + getServiceContext()->setFastClockSource( + std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); + + // We are trying to insert into a non-existing database. + NamespaceString nss("test.t"); + auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + + startCapturingLogMessages(); + ASSERT_THROWS( + _applyOplogEntryBatchWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary), + ExceptionFor<ErrorCodes::NamespaceNotFound>); + + // Use a builder for easier escaping. We expect the operation to *not* be logged + // even thought it was slow, since we couldn't apply it successfully. + StringBuilder expected; + expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " + "t: 1, h: 1, v: 2 }, took " + << applyDuration << "ms"; + ASSERT_EQUALS(0, countLogLinesContaining(expected.str())); +} + +TEST_F(OplogApplierImplTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { + // This duration is below "slowMS", so the op would *not* be considered slow. + auto applyDuration = serverGlobalParams.slowMS / 10; + getServiceContext()->setFastClockSource( + std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); + + // We are inserting into an existing collection. + const NamespaceString nss("test.t"); + createCollection(_opCtx.get(), nss, {}); + auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); + + startCapturingLogMessages(); + ASSERT_OK( + _applyOplogEntryBatchWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary)); + + // Use a builder for easier escaping. We expect the operation to *not* be logged, + // since it wasn't slow to apply. + StringBuilder expected; + expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " + "t: 1, h: 1, v: 2 }, took " + << applyDuration << "ms"; + ASSERT_EQUALS(0, countLogLinesContaining(expected.str())); +} class OplogApplierImplTxnTableTest : public OplogApplierImplTest { public: void setUp() override { @@ -1218,7 +2479,7 @@ TEST_F(OplogApplierImplTxnTableTest, SimpleWriteWithTxn) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1259,7 +2520,7 @@ TEST_F(OplogApplierImplTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1314,7 +2575,7 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDelet ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1356,7 +2617,7 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdat ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1421,7 +2682,7 @@ TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnS ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1491,7 +2752,7 @@ TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnS ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1564,7 +2825,7 @@ TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1648,7 +2909,7 @@ TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnT ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1681,7 +2942,7 @@ TEST_F(OplogApplierImplTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1716,7 +2977,7 @@ TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTabl ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - multiSyncApply, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -1728,6 +2989,668 @@ TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTabl BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); } +TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) { + // Create a BSON "emptycapped" command. + auto emptyCappedCmd = BSON("emptycapped" << nss.coll()); + + // Create an "emptycapped" oplog entry. + auto emptyCappedOp = makeCommandOplogEntry(nextOpTime(), nss, emptyCappedCmd); + + // Ensure that NamespaceNotFound is acceptable. + ASSERT_OK(runOpInitialSync(emptyCappedOp)); + + AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); + + // Ensure that autoColl.getCollection() and autoColl.getDb() are both null. + ASSERT_FALSE(autoColl.getCollection()); + ASSERT_FALSE(autoColl.getDb()); +} + +TEST_F(IdempotencyTest, ConvertToCappedNamespaceNotFound) { + // Create a BSON "convertToCapped" command. + auto convertToCappedCmd = BSON("convertToCapped" << nss.coll()); + + // Create a "convertToCapped" oplog entry. + auto convertToCappedOp = makeCommandOplogEntry(nextOpTime(), nss, convertToCappedCmd); + + // Ensure that NamespaceNotFound is acceptable. + ASSERT_OK(runOpInitialSync(convertToCappedOp)); + + AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); + + // Ensure that autoColl.getCollection() and autoColl.getDb() are both null. + ASSERT_FALSE(autoColl.getCollection()); + ASSERT_FALSE(autoColl.getDb()); +} + +class IdempotencyTestTxns : public IdempotencyTest {}; + +// Document used by transaction idempotency tests. +const BSONObj doc = fromjson("{_id: 1}"); +const BSONObj doc2 = fromjson("{_id: 2}"); + +TEST_F(IdempotencyTestTxns, CommitUnpreparedTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto commitOp = commitUnprepared( + lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + NamespaceString nss2("test.coll2"); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto commitOp = commitUnprepared(lsid, + txnNum, + StmtId(0), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) + << makeInsertApplyOpsEntry(nss2, uuid2, doc))); + + // Manually insert one of the documents so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); +} + +TEST_F(IdempotencyTestTxns, CommitPreparedTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto prepareOp = + prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, CommitPreparedTransactionDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + NamespaceString nss2("test.coll2"); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto prepareOp = prepare(lsid, + txnNum, + StmtId(0), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) + << makeInsertApplyOpsEntry(nss2, uuid2, doc))); + + auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + // Manually insert one of the documents so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); +} + +TEST_F(IdempotencyTestTxns, AbortPreparedTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto prepareOp = + prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, abortOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + abortOp.getOpTime(), + abortOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, SinglePartialTxnOp) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp}); + auto expectedStartOpTime = partialOp.getOpTime(); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + partialOp.getOpTime(), + partialOp.getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Document should not be visible yet. + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, MultiplePartialTxnOps) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp1 = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto partialOp2 = partialTxn(lsid, + txnNum, + StmtId(1), + partialOp1.getOpTime(), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp1, partialOp2}); + auto expectedStartOpTime = partialOp1.getOpTime(); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + partialOp1.getOpTime(), + partialOp1.getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + // Document should not be visible yet. + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionWithPartialTxnOps) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + auto commitOp = commitUnprepared(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitTwoUnpreparedTransactionsWithPartialTxnOpsAtOnce) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + auto partialOp1 = partialTxn( + lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto commitOp1 = + commitUnprepared(lsid, txnNum1, StmtId(1), BSONArray(), partialOp1.getOpTime()); + + // The second transaction (with a different transaction number) in the same session. + auto partialOp2 = partialTxn( + lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); + auto commitOp2 = + commitUnprepared(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + // This also tests that we clear the partialTxnList for the session after applying the commit of + // the first transaction. Otherwise, saving operations from the second transaction to the same + // partialTxnList as the first transaction will trigger an invariant because of the mismatching + // transaction numbers. + testOpsAreIdempotent({partialOp1, commitOp1, partialOp2, commitOp2}); + + // The transaction table should only contain the second transaction of the session. + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum2, + commitOp2.getOpTime(), + commitOp2.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitAndAbortTwoTransactionsWithPartialTxnOpsAtOnce) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + auto partialOp1 = partialTxn( + lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto abortOp1 = abortPrepared(lsid, txnNum1, StmtId(1), partialOp1.getOpTime()); + + // The second transaction (with a different transaction number) in the same session. + auto partialOp2 = partialTxn( + lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); + auto commitOp2 = + commitUnprepared(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + // This also tests that we clear the partialTxnList for the session after applying the abort of + // the first transaction. Otherwise, saving operations from the second transaction to the same + // partialTxnList as the first transaction will trigger an invariant because of the mismatching + // transaction numbers. + testOpsAreIdempotent({partialOp1, abortOp1, partialOp2, commitOp2}); + + // The transaction table should only contain the second transaction of the session. + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum2, + commitOp2.getOpTime(), + commitOp2.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionWithPartialTxnOpsAndDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + auto commitOp = commitUnprepared(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + + // Manually insert the first document so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. This simulates the + // case where the transaction committed on the sync source at a point during the initial sync, + // such that we cloned 'doc' but missed 'doc2'. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, PrepareTransactionWithPartialTxnOps) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto prepareOp = prepare(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, prepareOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + prepareOp.getOpTime(), + prepareOp.getWallClockTime(), + partialOp.getOpTime(), + DurableTxnStateEnum::kPrepared); + // Document should not be visible yet. + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, EmptyPrepareTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + // It is possible to have an empty prepare oplog entry. + auto prepareOp = prepare(lsid, txnNum, StmtId(1), BSONArray(), OpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + prepareOp.getOpTime(), + prepareOp.getWallClockTime(), + prepareOp.getOpTime(), + DurableTxnStateEnum::kPrepared); +} + +TEST_F(IdempotencyTestTxns, CommitPreparedTransactionWithPartialTxnOps) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto prepareOp = prepare(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + auto commitOp = commitPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitTwoPreparedTransactionsWithPartialTxnOpsAtOnce) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum1(1); + TxnNumber txnNum2(2); + + auto partialOp1 = partialTxn( + lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto prepareOp1 = prepare(lsid, txnNum1, StmtId(1), BSONArray(), partialOp1.getOpTime()); + auto commitOp1 = commitPrepared(lsid, txnNum1, StmtId(2), prepareOp1.getOpTime()); + + // The second transaction (with a different transaction number) in the same session. + auto partialOp2 = partialTxn( + lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); + auto prepareOp2 = prepare(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); + auto commitOp2 = commitPrepared(lsid, txnNum2, StmtId(2), prepareOp2.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + // This also tests that we clear the partialTxnList for the session after applying the commit of + // the first prepared transaction. Otherwise, saving operations from the second transaction to + // the same partialTxnList as the first transaction will trigger an invariant because of the + // mismatching transaction numbers. + testOpsAreIdempotent({partialOp1, prepareOp1, commitOp1, partialOp2, prepareOp2, commitOp2}); + + // The transaction table should only contain the second transaction of the session. + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum2, + commitOp2.getOpTime(), + commitOp2.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, CommitPreparedTransactionWithPartialTxnOpsAndDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto prepareOp = prepare(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + auto commitOp = commitPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); + + // Manually insert the first document so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. This simulates the + // case where the transaction committed on the sync source at a point during the initial sync, + // such that we cloned 'doc' but missed 'doc2'. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, AbortPreparedTransactionWithPartialTxnOps) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto prepareOp = prepare(lsid, + txnNum, + StmtId(1), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), + partialOp.getOpTime()); + auto abortOp = abortPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, prepareOp, abortOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + abortOp.getOpTime(), + abortOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); +} + +TEST_F(IdempotencyTestTxns, AbortInProgressTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto partialOp = partialTxn( + lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), partialOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({partialOp, abortOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + abortOp.getOpTime(), + abortOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionIgnoresNamespaceNotFoundErrors) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + + // Instead of creating a collection, we generate an arbitrary UUID to use for the operations + // below. This simulates the case where, during initial sync, a document D was inserted into a + // collection C on the sync source and then collection C was dropped, after we started fetching + // oplog entries but before we started collection cloning. In this case, we would not clone + // collection C, but when we try to apply the insertion of document D after collection cloning + // has finished, the collection would not exist since we never created it. It is acceptable to + // ignore the NamespaceNotFound error in this case since we know the collection will be dropped + // later on. + auto uuid = UUID::gen(); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto commitOp = commitUnprepared( + lsid, txnNum, StmtId(1), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)), OpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({commitOp}); + + // The op should have thrown a NamespaceNotFound error, which should have been ignored, so the + // operation has no effect. + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTestTxns, CommitPreparedTransactionIgnoresNamespaceNotFoundErrors) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + + // Instead of creating a collection, we generate an arbitrary UUID to use for the operations + // below. This simulates the case where, during initial sync, a document D was inserted into a + // collection C on the sync source and then collection C was dropped, after we started fetching + // oplog entries but before we started collection cloning. In this case, we would not clone + // collection C, but when we try to apply the insertion of document D after collection cloning + // has finished, the collection would not exist since we never created it. It is acceptable to + // ignore the NamespaceNotFound error in this case since we know the collection will be dropped + // later on. + auto uuid = UUID::gen(); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto prepareOp = prepare( + lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)), OpTime()); + auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, commitOp}); + + // The op should have thrown a NamespaceNotFound error, which should have been ignored, so the + // operation has no effect. + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index 1e85b6eba26..a9d6821d6f0 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -89,7 +89,6 @@ void OplogApplierImplOpObserver::onCreateCollection(OperationContext* opCtx, } onCreateCollectionFn(opCtx, coll, collectionName, options, idIndex); } - void OplogApplierImplTest::setUp() { ServiceContextMongoDTest::setUp(); @@ -138,5 +137,200 @@ StorageInterface* OplogApplierImplTest::getStorageInterface() const { return StorageInterface::get(serviceContext); } +// Since applyOplogEntryBatch is being tested outside of its calling function (applyOplogGroup), we +// recreate the necessary calling context. +Status OplogApplierImplTest::_applyOplogEntryBatchWrapper( + OperationContext* opCtx, + const OplogEntryBatch& batch, + OplogApplication::Mode oplogApplicationMode) { + UnreplicatedWritesBlock uwb(opCtx); + DisableDocumentValidation validationDisabler(opCtx); + return applyOplogEntryBatch(opCtx, batch, oplogApplicationMode); +} + +void OplogApplierImplTest::_testApplyOplogEntryBatchCrudOperation(ErrorCodes::Error expectedError, + const OplogEntry& op, + bool expectedApplyOpCalled) { + bool applyOpCalled = false; + + auto checkOpCtx = [](OperationContext* opCtx) { + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_IX)); + ASSERT_FALSE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); + ASSERT_TRUE( + opCtx->lockState()->isCollectionLockedForMode(NamespaceString("test.t"), MODE_IX)); + ASSERT_FALSE(opCtx->writesAreReplicated()); + ASSERT_TRUE(documentValidationDisabled(opCtx)); + }; + + _opObserver->onInsertsFn = + [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + applyOpCalled = true; + checkOpCtx(opCtx); + ASSERT_EQUALS(NamespaceString("test.t"), nss); + ASSERT_EQUALS(1U, docs.size()); + ASSERT_BSONOBJ_EQ(op.getObject(), docs[0]); + return Status::OK(); + }; + + _opObserver->onDeleteFn = [&](OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + applyOpCalled = true; + checkOpCtx(opCtx); + ASSERT_EQUALS(NamespaceString("test.t"), nss); + ASSERT(deletedDoc); + ASSERT_BSONOBJ_EQ(op.getObject(), *deletedDoc); + return Status::OK(); + }; + + ASSERT_EQ(_applyOplogEntryBatchWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), + expectedError); + ASSERT_EQ(applyOpCalled, expectedApplyOpCalled); +} + +Status failedApplyCommand(OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode) { + FAIL("applyCommand unexpectedly invoked."); + return Status::OK(); +} + +Status OplogApplierImplTest::runOpSteadyState(const OplogEntry& op) { + return runOpsSteadyState({op}); +} + +Status OplogApplierImplTest::runOpsSteadyState(std::vector<OplogEntry> ops) { + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + nullptr, // observer + nullptr, // replCoord + getConsistencyMarkers(), + getStorageInterface(), + OplogApplierImpl::ApplyGroupFunc(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), + nullptr); + MultiApplier::OperationPtrs opsPtrs; + for (auto& op : ops) { + opsPtrs.push_back(&op); + } + WorkerMultikeyPathInfo pathInfo; + return applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); +} + +Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) { + return runOpsInitialSync({op}); +} + +Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) { + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + nullptr, // observer + nullptr, // replCoord + getConsistencyMarkers(), + getStorageInterface(), + OplogApplierImpl::ApplyGroupFunc(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), + nullptr); + // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD + // operations provided by idempotency tests. + for (auto& op : ops) { + MultiApplier::OperationPtrs opsPtrs; + opsPtrs.push_back(&op); + WorkerMultikeyPathInfo pathInfo; + auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} + +Status OplogApplierImplTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) { + OplogApplierImpl oplogApplier( + nullptr, // executor + nullptr, // oplogBuffer + nullptr, // observer + nullptr, // replCoord + getConsistencyMarkers(), + getStorageInterface(), + OplogApplierImpl::ApplyGroupFunc(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), + nullptr); + // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD + // operations provided by idempotency tests. + for (auto& op : ops) { + MultiApplier::OperationPtrs opsPtrs; + opsPtrs.push_back(op); + WorkerMultikeyPathInfo pathInfo; + auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} + +void checkTxnTable(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + boost::optional<DurableTxnStateEnum> expectedState) { + DBDirectClient client(opCtx); + auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); + ASSERT_FALSE(result.isEmpty()); + + auto txnRecord = + SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result); + + ASSERT_EQ(txnNum, txnRecord.getTxnNum()); + ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime()); + ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate()); + if (expectedStartOpTime) { + ASSERT(txnRecord.getStartOpTime()); + ASSERT_EQ(*expectedStartOpTime, *txnRecord.getStartOpTime()); + } else { + ASSERT(!txnRecord.getStartOpTime()); + } + if (expectedState) { + ASSERT(*expectedState == txnRecord.getState()); + } +} + +CollectionReader::CollectionReader(OperationContext* opCtx, const NamespaceString& nss) + : _collToScan(opCtx, nss), + _exec(InternalPlanner::collectionScan(opCtx, + nss.ns(), + _collToScan.getCollection(), + PlanExecutor::NO_YIELD, + InternalPlanner::FORWARD)) {} + +StatusWith<BSONObj> CollectionReader::next() { + BSONObj obj; + + auto state = _exec->getNext(&obj, nullptr); + if (state == PlanExecutor::IS_EOF) { + return {ErrorCodes::CollectionIsEmpty, + str::stream() << "no more documents in " << _collToScan.getNss()}; + } + + // PlanExecutors that do not yield should only return ADVANCED or EOF. + invariant(state == PlanExecutor::ADVANCED); + return obj; +} + +bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { + DBDirectClient client(opCtx); + auto result = client.findOne(nss.ns(), {doc}); + return !result.isEmpty(); +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h index 31c703bc97e..71967716330 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h @@ -45,7 +45,6 @@ class BSONObj; class OperationContext; namespace repl { - /** * OpObserver for OplogApplierImpl test fixture. */ @@ -104,23 +103,23 @@ public: class OplogApplierImplTest : public ServiceContextMongoDTest { protected: - void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError, - const OplogEntry& op, - bool expectedApplyOpCalled); + void _testApplyOplogEntryBatchCrudOperation(ErrorCodes::Error expectedError, + const OplogEntry& op, + bool expectedApplyOpCalled); - Status _syncApplyWrapper(OperationContext* opCtx, - const OplogEntryBatch& batch, - OplogApplication::Mode oplogApplicationMode); + Status _applyOplogEntryBatchWrapper(OperationContext* opCtx, + const OplogEntryBatch& batch, + OplogApplication::Mode oplogApplicationMode); ServiceContext::UniqueOperationContext _opCtx; std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; ServiceContext* serviceContext; OplogApplierImplOpObserver* _opObserver = nullptr; - // Implements the OplogApplierImpl::MultiSyncApplyFn interface and does nothing. + // Implements the OplogApplierImpl::ApplyGroupFn interface and does nothing. static Status noopApplyOperationFn(OperationContext*, MultiApplier::OperationPtrs*, - SyncTail* st, + OplogApplierImpl* oai, WorkerMultikeyPathInfo*) { return Status::OK(); } @@ -147,9 +146,32 @@ protected: UUID kUuid{UUID::gen()}; }; +// Utility class to allow easily scanning a collection. Scans in forward order, returns +// Status::CollectionIsEmpty when scan is exhausted. +class CollectionReader { +public: + CollectionReader(OperationContext* opCtx, const NamespaceString& nss); + + StatusWith<BSONObj> next(); + +private: + AutoGetCollectionForRead _collToScan; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _exec; +}; + Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, OplogApplication::Mode); +void checkTxnTable(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + boost::optional<DurableTxnStateEnum> expectedState); + +bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index ca8582fc754..27af958e745 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -38,12 +38,12 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_buffer_collection.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/sync_tail_test_fixture.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" diff --git a/src/mongo/db/repl/oplog_entry_batch.h b/src/mongo/db/repl/oplog_entry_batch.h index 444087c0a45..f65a62da8f5 100644 --- a/src/mongo/db/repl/oplog_entry_batch.h +++ b/src/mongo/db/repl/oplog_entry_batch.h @@ -35,9 +35,9 @@ namespace mongo { namespace repl { /** - * This is a class for a single oplog entry or grouped inserts to be applied in syncApply. This - * class is immutable and can only be initialized using either a single oplog entry or a range of - * grouped inserts. + * This is a class for a single oplog entry or grouped inserts to be applied in + * applyOplogEntryBatch. This class is immutable and can only be initialized using either a single + * oplog entry or a range of grouped inserts. */ class OplogEntryBatch { public: diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 49be20493d8..02cc3557a96 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -223,7 +223,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( replCoord, _replicationProcess->getConsistencyMarkers(), _storageInterface, - multiSyncApply, + applyOplogGroup, OplogApplier::Options(OplogApplication::Mode::kSecondary), _writerPool.get()); diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index a6f58f7b5a9..a38c75a3c6d 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -363,7 +363,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, ReplicationCoordinator::get(opCtx), _consistencyMarkers, _storageInterface, - multiSyncApply, + applyOplogGroup, OplogApplier::Options(OplogApplication::Mode::kRecovering), writerPool.get()); diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp index 3e97d2a56c4..a6d3603e9d0 100644 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -38,13 +38,13 @@ #include "mongo/db/op_observer_registry.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/sync_tail_test_fixture.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 47209703d97..fa918ba6b46 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -1627,12 +1627,12 @@ void rollback(OperationContext* opCtx, // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before - // writing anything, SyncTail will quickly take us to SECONDARY since are are still at our - // original MinValid, which is fine because we may choose a sync source that doesn't require - // rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will - // cause us to roll back to the same common point, which is fine. If we succeeded, we will be - // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY - // then. + // writing anything, the Replication Coordinator will quickly take us to SECONDARY since we are + // still at our original MinValid, which is fine because we may choose a sync source that + // doesn't require rollback. If it failed after we wrote to MinValid, then we will pick a sync + // source that will cause us to roll back to the same common point, which is fine. If we + // succeeded, we will be consistent as soon as we apply up to/through MinValid and the + // Replication Coordinator will make us SECONDARY then. { ReplicationStateTransitionLockGuard transitionGuard(opCtx, MODE_X); diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index a5502f21cb2..378248e5316 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -47,9 +47,9 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/sync_tail_test_fixture.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/stdx/thread.h" diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp deleted file mode 100644 index 79bf0c3c345..00000000000 --- a/src/mongo/db/repl/sync_tail.cpp +++ /dev/null @@ -1,598 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/sync_tail.h" - -#include "third_party/murmurhash3/MurmurHash3.h" -#include <boost/functional/hash.hpp> -#include <memory> - -#include "mongo/base/counter.h" -#include "mongo/bson/bsonelement_comparator.h" -#include "mongo/bson/timestamp.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/collection_catalog.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/client.h" -#include "mongo/db/commands/server_status_metric.h" -#include "mongo/db/commands/txn_cmds_gen.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/concurrency/lock_state.h" -#include "mongo/db/concurrency/replication_state_transition_lock_guard.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/multi_key_path_tracker.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/query/query_knobs_gen.h" -#include "mongo/db/repl/apply_ops.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/initial_syncer.h" -#include "mongo/db/repl/insert_group.h" -#include "mongo/db/repl/multiapplier.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_auth.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/transaction_oplog_application.h" -#include "mongo/db/session.h" -#include "mongo/db/session_txn_record_gen.h" -#include "mongo/db/stats/timer_stats.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/db/transaction_participant_gen.h" -#include "mongo/util/exit.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/str.h" - -namespace mongo { -namespace repl { -namespace { - -MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime); - -// The oplog entries applied -Counter64 opsAppliedStats; -ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats); - -NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { - auto optionalUuid = oplogEntry.getUuid(); - if (!optionalUuid) { - return oplogEntry.getNss(); - } - - const auto& uuid = optionalUuid.get(); - auto& catalog = CollectionCatalog::get(opCtx); - auto nss = catalog.lookupNSSByUUID(uuid); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "No namespace with UUID " << uuid.toString(), - nss); - return *nss; -} - -NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const OplogEntry& op) { - if (auto ui = op.getUuid()) { - return {nss.db().toString(), ui.get()}; - } - return nss; -} - -/** - * Used for logging a report of ops that take longer than "slowMS" to apply. This is called - * right before returning from syncApply, and it returns the same status. - */ -Status finishAndLogApply(ClockSource* clockSource, - Status finalStatus, - Date_t applyStartTime, - const OplogEntryBatch& batch) { - - if (finalStatus.isOK()) { - auto applyEndTime = clockSource->now(); - auto diffMS = durationCount<Milliseconds>(applyEndTime - applyStartTime); - - // This op was slow to apply, so we should log a report of it. - if (diffMS > serverGlobalParams.slowMS) { - - StringBuilder s; - s << "applied op: "; - - if (batch.getOp().getOpType() == OpTypeEnum::kCommand) { - s << "command "; - } else { - s << "CRUD "; - } - - s << redact(batch.toBSON()); - s << ", took " << diffMS << "ms"; - - log() << s.str(); - } - } - return finalStatus; -} - -LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMode mode) { - return nss.isSystemDotViews() ? MODE_X : mode; -} - -} // namespace - -SyncTail::SyncTail(OplogApplier::Observer* observer, - StorageInterface* storageInterface, - const OplogApplier::Options& options) - : _observer(observer), _storageInterface(storageInterface), _options(options) {} - -SyncTail::~SyncTail() {} - -const OplogApplier::Options& SyncTail::getOptions() const { - return _options; -} - -namespace { - -/** - * Caches per-collection properties which are relevant for oplog application, so that they don't - * have to be retrieved repeatedly for each op. - */ -class CachedCollectionProperties { -public: - struct CollectionProperties { - bool isCapped = false; - const CollatorInterface* collator = nullptr; - }; - - CollectionProperties getCollectionProperties(OperationContext* opCtx, - const StringMapHashedKey& ns) { - auto it = _cache.find(ns); - if (it != _cache.end()) { - return it->second; - } - - auto collProperties = getCollectionPropertiesImpl(opCtx, NamespaceString(ns.key())); - _cache[ns] = collProperties; - return collProperties; - } - -private: - CollectionProperties getCollectionPropertiesImpl(OperationContext* opCtx, - const NamespaceString& nss) { - CollectionProperties collProperties; - - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - auto databaseHolder = DatabaseHolder::get(opCtx); - auto db = databaseHolder->getDb(opCtx, nss.db()); - if (!db) { - return collProperties; - } - - auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss); - if (!collection) { - return collProperties; - } - - collProperties.isCapped = collection->isCapped(); - collProperties.collator = collection->getDefaultCollator(); - return collProperties; - } - - StringMap<CollectionProperties> _cache; -}; - -/** - * Updates a CRUD op's hash and isForCappedCollection field if necessary. - */ -void processCrudOp(OperationContext* opCtx, - OplogEntry* op, - uint32_t* hash, - StringMapHashedKey* hashedNs, - CachedCollectionProperties* collPropertiesCache) { - const bool supportsDocLocking = - opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking(); - auto collProperties = collPropertiesCache->getCollectionProperties(opCtx, *hashedNs); - - // For doc locking engines, include the _id of the document in the hash so we get - // parallelism even if all writes are to a single collection. - // - // For capped collections, this is illegal, since capped collections must preserve - // insertion order. - if (supportsDocLocking && !collProperties.isCapped) { - BSONElement id = op->getIdElement(); - BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore, - collProperties.collator); - const size_t idHash = elementHasher.hash(id); - MurmurHash3_x86_32(&idHash, sizeof(idHash), *hash, hash); - } - - if (op->getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) { - // Mark capped collection ops before storing them to ensure we do not attempt to - // bulk insert them. - op->isForCappedCollection = true; - } -} - -/** - * Adds a single oplog entry to the appropriate writer vector. - */ -void addToWriterVector(OplogEntry* op, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - uint32_t hash) { - const uint32_t numWriters = writerVectors->size(); - auto& writer = (*writerVectors)[hash % numWriters]; - if (writer.empty()) { - writer.reserve(8); // Skip a few growth rounds - } - writer.push_back(op); -} - -/** - * Adds a set of derivedOps to writerVectors. - */ -void addDerivedOps(OperationContext* opCtx, - MultiApplier::Operations* derivedOps, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - CachedCollectionProperties* collPropertiesCache) { - for (auto&& op : *derivedOps) { - auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); - uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); - if (op.isCrudOpType()) { - processCrudOp(opCtx, &op, &hash, &hashedNs, collPropertiesCache); - } - addToWriterVector(&op, writerVectors, hash); - } -} - -} // namespace - -Status syncApply(OperationContext* opCtx, - const OplogEntryBatch& batch, - OplogApplication::Mode oplogApplicationMode) { - // Guarantees that syncApply's context matches that of its calling function, multiSyncApply. - invariant(!opCtx->writesAreReplicated()); - invariant(documentValidationDisabled(opCtx)); - - auto op = batch.getOp(); - // Count each log op application as a separate operation, for reporting purposes - CurOp individualOp(opCtx); - - const NamespaceString nss(op.getNss()); - - auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); }; - - auto applyOp = [&](Database* db) { - // We convert updates to upserts when not in initial sync because after rollback and during - // startup we may replay an update after a delete and crash since we do not ignore - // errors. In initial sync we simply ignore these update errors so there is no reason to - // upsert. - // - // TODO (SERVER-21700): Never upsert during oplog application unless an external applyOps - // wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP - // mode (similar to initial sync) instead so we do not accidentally ignore real errors. - bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync); - Status status = applyOperation_inlock( - opCtx, db, batch, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats); - if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { - throw WriteConflictException(); - } - return status; - }; - - auto clockSource = opCtx->getServiceContext()->getFastClockSource(); - auto applyStartTime = clockSource->now(); - - if (MONGO_unlikely(hangAfterRecordingOpApplicationStartTime.shouldFail())) { - log() << "syncApply - fail point hangAfterRecordingOpApplicationStartTime enabled. " - << "Blocking until fail point is disabled. "; - hangAfterRecordingOpApplicationStartTime.pauseWhileSet(); - } - - auto opType = op.getOpType(); - - auto finishApply = [&](Status status) { - return finishAndLogApply(clockSource, status, applyStartTime, batch); - }; - - if (opType == OpTypeEnum::kNoop) { - incrementOpsAppliedStats(); - return Status::OK(); - } else if (OplogEntry::isCrudOpType(opType)) { - return finishApply(writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] { - // Need to throw instead of returning a status for it to be properly ignored. - try { - AutoGetCollection autoColl( - opCtx, getNsOrUUID(nss, op), fixLockModeForSystemDotViewsChanges(nss, MODE_IX)); - auto db = autoColl.getDb(); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "missing database (" << nss.db() << ")", - db); - OldClientContext ctx(opCtx, autoColl.getNss().ns(), db); - return applyOp(ctx.db()); - } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // Delete operations on non-existent namespaces can be treated as successful for - // idempotency reasons. - // During RECOVERING mode, we ignore NamespaceNotFound for all CRUD ops since - // storage does not wait for drops to be checkpointed (SERVER-33161). - if (opType == OpTypeEnum::kDelete || - oplogApplicationMode == OplogApplication::Mode::kRecovering) { - return Status::OK(); - } - - ex.addContext(str::stream() - << "Failed to apply operation: " << redact(batch.toBSON())); - throw; - } - })); - } else if (opType == OpTypeEnum::kCommand) { - return finishApply(writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] { - // A special case apply for commands to avoid implicit database creation. - Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode); - incrementOpsAppliedStats(); - return status; - })); - } - - MONGO_UNREACHABLE; -} - -void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) { - if (oplogEntryPointers->size() < 1U) { - return; - } - auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) { - return l->getNss() < r->getNss(); - }; - std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator); -} - -// This free function is used by the writer threads to apply each op -Status multiSyncApply(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - invariant(st); - - UnreplicatedWritesBlock uwb(opCtx); - DisableDocumentValidation validationDisabler(opCtx); - // Since we swap the locker in stash / unstash transaction resources, - // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been - // destroyed by unstash in its destructor. Thus we set the flag explicitly. - opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - - // Explicitly start future read transactions without a timestamp. - opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - - // When querying indexes, we return the record matching the key if it exists, or an adjacent - // document. This means that it is possible for us to hit a prepare conflict if we query for an - // incomplete key and an adjacent key is prepared. - // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that - // did not occur on the primary. - opCtx->recoveryUnit()->setPrepareConflictBehavior( - PrepareConflictBehavior::kIgnoreConflictsAllowWrites); - - stableSortByNamespace(ops); - - const auto oplogApplicationMode = st->getOptions().mode; - - InsertGroup insertGroup(ops, opCtx, oplogApplicationMode); - - { // Ensure that the MultikeyPathTracker stops tracking paths. - ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); }); - MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo(); - - for (auto it = ops->cbegin(); it != ops->cend(); ++it) { - const OplogEntry& entry = **it; - - // If we are successful in grouping and applying inserts, advance the current iterator - // past the end of the inserted group of entries. - auto groupResult = insertGroup.groupAndApplyInserts(it); - if (groupResult.isOK()) { - it = groupResult.getValue(); - continue; - } - - // If we didn't create a group, try to apply the op individually. - try { - const Status status = syncApply(opCtx, &entry, oplogApplicationMode); - - if (!status.isOK()) { - // Tried to apply an update operation but the document is missing, there must be - // a delete operation for the document later in the oplog. - if (status == ErrorCodes::UpdateOperationFailed && - oplogApplicationMode == OplogApplication::Mode::kInitialSync) { - continue; - } - - severe() << "Error applying operation (" << redact(entry.toBSON()) - << "): " << causedBy(redact(status)); - return status; - } - } catch (const DBException& e) { - // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be - // dropped before initial sync or recovery ends anyways and we should ignore it. - if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() && - st->getOptions().allowNamespaceNotFoundErrorsOnCrudOps) { - continue; - } - - severe() << "writer worker caught exception: " << redact(e) - << " on: " << redact(entry.toBSON()); - return e.toStatus(); - } - } - } - - invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); - invariant(workerMultikeyPathInfo->empty()); - auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo(); - if (!newPaths.empty()) { - workerMultikeyPathInfo->swap(newPaths); - } - - return Status::OK(); -} - -/** - * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops - * vector in any other way. - * writerVectors - Set of operations for each worker thread to apply. - * derivedOps - If provided, this function inserts a decomposition of applyOps operations - * and instructions for updating the transactions table. Required if processing oplogs - * with transactions. - * sessionUpdateTracker - if provided, keeps track of session info from ops. - */ -void SyncTail::_deriveOpsAndFillWriterVectors( - OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) noexcept { - - LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps; - CachedCollectionProperties collPropertiesCache; - for (auto&& op : *ops) { - // If the operation's optime is before or the same as the beginApplyingOpTime we don't want - // to apply it, so don't include it in writerVectors. - if (op.getOpTime() <= _options.beginApplyingOpTime) { - continue; - } - - auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); - // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later - // on. Bit depth not important, we end up just doing integer modulo with this in the end. - // The hash function should provide entropy in the lower bits as it's used in hash tables. - uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); - - // We need to track all types of ops, including type 'n' (these are generated from chunk - // migrations). - if (sessionUpdateTracker) { - if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { - derivedOps->emplace_back(std::move(*newOplogWrites)); - addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); - } - } - - - // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit. - // We must save it here because we are not guaranteed it has been written to the oplog - // yet. - // We also do this for prepare during initial sync. - if (op.isPartialTransaction() || - (op.shouldPrepare() && _options.mode == OplogApplication::Mode::kInitialSync)) { - auto& partialTxnList = partialTxnOps[*op.getSessionId()]; - // If this operation belongs to an existing partial transaction, partialTxnList - // must contain the previous operations of the transaction. - invariant(partialTxnList.empty() || - partialTxnList.front()->getTxnNumber() == op.getTxnNumber()); - partialTxnList.push_back(&op); - continue; - } - - if (op.getCommandType() == OplogEntry::CommandType::kAbortTransaction) { - auto& partialTxnList = partialTxnOps[*op.getSessionId()]; - partialTxnList.clear(); - } - - if (op.isCrudOpType()) - processCrudOp(opCtx, &op, &hash, &hashedNs, &collPropertiesCache); - // Extract applyOps operations and fill writers with extracted operations using this - // function. - if (op.isTerminalApplyOps()) { - auto logicalSessionId = op.getSessionId(); - // applyOps entries generated by a transaction must have a sessionId and a - // transaction number. - if (logicalSessionId && op.getTxnNumber()) { - // On commit of unprepared transactions, get transactional operations from the - // oplog and fill writers with those operations. - // Flush partialTxnList operations for current transaction. - auto& partialTxnList = partialTxnOps[*logicalSessionId]; - - derivedOps->emplace_back( - readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); - partialTxnList.clear(); - - // Transaction entries cannot have different session updates. - addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); - } else { - // The applyOps entry was not generated as part of a transaction. - invariant(!op.getPrevWriteOpTimeInTransaction()); - - derivedOps->emplace_back(ApplyOps::extractOperations(op)); - - // Nested entries cannot have different session updates. - addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); - } - continue; - } - - // If we see a commitTransaction command that is a part of a prepared transaction during - // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers - // with the extracted operations. - if (op.isPreparedCommit() && (_options.mode == OplogApplication::Mode::kInitialSync)) { - auto logicalSessionId = op.getSessionId(); - auto& partialTxnList = partialTxnOps[*logicalSessionId]; - - derivedOps->emplace_back( - readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); - partialTxnList.clear(); - - addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache); - continue; - } - - addToWriterVector(&op, writerVectors, hash); - } -} - -void SyncTail::fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) noexcept { - - SessionUpdateTracker sessionUpdateTracker; - _deriveOpsAndFillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); - - auto newOplogWrites = sessionUpdateTracker.flushAll(); - if (!newOplogWrites.empty()) { - derivedOps->emplace_back(std::move(newOplogWrites)); - _deriveOpsAndFillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h deleted file mode 100644 index c8f5353b262..00000000000 --- a/src/mongo/db/repl/sync_tail.h +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <deque> -#include <functional> -#include <memory> - -#include "mongo/base/status.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/db/repl/multiapplier.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplog_applier.h" -#include "mongo/db/repl/oplog_buffer.h" -#include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/replication_consistency_markers.h" -#include "mongo/db/repl/session_update_tracker.h" -#include "mongo/db/repl/storage_interface.h" -#include "mongo/platform/mutex.h" -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { - -class Database; -class OperationContext; -struct MultikeyPathInfo; - -namespace repl { -class ReplicationCoordinator; -class OpTime; - -/** - * Used for oplog application on a replica set secondary. - * Primarily used to apply batches of operations fetched from a sync source during steady state - * replication and initial sync. - * - * When used for steady state replication, runs a thread that reads batches of operations from - * an oplog buffer (through the BackgroundSync interface) and applies the batch of operations. - */ -class SyncTail { -public: - /** - * - * Constructs a SyncTail. - * During steady state replication, oplogApplication() obtains batches of operations to apply - * from 'observer'. It is not required to provide 'observer' at construction if we do not plan - * on using oplogApplication(). During the oplog application phase, the batch of operations is - * distributed across writer threads in 'writerPool'. Each writer thread applies its own vector - * of operations using 'func'. The writer thread pool is not owned by us. - */ - SyncTail(OplogApplier::Observer* observer, - StorageInterface* storageInterface, - const OplogApplier::Options& options); - virtual ~SyncTail(); - - /** - * Returns options for oplog application. - */ - const OplogApplier::Options& getOptions() const; - - using BatchLimits = OplogApplier::BatchLimits; - - void fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) noexcept; - -private: - OplogApplier::Observer* const _observer; - StorageInterface* const _storageInterface; - - void _deriveOpsAndFillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) noexcept; - - // Used to configure multiApply() behavior. - const OplogApplier::Options _options; - - // Protects member data of SyncTail. - mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncTail::_mutex"); -}; - -/** - * Applies a batch of operations. - */ -Status syncApply(OperationContext* opCtx, - const OplogEntryBatch& batch, - OplogApplication::Mode oplogApplicationMode); - -/** - * This free function is used by the thread pool workers to write ops to the db. - * This consumes the passed in OperationPtrs and callers should not make any assumptions about the - * state of the container after calling. However, this function cannot modify the pointed-to - * operations because the OperationPtrs container contains const pointers. - */ -Status multiSyncApply(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp deleted file mode 100644 index 5276f4142cc..00000000000 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ /dev/null @@ -1,1980 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <algorithm> -#include <memory> -#include <utility> -#include <vector> - -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/collection_options.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/client.h" -#include "mongo/db/commands/feature_compatibility_version_parser.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/logical_session_id_helpers.h" -#include "mongo/db/ops/write_ops.h" -#include "mongo/db/query/internal_plans.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/drop_pending_collection_reaper.h" -#include "mongo/db/repl/idempotency_test_fixture.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplog_applier.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/session_txn_record_gen.h" -#include "mongo/db/stats/counters.h" -#include "mongo/db/transaction_participant_gen.h" -#include "mongo/platform/mutex.h" -#include "mongo/unittest/death_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/clock_source_mock.h" -#include "mongo/util/md5.hpp" -#include "mongo/util/scopeguard.h" -#include "mongo/util/string_map.h" - -namespace mongo { -namespace repl { -namespace { - -/** - * Creates an OplogEntry with given parameters and preset defaults for this test suite. - */ -OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollectionUUID uuid) { - return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime - boost::none, // hash - opType, // opType - nss, // namespace - uuid, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - BSON("_id" << 0), // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime -} - -/** - * Testing-only SyncTail. - */ -class SyncTailForTest : public SyncTail { -public: - SyncTailForTest(); -}; - -SyncTailForTest::SyncTailForTest() - : SyncTail(nullptr, // observer - nullptr, // storage interface - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)) {} - -/** - * Creates collection options suitable for oplog. - */ -CollectionOptions createOplogCollectionOptions() { - CollectionOptions options; - options.capped = true; - options.cappedSize = 64 * 1024 * 1024LL; - options.autoIndexId = CollectionOptions::NO; - return options; -} - -/** - * Create test collection. - * Returns collection. - */ -void createCollection(OperationContext* opCtx, - const NamespaceString& nss, - const CollectionOptions& options) { - writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { - Lock::DBLock dblk(opCtx, nss.db(), MODE_X); - OldClientContext ctx(opCtx, nss.ns()); - auto db = ctx.db(); - ASSERT_TRUE(db); - mongo::WriteUnitOfWork wuow(opCtx); - auto coll = db->createCollection(opCtx, nss, options); - ASSERT_TRUE(coll); - wuow.commit(); - }); -} - - -/** - * Create test collection with UUID. - */ -auto createCollectionWithUuid(OperationContext* opCtx, const NamespaceString& nss) { - CollectionOptions options; - options.uuid = UUID::gen(); - createCollection(opCtx, nss, options); - return options.uuid.get(); -} - -/** - * Create test database. - */ -void createDatabase(OperationContext* opCtx, StringData dbName) { - Lock::GlobalWrite globalLock(opCtx); - bool justCreated; - auto databaseHolder = DatabaseHolder::get(opCtx); - auto db = databaseHolder->openDb(opCtx, dbName, &justCreated); - ASSERT_TRUE(db); - ASSERT_TRUE(justCreated); -} - -/** - * Returns true if collection exists. - */ -bool collectionExists(OperationContext* opCtx, const NamespaceString& nss) { - return AutoGetCollectionForRead(opCtx, nss).getCollection() != nullptr; -} - -auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { - BSONElement tsArray; - Status status = - bsonExtractTypedField(obj, OpTime::kTimestampFieldName, BSONType::Array, &tsArray); - ASSERT_OK(status); - - BSONElement termArray; - status = bsonExtractTypedField(obj, OpTime::kTermFieldName, BSONType::Array, &termArray); - ASSERT_OK(status); - - return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long()); -}; - -TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) { - NamespaceString nss("test.t"); - auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), - ExceptionFor<ErrorCodes::NamespaceNotFound>); -} - -TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) { - NamespaceString otherNss("test.othername"); - auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); -} - -TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) { - const NamespaceString nss("test.t"); - createDatabase(_opCtx.get(), nss.db()); - NamespaceString otherNss(nss.getSisterNS("othername")); - auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, kUuid); - ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), - ExceptionFor<ErrorCodes::NamespaceNotFound>); -} - -TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) { - const NamespaceString nss("test.t"); - createDatabase(_opCtx.get(), nss.db()); - NamespaceString otherNss(nss.getSisterNS("othername")); - auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, kUuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); -} - -TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { - const NamespaceString nss("test.t"); - createDatabase(_opCtx.get(), nss.db()); - // Even though the collection doesn't exist, this is handled in the actual application function, - // which in the case of this test just ignores such errors. This tests mostly that we don't - // implicitly create the collection and lock the database in MODE_X. - auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), - ExceptionFor<ErrorCodes::NamespaceNotFound>); - ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); -} - -TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionMissing) { - const NamespaceString nss("test.t"); - createDatabase(_opCtx.get(), nss.db()); - // Even though the collection doesn't exist, this is handled in the actual application function, - // which in the case of this test just ignores such errors. This tests mostly that we don't - // implicitly create the collection and lock the database in MODE_X. - auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); - ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); -} - -TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionExists) { - const NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); -} - -TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionExists) { - const NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); -} - -TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) { - const NamespaceString nss("test.t"); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - // Test that the collection to lock is determined by the UUID and not the 'ns' field. - NamespaceString otherNss(nss.getSisterNS("othername")); - auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, uuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); -} - -TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) { - const NamespaceString nss("test.t"); - CollectionOptions options; - options.uuid = kUuid; - createCollection(_opCtx.get(), nss, options); - - // Test that the collection to lock is determined by the UUID and not the 'ns' field. - NamespaceString otherNss(nss.getSisterNS("othername")); - auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, options.uuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); -} - -TEST_F(SyncTailTest, SyncApplyCommand) { - NamespaceString nss("test.t"); - auto op = - BSON("op" - << "c" - << "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o" - << BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen()); - bool applyCmdCalled = false; - _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, - Collection*, - const NamespaceString& collNss, - const CollectionOptions&, - const BSONObj&) { - applyCmdCalled = true; - ASSERT_TRUE(opCtx); - ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_X)); - ASSERT_EQUALS(nss, collNss); - return Status::OK(); - }; - auto entry = OplogEntry(op); - ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kInitialSync)); - ASSERT_TRUE(applyCmdCalled); -} - -TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - - MultiApplier::OperationPtrs ops = {&op}; - WorkerMultikeyPathInfo pathInfo; - SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - // Collection should be created after syncApply() processes operation. - ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); -} - -void testWorkerMultikeyPaths(OperationContext* opCtx, - const OplogEntry& op, - unsigned long numPaths) { - SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); - WorkerMultikeyPathInfo pathInfo; - MultiApplier::OperationPtrs ops = {&op}; - ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo)); - ASSERT_EQ(pathInfo.size(), numPaths); -} - -TEST_F(SyncTailTest, MultiSyncApplyAddsWorkerMultikeyPathInfoOnInsert) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - - { - auto op = makeCreateCollectionOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - { - auto keyPattern = BSON("a" << 1); - auto op = makeCreateIndexOplogEntry( - {Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern, kUuid); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - { - auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(_opCtx.get(), op, 1UL); - } -} - -TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - - { - auto op = makeCreateCollectionOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto keyPattern = BSON("a" << 1); - auto op = makeCreateIndexOplogEntry( - {Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern, kUuid); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto keyPattern = BSON("b" << 1); - auto op = makeCreateIndexOplogEntry( - {Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern, kUuid); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto docA = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); - auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); - auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); - auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); - SyncTail syncTail( - nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); - WorkerMultikeyPathInfo pathInfo; - MultiApplier::OperationPtrs ops = {&opA, &opB}; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - ASSERT_EQ(pathInfo.size(), 2UL); - } -} - -TEST_F(SyncTailTest, MultiSyncApplyDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - - { - auto op = makeCreateCollectionOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto keyPattern = BSON("a" << 1); - auto op = makeCreateIndexOplogEntry( - {Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern, kUuid); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } - - { - auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7)); - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); - } -} - -TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_SECONDARY)); - NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); - - auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - SyncTail syncTail(nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); - MultiApplier::OperationPtrs ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, - multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); -} - -TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - ASSERT_FALSE(opCtx->writesAreReplicated()); - ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); - ASSERT_TRUE(documentValidationDisabled(opCtx)); - return Status::OK(); - }; - createCollectionWithUuid(_opCtx.get(), nss); - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0)); - ASSERT_OK(runOpSteadyState(op)); - ASSERT(onInsertsCalled); -} - -TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - // Delete operation without _id in 'o' field. - auto op = makeDeleteDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, {}); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, runOpSteadyState(op)); -} - -TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) { - onInsertsCalled = true; - uasserted(ErrorCodes::OperationFailed, ""); - MONGO_UNREACHABLE; - }; - createCollectionWithUuid(_opCtx.get(), nss); - auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0)); - ASSERT_EQUALS(ErrorCodes::OperationFailed, runOpSteadyState(op)); - ASSERT(onInsertsCalled); -} - -TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) { - NamespaceString nss1("test.t1"); - NamespaceString nss2("test.t2"); - NamespaceString nss3("test.t3"); - - const Seconds s(1); - unsigned int i = 1; - auto op1 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss1, BSON("_id" << 1)); - auto op2 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss1, BSON("_id" << 2)); - auto op3 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss2, BSON("_id" << 3)); - auto op4 = makeInsertDocumentOplogEntry({Timestamp(s, i++), 1LL}, nss3, BSON("_id" << 4)); - - std::vector<NamespaceString> nssInserted; - std::vector<BSONObj> docsInserted; - bool onInsertsCalled = false; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - onInsertsCalled = true; - for (const auto& doc : docs) { - nssInserted.push_back(nss); - docsInserted.push_back(doc); - } - }; - - createCollectionWithUuid(_opCtx.get(), nss1); - createCollectionWithUuid(_opCtx.get(), nss2); - createCollectionWithUuid(_opCtx.get(), nss3); - - ASSERT_OK(runOpsSteadyState({op4, op1, op3, op2})); - - ASSERT_EQUALS(4U, nssInserted.size()); - ASSERT_EQUALS(nss1, nssInserted[0]); - ASSERT_EQUALS(nss1, nssInserted[1]); - ASSERT_EQUALS(nss2, nssInserted[2]); - ASSERT_EQUALS(nss3, nssInserted[3]); - - ASSERT_EQUALS(4U, docsInserted.size()); - ASSERT_BSONOBJ_EQ(op1.getObject(), docsInserted[0]); - ASSERT_BSONOBJ_EQ(op2.getObject(), docsInserted[1]); - ASSERT_BSONOBJ_EQ(op3.getObject(), docsInserted[2]); - ASSERT_BSONOBJ_EQ(op4.getObject(), docsInserted[3]); - - ASSERT(onInsertsCalled); -} - -TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) { - int seconds = 1; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); - auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); - auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); - auto insertOp1a = makeOp(nss1); - auto insertOp1b = makeOp(nss1); - auto insertOp2a = makeOp(nss2); - auto insertOp2b = makeOp(nss2); - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - docsInserted.push_back(docs); - }; - - MultiApplier::Operations ops = { - createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b}; - ASSERT_OK(runOpsSteadyState(ops)); - - ASSERT_EQUALS(2U, docsInserted.size()); - - // Check grouped insert operations in namespace "nss1". - const auto& group1 = docsInserted[0]; - ASSERT_EQUALS(2U, group1.size()); - ASSERT_BSONOBJ_EQ(insertOp1a.getObject(), group1[0]); - ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1]); - - // Check grouped insert operations in namespace "nss2". - const auto& group2 = docsInserted[1]; - ASSERT_EQUALS(2U, group2.size()); - ASSERT_BSONOBJ_EQ(insertOp2a.getObject(), group2[0]); - ASSERT_BSONOBJ_EQ(insertOp2b.getObject(), group2[1]); -} - -TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation) { - int seconds = 1; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - // Generate operations to apply: - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - std::size_t limit = 64; - MultiApplier::Operations insertOps; - for (std::size_t i = 0; i < limit + 1; ++i) { - insertOps.push_back(makeOp(nss)); - } - MultiApplier::Operations operationsToApply; - operationsToApply.push_back(createOp); - std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - docsInserted.push_back(docs); - }; - - ASSERT_OK(runOpsSteadyState(operationsToApply)); - - // multiSyncApply should combine operations as follows: - // {create}, {grouped_insert}, {insert_(limit+1)} - // Ignore {create} since we are only tracking inserts. - ASSERT_EQUALS(2U, docsInserted.size()); - - const auto& groupedInsertDocuments = docsInserted[0]; - ASSERT_EQUALS(limit, groupedInsertDocuments.size()); - for (std::size_t i = 0; i < limit; ++i) { - const auto& insertOp = insertOps[i]; - ASSERT_BSONOBJ_EQ(insertOp.getObject(), groupedInsertDocuments[i]); - } - - // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. - const auto& singleInsertDocumentGroup = docsInserted[1]; - ASSERT_EQUALS(1U, singleInsertDocumentGroup.size()); - ASSERT_BSONOBJ_EQ(insertOps.back().getObject(), singleInsertDocumentGroup[0]); -} - -// Create an 'insert' oplog operation of an approximate size in bytes. The '_id' of the oplog entry -// and its optime in seconds are given by the 'id' argument. -OplogEntry makeSizedInsertOp(const NamespaceString& nss, int size, int id) { - return makeInsertDocumentOplogEntry({Timestamp(Seconds(id), 0), 1LL}, - nss, - BSON("_id" << id << "data" << std::string(size, '*'))); -}; - -TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations) { - int seconds = 1; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - // Create a sequence of insert ops that are too large to fit in one group. - int maxBatchSize = write_ops::insertVectorMaxBytes; - int opsPerBatch = 3; - int opSize = maxBatchSize / opsPerBatch - 500; // Leave some room for other oplog fields. - - // Create the insert ops. - MultiApplier::Operations insertOps; - int numOps = 4; - for (int i = 0; i < numOps; i++) { - insertOps.push_back(makeSizedInsertOp(nss, opSize, seconds++)); - } - - MultiApplier::Operations operationsToApply; - operationsToApply.push_back(createOp); - std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - docsInserted.push_back(docs); - }; - - // Apply the ops. - ASSERT_OK(runOpsSteadyState(operationsToApply)); - - // Applied ops should be as follows: - // [ {create}, INSERT_GROUP{insert 1, insert 2, insert 3}, {insert 4} ] - // Ignore {create} since we are only tracking inserts. - ASSERT_EQUALS(2U, docsInserted.size()); - - // Make sure the insert group was created correctly. - const auto& groupedInsertOpArray = docsInserted[0]; - ASSERT_EQUALS(std::size_t(opsPerBatch), groupedInsertOpArray.size()); - for (int i = 0; i < opsPerBatch; ++i) { - ASSERT_BSONOBJ_EQ(insertOps[i].getObject(), groupedInsertOpArray[i]); - } - - // Check that the last op was applied individually. - const auto& singleInsertDocumentGroup = docsInserted[1]; - ASSERT_EQUALS(1U, singleInsertDocumentGroup.size()); - ASSERT_BSONOBJ_EQ(insertOps[3].getObject(), singleInsertDocumentGroup[0]); -} - -TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceedsBatchSize) { - int seconds = 1; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - int maxBatchSize = write_ops::insertVectorMaxBytes; - // Create an insert op that exceeds the maximum batch size by itself. - auto insertOpLarge = makeSizedInsertOp(nss, maxBatchSize, seconds++); - auto insertOpSmall = makeSizedInsertOp(nss, 100, seconds++); - - MultiApplier::Operations operationsToApply = {createOp, insertOpLarge, insertOpSmall}; - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - docsInserted.push_back(docs); - }; - - // Apply the ops. - ASSERT_OK(runOpsSteadyState(operationsToApply)); - - // Applied ops should be as follows: - // [ {create}, {large insert} {small insert} ] - // Ignore {create} since we are only tracking inserts. - ASSERT_EQUALS(2U, docsInserted.size()); - - ASSERT_EQUALS(1U, docsInserted[0].size()); - ASSERT_BSONOBJ_EQ(insertOpLarge.getObject(), docsInserted[0][0]); - - ASSERT_EQUALS(1U, docsInserted[1].size()); - ASSERT_BSONOBJ_EQ(insertOpSmall.getObject(), docsInserted[1][0]); -} - -TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreateGroupByNamespace) { - int seconds = 1; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - - auto testNs = "test." + _agent.getSuiteName() + "_" + _agent.getTestName(); - - // Create a sequence of 3 'insert' ops that can't be grouped because they are from different - // namespaces. - MultiApplier::Operations operationsToApply = {makeOp(NamespaceString(testNs + "_1")), - makeOp(NamespaceString(testNs + "_2")), - makeOp(NamespaceString(testNs + "_3"))}; - - for (const auto& oplogEntry : operationsToApply) { - createCollectionWithUuid(_opCtx.get(), oplogEntry.getNss()); - } - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - docsInserted.push_back(docs); - }; - - // Apply the ops. - ASSERT_OK(runOpsSteadyState(operationsToApply)); - - // Applied ops should be as follows i.e. no insert grouping: - // [{insert 1}, {insert 2}, {insert 3}] - ASSERT_EQ(operationsToApply.size(), docsInserted.size()); - for (std::size_t i = 0; i < operationsToApply.size(); i++) { - const auto& group = docsInserted[i]; - ASSERT_EQUALS(1U, group.size()) << i; - ASSERT_BSONOBJ_EQ(operationsToApply[i].getObject(), group[0]); - } -} - -TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { - int seconds = 1; - auto makeOp = [&seconds](const NamespaceString& nss) { - return makeInsertDocumentOplogEntry( - {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); - }; - NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); - auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); - - // Generate operations to apply: - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - std::size_t limit = 64; - MultiApplier::Operations insertOps; - for (std::size_t i = 0; i < limit + 1; ++i) { - insertOps.push_back(makeOp(nss)); - } - MultiApplier::Operations operationsToApply; - operationsToApply.push_back(createOp); - std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - - // Each element in 'docsInserted' is a grouped insert operation. - std::vector<std::vector<BSONObj>> docsInserted; - std::size_t numFailedGroupedInserts = 0; - _opObserver->onInsertsFn = - [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - // Reject grouped insert operations. - if (docs.size() > 1U) { - numFailedGroupedInserts++; - uasserted(ErrorCodes::OperationFailed, "grouped inserts not supported"); - } - docsInserted.push_back(docs); - }; - - ASSERT_OK(runOpsSteadyState(operationsToApply)); - - // On failing to apply the grouped insert operation, multiSyncApply should apply the operations - // as given in "operationsToApply": - // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} - // Ignore {create} since we are only tracking inserts. - ASSERT_EQUALS(limit + 1, docsInserted.size()); - - for (std::size_t i = 0; i < limit + 1; ++i) { - const auto& insertOp = insertOps[i]; - const auto& group = docsInserted[i]; - ASSERT_EQUALS(1U, group.size()) << i; - ASSERT_BSONOBJ_EQ(insertOp.getObject(), group[0]); - } - - // Ensure that multiSyncApply does not attempt to group remaining operations in first failed - // grouped insert operation. - ASSERT_EQUALS(1U, numFailedGroupedInserts); -} - -TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { - SyncTailForTest syncTail; - NamespaceString nss("test.t"); - { - Lock::GlobalWrite globalLock(_opCtx.get()); - bool justCreated = false; - auto databaseHolder = DatabaseHolder::get(_opCtx.get()); - auto db = databaseHolder->openDb(_opCtx.get(), nss.db(), &justCreated); - ASSERT_TRUE(db); - ASSERT_TRUE(justCreated); - } - auto op = makeUpdateDocumentOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); - MultiApplier::OperationPtrs ops = {&op}; - WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - - // Since the document was missing when we cloned data from the sync source, the collection - // referenced by the failed operation should not be automatically created. - ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); -} - -TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitialSync) { - BSONObj emptyDoc; - SyncTailForTest syncTail; - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); - auto doc1 = BSON("_id" << 1); - auto doc2 = BSON("_id" << 2); - auto doc3 = BSON("_id" << 3); - auto op0 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1); - auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, badNss, doc2); - auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); - MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; - WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - - CollectionReader collectionReader(_opCtx.get(), nss); - ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); - ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(collectionReader.next())); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus()); -} - -TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) { - BSONObj emptyDoc; - SyncTailForTest syncTail; - NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); - auto doc1 = BSON("_id" << 1); - auto keyPattern = BSON("a" << 1); - auto doc3 = BSON("_id" << 3); - auto op0 = - makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("uuid" << kUuid)); - auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1); - auto op2 = makeCreateIndexOplogEntry( - {Timestamp(Seconds(3), 0), 1LL}, badNss, "a_1", keyPattern, kUuid); - auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); - MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; - WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - - CollectionReader collectionReader(_opCtx.get(), nss); - ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); - ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(collectionReader.next())); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus()); - - // 'badNss' collection should not be implicitly created while attempting to create an index. - ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection()); -} - -TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); - auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); - auto indexOp = - buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3), kUuid); - - auto ops = {insertOp, updateOp, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 16755); -} - -TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnIndexing) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto indexOp = - buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3), kUuid); - auto dropIndexOp = dropIndex("loc_index", kUuid); - auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}")); - - auto ops = {indexOp, dropIndexOp, insertOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 16755); -} - -TEST_F(IdempotencyTest, Geo2dIndex) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, loc: [1]}")); - auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}")); - auto indexOp = buildIndex(fromjson("{loc: '2d'}"), BSONObj(), kUuid); - - auto ops = {insertOp, updateOp, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 13068); -} - -TEST_F(IdempotencyTest, UniqueKeyIndex) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, x: 5}")); - auto updateOp = update(1, fromjson("{$set: {x: 6}}")); - auto insertOp2 = insert(fromjson("{_id: 2, x: 5}")); - auto indexOp = buildIndex(fromjson("{x: 1}"), fromjson("{unique: true}"), kUuid); - - auto ops = {insertOp, updateOp, insertOp2, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), ErrorCodes::DuplicateKey); -} - -TEST_F(IdempotencyTest, ParallelArrayError) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1}")))); - - auto updateOp1 = update(1, fromjson("{$set: {x: [1, 2]}}")); - auto updateOp2 = update(1, fromjson("{$set: {x: 1}}")); - auto updateOp3 = update(1, fromjson("{$set: {y: [3, 4]}}")); - auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"), BSONObj(), kUuid); - - auto ops = {updateOp1, updateOp2, updateOp3, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), ErrorCodes::CannotIndexParallelArrays); -} - -TEST_F(IdempotencyTest, IndexWithDifferentOptions) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1, x: 'hi'}")))); - - auto indexOp1 = - buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'spanish'}"), kUuid); - auto dropIndexOp = dropIndex("x_index", kUuid); - auto indexOp2 = - buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'english'}"), kUuid); - - auto ops = {indexOp1, dropIndexOp, indexOp2}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), ErrorCodes::IndexOptionsConflict); -} - -TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageField) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}")); - auto updateOp = update(1, fromjson("{$unset: {language: 1}}")); - auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); - - auto ops = {insertOp, updateOp, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 17261); -} - -TEST_F(IdempotencyTest, InsertDocumentWithNonStringLanguageFieldWhenTextIndexExists) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); - auto dropIndexOp = dropIndex("x_index", kUuid); - auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}")); - - auto ops = {indexOp, dropIndexOp, insertOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 17261); -} - -TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageOverrideField) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}")); - auto updateOp = update(1, fromjson("{$unset: {y: 1}}")); - auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"), kUuid); - - auto ops = {insertOp, updateOp, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 17261); -} - -TEST_F(IdempotencyTest, InsertDocumentWithNonStringLanguageOverrideFieldWhenTextIndexExists) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"), kUuid); - auto dropIndexOp = dropIndex("x_index", kUuid); - auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}")); - - auto ops = {indexOp, dropIndexOp, insertOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 17261); -} - -TEST_F(IdempotencyTest, TextIndexDocumentHasUnknownLanguage) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 'bad'}")); - auto updateOp = update(1, fromjson("{$unset: {language: 1}}")); - auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj(), kUuid); - - auto ops = {insertOp, updateOp, indexOp}; - testOpsAreIdempotent(ops); - - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - auto status = runOpsInitialSync(ops); - ASSERT_EQ(status.code(), 17262); -} - -TEST_F(IdempotencyTest, CreateCollectionWithValidation) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - const BSONObj uuidObj = kUuid.toBSON(); - - auto runOpsAndValidate = [this, uuidObj]() { - auto options1 = fromjson("{'validator' : {'phone' : {'$type' : 'string' } } }"); - options1 = options1.addField(uuidObj.firstElement()); - auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1); - auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); - - auto options2 = fromjson("{'validator' : {'phone' : {'$type' : 'number' } } }"); - options2 = options2.addField(uuidObj.firstElement()); - auto createColl2 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options2); - - auto ops = {createColl1, dropColl, createColl2}; - ASSERT_OK(runOpsInitialSync(ops)); - auto state = validate(); - - return state; - }; - - auto state1 = runOpsAndValidate(); - auto state2 = runOpsAndValidate(); - ASSERT_EQUALS(state1, state2); -} - -TEST_F(IdempotencyTest, CreateCollectionWithCollation) { - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - CollectionUUID uuid = UUID::gen(); - - auto runOpsAndValidate = [this, uuid]() { - auto insertOp1 = insert(fromjson("{ _id: 'foo' }")); - auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }")); - auto updateOp = update("foo", BSON("$set" << BSON("x" << 2))); - auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); - auto options = BSON("collation" - << BSON("locale" - << "en" - << "caseLevel" << false << "caseFirst" - << "off" - << "strength" << 1 << "numericOrdering" << false << "alternate" - << "non-ignorable" - << "maxVariable" - << "punct" - << "normalization" << false << "backwards" << false << "version" - << "57.1") - << "uuid" << uuid); - auto createColl = makeCreateCollectionOplogEntry(nextOpTime(), nss, options); - - // We don't drop and re-create the collection since we don't have ways - // to wait until second-phase drop to completely finish. - auto ops = {createColl, insertOp1, insertOp2, updateOp}; - ASSERT_OK(runOpsInitialSync(ops)); - auto state = validate(); - - return state; - }; - - auto state1 = runOpsAndValidate(); - auto state2 = runOpsAndValidate(); - ASSERT_EQUALS(state1, state2); -} - -TEST_F(IdempotencyTest, CreateCollectionWithIdIndex) { - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - CollectionUUID uuid = kUuid; - - auto options1 = BSON("idIndex" << BSON("key" << fromjson("{_id: 1}") << "name" - << "_id_" - << "v" << 2) - << "uuid" << uuid); - auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1); - ASSERT_OK(runOpInitialSync(createColl1)); - - auto runOpsAndValidate = [this, uuid]() { - auto insertOp = insert(BSON("_id" << Decimal128(1))); - auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); - auto createColl2 = createCollection(uuid); - - auto ops = {insertOp, dropColl, createColl2}; - ASSERT_OK(runOpsInitialSync(ops)); - auto state = validate(); - - return state; - }; - - auto state1 = runOpsAndValidate(); - auto state2 = runOpsAndValidate(); - ASSERT_EQUALS(state1, state2); -} - -TEST_F(IdempotencyTest, CreateCollectionWithView) { - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - CollectionOptions options; - options.uuid = kUuid; - - // Create data collection - ASSERT_OK(runOpInitialSync(createCollection())); - // Create "system.views" collection - auto viewNss = NamespaceString(nss.db(), "system.views"); - ASSERT_OK( - runOpInitialSync(makeCreateCollectionOplogEntry(nextOpTime(), viewNss, options.toBSON()))); - - auto viewDoc = BSON("_id" << NamespaceString(nss.db(), "view").ns() << "viewOn" << nss.coll() - << "pipeline" << fromjson("[ { '$project' : { 'x' : 1 } } ]")); - auto insertViewOp = makeInsertDocumentOplogEntry(nextOpTime(), viewNss, viewDoc); - auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); - - auto ops = {insertViewOp, dropColl}; - testOpsAreIdempotent(ops); -} - -TEST_F(IdempotencyTest, CollModNamespaceNotFound) { - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - ASSERT_OK(runOpInitialSync( - buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600), kUuid))); - - auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); - auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); - auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd, kUuid); - auto dropCollOp = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()), kUuid); - - auto ops = {collModOp, dropCollOp}; - testOpsAreIdempotent(ops); -} - -TEST_F(IdempotencyTest, CollModIndexNotFound) { - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - ASSERT_OK(runOpInitialSync(createCollection(kUuid))); - ASSERT_OK(runOpInitialSync( - buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600), kUuid))); - - auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}"); - auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange); - auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd, kUuid); - auto dropIndexOp = dropIndex("createdAt_index", kUuid); - - auto ops = {collModOp, dropIndexOp}; - testOpsAreIdempotent(ops); -} - -TEST_F(SyncTailTest, FailOnDropFCVCollection) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto fcvNS(NamespaceString::kServerConfigurationNamespace); - auto cmd = BSON("drop" << fcvNS.coll()); - auto op = makeCommandOplogEntry(nextOpTime(), fcvNS, cmd); - ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported); -} - -TEST_F(SyncTailTest, FailOnInsertFCVDocument) { - auto fcvNS(NamespaceString::kServerConfigurationNamespace); - ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions()); - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto op = makeInsertDocumentOplogEntry( - nextOpTime(), fcvNS, BSON("_id" << FeatureCompatibilityVersionParser::kParameterName)); - ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported); -} - -TEST_F(IdempotencyTest, InsertToFCVCollectionBesidesFCVDocumentSucceeds) { - auto fcvNS(NamespaceString::kServerConfigurationNamespace); - ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions()); - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto op = makeInsertDocumentOplogEntry(nextOpTime(), - fcvNS, - BSON("_id" - << "other")); - ASSERT_OK(runOpInitialSync(op)); -} - -TEST_F(IdempotencyTest, DropDatabaseSucceeds) { - // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped. - auto ns = NamespaceString("foo.system.profile"); - ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); - ASSERT_OK(runOpInitialSync(op)); -} - -TEST_F(SyncTailTest, DropDatabaseSucceedsInRecovering) { - // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped. - auto ns = NamespaceString("foo.system.profile"); - ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); - ASSERT_OK(runOpSteadyState(op)); -} - -TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) { - // This duration is greater than "slowMS", so the op would be considered slow. - auto applyDuration = serverGlobalParams.slowMS * 10; - getServiceContext()->setFastClockSource( - std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); - - // We are inserting into an existing collection. - const NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - - startCapturingLogMessages(); - ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary)); - - // Use a builder for easier escaping. We expect the operation to be logged. - StringBuilder expected; - expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, v: 2, op: \"i\", ns: \"test.t\", " - "wall: new Date(0), o: " - "{ _id: 0 } }, took " - << applyDuration << "ms"; - ASSERT_EQUALS(1, countLogLinesContaining(expected.str())); -} - -TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) { - // This duration is greater than "slowMS", so the op would be considered slow. - auto applyDuration = serverGlobalParams.slowMS * 10; - getServiceContext()->setFastClockSource( - std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); - - // We are trying to insert into a non-existing database. - NamespaceString nss("test.t"); - auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - - startCapturingLogMessages(); - ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary), - ExceptionFor<ErrorCodes::NamespaceNotFound>); - - // Use a builder for easier escaping. We expect the operation to *not* be logged - // even thought it was slow, since we couldn't apply it successfully. - StringBuilder expected; - expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " - "t: 1, h: 1, v: 2 }, took " - << applyDuration << "ms"; - ASSERT_EQUALS(0, countLogLinesContaining(expected.str())); -} - -TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { - // This duration is below "slowMS", so the op would *not* be considered slow. - auto applyDuration = serverGlobalParams.slowMS / 10; - getServiceContext()->setFastClockSource( - std::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration))); - - // We are inserting into an existing collection. - const NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - - startCapturingLogMessages(); - ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary)); - - // Use a builder for easier escaping. We expect the operation to *not* be logged, - // since it wasn't slow to apply. - StringBuilder expected; - expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " - "t: 1, h: 1, v: 2 }, took " - << applyDuration << "ms"; - ASSERT_EQUALS(0, countLogLinesContaining(expected.str())); -} - -TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) { - // Create a BSON "emptycapped" command. - auto emptyCappedCmd = BSON("emptycapped" << nss.coll()); - - // Create an "emptycapped" oplog entry. - auto emptyCappedOp = makeCommandOplogEntry(nextOpTime(), nss, emptyCappedCmd); - - // Ensure that NamespaceNotFound is acceptable. - ASSERT_OK(runOpInitialSync(emptyCappedOp)); - - AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); - - // Ensure that autoColl.getCollection() and autoColl.getDb() are both null. - ASSERT_FALSE(autoColl.getCollection()); - ASSERT_FALSE(autoColl.getDb()); -} - -TEST_F(IdempotencyTest, ConvertToCappedNamespaceNotFound) { - // Create a BSON "convertToCapped" command. - auto convertToCappedCmd = BSON("convertToCapped" << nss.coll()); - - // Create a "convertToCapped" oplog entry. - auto convertToCappedOp = makeCommandOplogEntry(nextOpTime(), nss, convertToCappedCmd); - - // Ensure that NamespaceNotFound is acceptable. - ASSERT_OK(runOpInitialSync(convertToCappedOp)); - - AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); - - // Ensure that autoColl.getCollection() and autoColl.getDb() are both null. - ASSERT_FALSE(autoColl.getCollection()); - ASSERT_FALSE(autoColl.getDb()); -} - -class IdempotencyTestTxns : public IdempotencyTest {}; - -// Document used by transaction idempotency tests. -const BSONObj doc = fromjson("{_id: 1}"); -const BSONObj doc2 = fromjson("{_id: 2}"); - -TEST_F(IdempotencyTestTxns, CommitUnpreparedTransaction) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto commitOp = commitUnprepared( - lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionDataPartiallyApplied) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - NamespaceString nss2("test.coll2"); - auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); - - auto commitOp = commitUnprepared(lsid, - txnNum, - StmtId(0), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) - << makeInsertApplyOpsEntry(nss2, uuid2, doc))); - - // Manually insert one of the documents so that the data will partially reflect the transaction - // when the commitTransaction oplog entry is applied during initial sync. - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), - nss, - {doc, commitOp.getOpTime().getTimestamp()}, - commitOp.getOpTime().getTerm())); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); -} - -TEST_F(IdempotencyTestTxns, CommitPreparedTransaction) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto prepareOp = - prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - - auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({prepareOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, CommitPreparedTransactionDataPartiallyApplied) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - NamespaceString nss2("test.coll2"); - auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); - - auto prepareOp = prepare(lsid, - txnNum, - StmtId(0), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) - << makeInsertApplyOpsEntry(nss2, uuid2, doc))); - - auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); - - // Manually insert one of the documents so that the data will partially reflect the transaction - // when the commitTransaction oplog entry is applied during initial sync. - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), - nss, - {doc, commitOp.getOpTime().getTimestamp()}, - commitOp.getOpTime().getTerm())); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({prepareOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); -} - -TEST_F(IdempotencyTestTxns, AbortPreparedTransaction) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto prepareOp = - prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({prepareOp, abortOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - abortOp.getOpTime(), - abortOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kAborted); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, SinglePartialTxnOp) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp}); - auto expectedStartOpTime = partialOp.getOpTime(); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - partialOp.getOpTime(), - partialOp.getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - - // Document should not be visible yet. - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, MultiplePartialTxnOps) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp1 = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto partialOp2 = partialTxn(lsid, - txnNum, - StmtId(1), - partialOp1.getOpTime(), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp1, partialOp2}); - auto expectedStartOpTime = partialOp1.getOpTime(); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - partialOp1.getOpTime(), - partialOp1.getWallClockTime(), - expectedStartOpTime, - DurableTxnStateEnum::kInProgress); - // Document should not be visible yet. - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionWithPartialTxnOps) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - - auto commitOp = commitUnprepared(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitTwoUnpreparedTransactionsWithPartialTxnOpsAtOnce) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum1(1); - TxnNumber txnNum2(2); - - auto partialOp1 = partialTxn( - lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto commitOp1 = - commitUnprepared(lsid, txnNum1, StmtId(1), BSONArray(), partialOp1.getOpTime()); - - // The second transaction (with a different transaction number) in the same session. - auto partialOp2 = partialTxn( - lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); - auto commitOp2 = - commitUnprepared(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - // This also tests that we clear the partialTxnList for the session after applying the commit of - // the first transaction. Otherwise, saving operations from the second transaction to the same - // partialTxnList as the first transaction will trigger an invariant because of the mismatching - // transaction numbers. - testOpsAreIdempotent({partialOp1, commitOp1, partialOp2, commitOp2}); - - // The transaction table should only contain the second transaction of the session. - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum2, - commitOp2.getOpTime(), - commitOp2.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitAndAbortTwoTransactionsWithPartialTxnOpsAtOnce) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum1(1); - TxnNumber txnNum2(2); - - auto partialOp1 = partialTxn( - lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto abortOp1 = abortPrepared(lsid, txnNum1, StmtId(1), partialOp1.getOpTime()); - - // The second transaction (with a different transaction number) in the same session. - auto partialOp2 = partialTxn( - lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); - auto commitOp2 = - commitUnprepared(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - // This also tests that we clear the partialTxnList for the session after applying the abort of - // the first transaction. Otherwise, saving operations from the second transaction to the same - // partialTxnList as the first transaction will trigger an invariant because of the mismatching - // transaction numbers. - testOpsAreIdempotent({partialOp1, abortOp1, partialOp2, commitOp2}); - - // The transaction table should only contain the second transaction of the session. - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum2, - commitOp2.getOpTime(), - commitOp2.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionWithPartialTxnOpsAndDataPartiallyApplied) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - - auto commitOp = commitUnprepared(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - - // Manually insert the first document so that the data will partially reflect the transaction - // when the commitTransaction oplog entry is applied during initial sync. This simulates the - // case where the transaction committed on the sync source at a point during the initial sync, - // such that we cloned 'doc' but missed 'doc2'. - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), - nss, - {doc, commitOp.getOpTime().getTimestamp()}, - commitOp.getOpTime().getTerm())); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, PrepareTransactionWithPartialTxnOps) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto prepareOp = prepare(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, prepareOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - prepareOp.getOpTime(), - prepareOp.getWallClockTime(), - partialOp.getOpTime(), - DurableTxnStateEnum::kPrepared); - // Document should not be visible yet. - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, EmptyPrepareTransaction) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - // It is possible to have an empty prepare oplog entry. - auto prepareOp = prepare(lsid, txnNum, StmtId(1), BSONArray(), OpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({prepareOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - prepareOp.getOpTime(), - prepareOp.getWallClockTime(), - prepareOp.getOpTime(), - DurableTxnStateEnum::kPrepared); -} - -TEST_F(IdempotencyTestTxns, CommitPreparedTransactionWithPartialTxnOps) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto prepareOp = prepare(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - auto commitOp = commitPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, prepareOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitTwoPreparedTransactionsWithPartialTxnOpsAtOnce) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum1(1); - TxnNumber txnNum2(2); - - auto partialOp1 = partialTxn( - lsid, txnNum1, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto prepareOp1 = prepare(lsid, txnNum1, StmtId(1), BSONArray(), partialOp1.getOpTime()); - auto commitOp1 = commitPrepared(lsid, txnNum1, StmtId(2), prepareOp1.getOpTime()); - - // The second transaction (with a different transaction number) in the same session. - auto partialOp2 = partialTxn( - lsid, txnNum2, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2))); - auto prepareOp2 = prepare(lsid, txnNum2, StmtId(1), BSONArray(), partialOp2.getOpTime()); - auto commitOp2 = commitPrepared(lsid, txnNum2, StmtId(2), prepareOp2.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - // This also tests that we clear the partialTxnList for the session after applying the commit of - // the first prepared transaction. Otherwise, saving operations from the second transaction to - // the same partialTxnList as the first transaction will trigger an invariant because of the - // mismatching transaction numbers. - testOpsAreIdempotent({partialOp1, prepareOp1, commitOp1, partialOp2, prepareOp2, commitOp2}); - - // The transaction table should only contain the second transaction of the session. - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum2, - commitOp2.getOpTime(), - commitOp2.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, CommitPreparedTransactionWithPartialTxnOpsAndDataPartiallyApplied) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto prepareOp = prepare(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - auto commitOp = commitPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); - - // Manually insert the first document so that the data will partially reflect the transaction - // when the commitTransaction oplog entry is applied during initial sync. This simulates the - // case where the transaction committed on the sync source at a point during the initial sync, - // such that we cloned 'doc' but missed 'doc2'. - ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), - nss, - {doc, commitOp.getOpTime().getTimestamp()}, - commitOp.getOpTime().getTerm())); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, prepareOp, commitOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - commitOp.getOpTime(), - commitOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kCommitted); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); - ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, AbortPreparedTransactionWithPartialTxnOps) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto prepareOp = prepare(lsid, - txnNum, - StmtId(1), - BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc2)), - partialOp.getOpTime()); - auto abortOp = abortPrepared(lsid, txnNum, StmtId(2), prepareOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, prepareOp, abortOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - abortOp.getOpTime(), - abortOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kAborted); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); -} - -TEST_F(IdempotencyTestTxns, AbortInProgressTransaction) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - auto uuid = createCollectionWithUuid(_opCtx.get(), nss); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto partialOp = partialTxn( - lsid, txnNum, StmtId(0), OpTime(), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); - auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), partialOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({partialOp, abortOp}); - repl::checkTxnTable(_opCtx.get(), - lsid, - txnNum, - abortOp.getOpTime(), - abortOp.getWallClockTime(), - boost::none, - DurableTxnStateEnum::kAborted); - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, CommitUnpreparedTransactionIgnoresNamespaceNotFoundErrors) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - - // Instead of creating a collection, we generate an arbitrary UUID to use for the operations - // below. This simulates the case where, during initial sync, a document D was inserted into a - // collection C on the sync source and then collection C was dropped, after we started fetching - // oplog entries but before we started collection cloning. In this case, we would not clone - // collection C, but when we try to apply the insertion of document D after collection cloning - // has finished, the collection would not exist since we never created it. It is acceptable to - // ignore the NamespaceNotFound error in this case since we know the collection will be dropped - // later on. - auto uuid = UUID::gen(); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto commitOp = commitUnprepared( - lsid, txnNum, StmtId(1), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)), OpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({commitOp}); - - // The op should have thrown a NamespaceNotFound error, which should have been ignored, so the - // operation has no effect. - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -TEST_F(IdempotencyTestTxns, CommitPreparedTransactionIgnoresNamespaceNotFoundErrors) { - createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - - // Instead of creating a collection, we generate an arbitrary UUID to use for the operations - // below. This simulates the case where, during initial sync, a document D was inserted into a - // collection C on the sync source and then collection C was dropped, after we started fetching - // oplog entries but before we started collection cloning. In this case, we would not clone - // collection C, but when we try to apply the insertion of document D after collection cloning - // has finished, the collection would not exist since we never created it. It is acceptable to - // ignore the NamespaceNotFound error in this case since we know the collection will be dropped - // later on. - auto uuid = UUID::gen(); - auto lsid = makeLogicalSessionId(_opCtx.get()); - TxnNumber txnNum(0); - - auto prepareOp = prepare( - lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)), OpTime()); - auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); - - ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(MemberState::RS_RECOVERING)); - - testOpsAreIdempotent({prepareOp, commitOp}); - - // The op should have thrown a NamespaceNotFound error, which should have been ignored, so the - // operation has no effect. - ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); -} - -} // namespace -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp deleted file mode 100644 index bd5fe5c9d13..00000000000 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/sync_tail_test_fixture.h" - -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/curop.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/op_observer_registry.h" -#include "mongo/db/query/internal_plans.h" -#include "mongo/db/repl/drop_pending_collection_reaper.h" -#include "mongo/db/repl/oplog_applier.h" -#include "mongo/db/repl/replication_consistency_markers_mock.h" -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/replication_recovery_mock.h" -#include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/storage_interface_impl.h" - -namespace mongo { -namespace repl { - -void SyncTailOpObserver::onInserts(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - bool fromMigrate) { - if (!onInsertsFn) { - return; - } - std::vector<BSONObj> docs; - for (auto it = begin; it != end; ++it) { - const InsertStatement& insertStatement = *it; - docs.push_back(insertStatement.doc.getOwned()); - } - onInsertsFn(opCtx, nss, docs); -} - -void SyncTailOpObserver::onDelete(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - StmtId stmtId, - bool fromMigrate, - const boost::optional<BSONObj>& deletedDoc) { - if (!onDeleteFn) { - return; - } - onDeleteFn(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); -} - -void SyncTailOpObserver::onCreateCollection(OperationContext* opCtx, - Collection* coll, - const NamespaceString& collectionName, - const CollectionOptions& options, - const BSONObj& idIndex, - const OplogSlot& createOpTime) { - if (!onCreateCollectionFn) { - return; - } - onCreateCollectionFn(opCtx, coll, collectionName, options, idIndex); -} - -void SyncTailTest::setUp() { - ServiceContextMongoDTest::setUp(); - - serviceContext = getServiceContext(); - _opCtx = cc().makeOperationContext(); - - ReplicationCoordinator::set(serviceContext, - std::make_unique<ReplicationCoordinatorMock>(serviceContext)); - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); - - StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>()); - - DropPendingCollectionReaper::set( - serviceContext, std::make_unique<DropPendingCollectionReaper>(getStorageInterface())); - repl::setOplogCollectionName(serviceContext); - repl::createOplog(_opCtx.get()); - - _consistencyMarkers = std::make_unique<ReplicationConsistencyMarkersMock>(); - - // Set up an OpObserver to track the documents SyncTail inserts. - auto opObserver = std::make_unique<SyncTailOpObserver>(); - _opObserver = opObserver.get(); - auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver()); - opObserverRegistry->addObserver(std::move(opObserver)); - - // Initialize the featureCompatibilityVersion server parameter. This is necessary because this - // test fixture does not create a featureCompatibilityVersion document from which to initialize - // the server parameter. - serverGlobalParams.featureCompatibility.setVersion( - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44); -} - -void SyncTailTest::tearDown() { - _opCtx.reset(); - _consistencyMarkers = {}; - DropPendingCollectionReaper::set(serviceContext, {}); - StorageInterface::set(serviceContext, {}); - ServiceContextMongoDTest::tearDown(); -} - -ReplicationConsistencyMarkers* SyncTailTest::getConsistencyMarkers() const { - return _consistencyMarkers.get(); -} - -StorageInterface* SyncTailTest::getStorageInterface() const { - return StorageInterface::get(serviceContext); -} - -// Since syncApply is being tested outside of its calling function (multiSyncApply), we recreate the -// necessary calling context. -Status SyncTailTest::_syncApplyWrapper(OperationContext* opCtx, - const OplogEntryBatch& batch, - OplogApplication::Mode oplogApplicationMode) { - UnreplicatedWritesBlock uwb(opCtx); - DisableDocumentValidation validationDisabler(opCtx); - return syncApply(opCtx, batch, oplogApplicationMode); -} - -void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, - const OplogEntry& op, - bool expectedApplyOpCalled) { - bool applyOpCalled = false; - - auto checkOpCtx = [](OperationContext* opCtx) { - ASSERT_TRUE(opCtx); - ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_IX)); - ASSERT_FALSE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); - ASSERT_TRUE( - opCtx->lockState()->isCollectionLockedForMode(NamespaceString("test.t"), MODE_IX)); - ASSERT_FALSE(opCtx->writesAreReplicated()); - ASSERT_TRUE(documentValidationDisabled(opCtx)); - }; - - _opObserver->onInsertsFn = - [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { - applyOpCalled = true; - checkOpCtx(opCtx); - ASSERT_EQUALS(NamespaceString("test.t"), nss); - ASSERT_EQUALS(1U, docs.size()); - ASSERT_BSONOBJ_EQ(op.getObject(), docs[0]); - return Status::OK(); - }; - - _opObserver->onDeleteFn = [&](OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - StmtId stmtId, - bool fromMigrate, - const boost::optional<BSONObj>& deletedDoc) { - applyOpCalled = true; - checkOpCtx(opCtx); - ASSERT_EQUALS(NamespaceString("test.t"), nss); - ASSERT(deletedDoc); - ASSERT_BSONOBJ_EQ(op.getObject(), *deletedDoc); - return Status::OK(); - }; - - ASSERT_EQ(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), - expectedError); - ASSERT_EQ(applyOpCalled, expectedApplyOpCalled); -} - -Status failedApplyCommand(OperationContext* opCtx, - const BSONObj& theOperation, - OplogApplication::Mode) { - FAIL("applyCommand unexpectedly invoked."); - return Status::OK(); -} - -Status SyncTailTest::runOpSteadyState(const OplogEntry& op) { - return runOpsSteadyState({op}); -} - -Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) { - SyncTail syncTail( - nullptr, getStorageInterface(), OplogApplier::Options(OplogApplication::Mode::kSecondary)); - MultiApplier::OperationPtrs opsPtrs; - for (auto& op : ops) { - opsPtrs.push_back(&op); - } - WorkerMultikeyPathInfo pathInfo; - return multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); -} - -Status SyncTailTest::runOpInitialSync(const OplogEntry& op) { - return runOpsInitialSync({op}); -} - -Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { - SyncTail syncTail(nullptr, - getStorageInterface(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); - // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD - // operations provided by idempotency tests. - for (auto& op : ops) { - MultiApplier::OperationPtrs opsPtrs; - opsPtrs.push_back(&op); - WorkerMultikeyPathInfo pathInfo; - auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); - if (!status.isOK()) { - return status; - } - } - return Status::OK(); -} - -Status SyncTailTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) { - SyncTail syncTail(nullptr, - getStorageInterface(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync)); - // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD - // operations provided by idempotency tests. - for (auto& op : ops) { - MultiApplier::OperationPtrs opsPtrs; - opsPtrs.push_back(op); - WorkerMultikeyPathInfo pathInfo; - auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); - if (!status.isOK()) { - return status; - } - } - return Status::OK(); -} - -void checkTxnTable(OperationContext* opCtx, - const LogicalSessionId& lsid, - const TxnNumber& txnNum, - const repl::OpTime& expectedOpTime, - Date_t expectedWallClock, - boost::optional<repl::OpTime> expectedStartOpTime, - boost::optional<DurableTxnStateEnum> expectedState) { - DBDirectClient client(opCtx); - auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); - ASSERT_FALSE(result.isEmpty()); - - auto txnRecord = - SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result); - - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime()); - ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate()); - if (expectedStartOpTime) { - ASSERT(txnRecord.getStartOpTime()); - ASSERT_EQ(*expectedStartOpTime, *txnRecord.getStartOpTime()); - } else { - ASSERT(!txnRecord.getStartOpTime()); - } - if (expectedState) { - ASSERT(*expectedState == txnRecord.getState()); - } -} - -CollectionReader::CollectionReader(OperationContext* opCtx, const NamespaceString& nss) - : _collToScan(opCtx, nss), - _exec(InternalPlanner::collectionScan(opCtx, - nss.ns(), - _collToScan.getCollection(), - PlanExecutor::NO_YIELD, - InternalPlanner::FORWARD)) {} - -StatusWith<BSONObj> CollectionReader::next() { - BSONObj obj; - - auto state = _exec->getNext(&obj, nullptr); - if (state == PlanExecutor::IS_EOF) { - return {ErrorCodes::CollectionIsEmpty, - str::stream() << "no more documents in " << _collToScan.getNss()}; - } - - // PlanExecutors that do not yield should only return ADVANCED or EOF. - invariant(state == PlanExecutor::ADVANCED); - return obj; -} - -bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { - DBDirectClient client(opCtx); - auto result = client.findOne(nss.ns(), {doc}); - return !result.isEmpty(); -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h deleted file mode 100644 index 4f2965527d1..00000000000 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/status.h" -#include "mongo/db/concurrency/lock_manager_defs.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/op_observer_noop.h" -#include "mongo/db/repl/replication_consistency_markers.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/db/session_txn_record_gen.h" - -namespace mongo { - -class BSONObj; -class OperationContext; - -namespace repl { - -/** - * OpObserver for SyncTail test fixture. - */ -class SyncTailOpObserver : public OpObserverNoop { -public: - /** - * This function is called whenever SyncTail inserts documents into a collection. - */ - void onInserts(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - bool fromMigrate) override; - - /** - * This function is called whenever SyncTail deletes a document from a collection. - */ - void onDelete(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - StmtId stmtId, - bool fromMigrate, - const boost::optional<BSONObj>& deletedDoc) override; - - /** - * Called when SyncTail creates a collection. - */ - void onCreateCollection(OperationContext* opCtx, - Collection* coll, - const NamespaceString& collectionName, - const CollectionOptions& options, - const BSONObj& idIndex, - const OplogSlot& createOpTime) override; - - // Hooks for OpObserver functions. Defaults to a no-op function but may be overridden to check - // actual documents mutated. - std::function<void(OperationContext*, const NamespaceString&, const std::vector<BSONObj>&)> - onInsertsFn; - - std::function<void(OperationContext*, - const NamespaceString&, - OptionalCollectionUUID, - StmtId, - bool, - const boost::optional<BSONObj>&)> - onDeleteFn; - - std::function<void(OperationContext*, - Collection*, - const NamespaceString&, - const CollectionOptions&, - const BSONObj&)> - onCreateCollectionFn; -}; - -class SyncTailTest : public ServiceContextMongoDTest { -protected: - void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError, - const OplogEntry& op, - bool expectedApplyOpCalled); - - Status _syncApplyWrapper(OperationContext* opCtx, - const OplogEntryBatch& batch, - OplogApplication::Mode oplogApplicationMode); - - ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; - ServiceContext* serviceContext; - SyncTailOpObserver* _opObserver = nullptr; - - // Implements the SyncTail::MultiSyncApplyFn interface and does nothing. - static Status noopApplyOperationFn(OperationContext*, - MultiApplier::OperationPtrs*, - SyncTail* st, - WorkerMultikeyPathInfo*) { - return Status::OK(); - } - - OpTime nextOpTime() { - static long long lastSecond = 1; - return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL); - } - - void setUp() override; - void tearDown() override; - - ReplicationConsistencyMarkers* getConsistencyMarkers() const; - StorageInterface* getStorageInterface() const; - - Status runOpSteadyState(const OplogEntry& op); - Status runOpsSteadyState(std::vector<OplogEntry> ops); - Status runOpInitialSync(const OplogEntry& entry); - Status runOpsInitialSync(std::vector<OplogEntry> ops); - Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops); - - UUID kUuid{UUID::gen()}; -}; - -// Utility class to allow easily scanning a collection. Scans in forward order, returns -// Status::CollectionIsEmpty when scan is exhausted. -class CollectionReader { -public: - CollectionReader(OperationContext* opCtx, const NamespaceString& nss); - - StatusWith<BSONObj> next(); - -private: - AutoGetCollectionForRead _collToScan; - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _exec; -}; - -Status failedApplyCommand(OperationContext* opCtx, - const BSONObj& theOperation, - OplogApplication::Mode); - -void checkTxnTable(OperationContext* opCtx, - const LogicalSessionId& lsid, - const TxnNumber& txnNum, - const repl::OpTime& expectedOpTime, - Date_t expectedWallClock, - boost::optional<repl::OpTime> expectedStartOpTime, - boost::optional<DurableTxnStateEnum> expectedState); - -bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 4c7bc1b51a5..591eafe50e5 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -45,7 +45,6 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/dbtests/dbtests.h" #include "mongo/transport/transport_layer_asio.h" diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index e8944a069a0..b2d7cdb8dc7 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -69,7 +69,6 @@ #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/db/repl/timestamp_block.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/service_context.h" @@ -783,8 +782,8 @@ public: } repl::OplogEntryBatch groupedInsertBatch(opPtrs.cbegin(), opPtrs.cend()); - ASSERT_OK( - repl::syncApply(_opCtx, groupedInsertBatch, repl::OplogApplication::Mode::kSecondary)); + ASSERT_OK(repl::applyOplogEntryBatch( + _opCtx, groupedInsertBatch, repl::OplogApplication::Mode::kSecondary)); for (std::int32_t idx = 0; idx < docsToInsert; ++idx) { OneOffRead oor(_opCtx, firstInsertTime.addTicks(idx).asTimestamp()); @@ -1312,7 +1311,7 @@ public: _coordinatorMock, _consistencyMarkers, storageInterface, - repl::multiSyncApply, + repl::applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); @@ -1397,7 +1396,7 @@ public: _coordinatorMock, _consistencyMarkers, storageInterface, - repl::multiSyncApply, + repl::applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), writerPool.get()); auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); @@ -2413,7 +2412,7 @@ public: // threads can cleanly exit and this test case fails without crashing the entire suite. auto applyOperationFn = [&](OperationContext* opCtx, std::vector<const repl::OplogEntry*>* operationsToApply, - repl::SyncTail* st, + repl::OplogApplierImpl* oa, std::vector<MultikeyPathInfo>* pathInfo) -> Status { if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) { @@ -2421,7 +2420,7 @@ public: } // Insert the document. A reader without a PBWM lock should not see it yet. - auto status = repl::multiSyncApply(opCtx, operationsToApply, st, pathInfo); + auto status = repl::applyOplogGroup(opCtx, operationsToApply, oa, pathInfo); if (!status.isOK()) { return status; } |