/**
 * Copyright 2015 (C) MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #include "mongo/platform/basic.h" #include #include #include #include #include "mongo/bson/util/bson_extract.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/client.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/idempotency_test_fixture.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/md5.hpp" #include "mongo/util/scopeguard.h" #include "mongo/util/string_map.h" namespace mongo { namespace repl { namespace { /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. 
*/ repl::OplogEntry makeOplogEntry(NamespaceString nss) { return repl::OplogEntry(OpTime(Timestamp(1, 1), 1), // optime 1LL, // hash OpTypeEnum::kDelete, // opType nss, // namespace boost::none, // uuid boost::none, // fromMigrate repl::OplogEntry::kOplogVersion, // version BSONObj(), // o boost::none, // o2 {}, // sessionInfo boost::none, // upsert boost::none, // wall clock time boost::none, // statement id boost::none, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } repl::OplogEntry makeOplogEntry(StringData ns) { return makeOplogEntry(NamespaceString(ns)); } /** * Testing-only SyncTail that returns user-provided "document" for getMissingDoc(). */ class SyncTailWithLocalDocumentFetcher : public SyncTail { public: SyncTailWithLocalDocumentFetcher(const BSONObj& document); BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override; private: BSONObj _document; }; /** * Testing-only SyncTail that checks the operation context in fetchAndInsertMissingDocument(). 
*/ class SyncTailWithOperationContextChecker : public SyncTail { public: SyncTailWithOperationContextChecker(); bool fetchAndInsertMissingDocument(OperationContext* opCtx, const OplogEntry& oplogEntry) override; }; SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document) : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), _document(document) {} BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) { return _document; } SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker() : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr) {} bool SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx, const OplogEntry&) { ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); ASSERT_TRUE(documentValidationDisabled(opCtx)); return false; } /** * Creates collection options suitable for oplog. */ CollectionOptions createOplogCollectionOptions() { CollectionOptions options; options.capped = true; options.cappedSize = 64 * 1024 * 1024LL; options.autoIndexId = CollectionOptions::NO; return options; } /** * Create test collection. * Returns collection. */ void createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { Lock::DBLock dblk(opCtx, nss.db(), MODE_X); OldClientContext ctx(opCtx, nss.ns()); auto db = ctx.db(); ASSERT_TRUE(db); mongo::WriteUnitOfWork wuow(opCtx); auto coll = db->createCollection(opCtx, nss.ns(), options); ASSERT_TRUE(coll); wuow.commit(); }); } /** * Create test database. 
*/ void createDatabase(OperationContext* opCtx, StringData dbName) { Lock::GlobalWrite globalLock(opCtx); bool justCreated; Database* db = dbHolder().openDb(opCtx, dbName, &justCreated); ASSERT_TRUE(db); ASSERT_TRUE(justCreated); } auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { BSONElement tsArray; Status status = bsonExtractTypedField(obj, OpTime::kTimestampFieldName, BSONType::Array, &tsArray); ASSERT_OK(status); BSONElement termArray; status = bsonExtractTypedField(obj, OpTime::kTermFieldName, BSONType::Array, &termArray); ASSERT_OK(status); return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long()); }; TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) { const BSONObj op = BSON("op" << "x"); ASSERT_THROWS( SyncTail::syncApply( _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps) .ignore(), ExceptionFor); ASSERT_EQUALS(0U, _opsApplied); } TEST_F(SyncTailTest, SyncApplyNoNamespaceNoOp) { ASSERT_OK(SyncTail::syncApply(_opCtx.get(), BSON("op" << "n"), OplogApplication::Mode::kInitialSync)); ASSERT_EQUALS(0U, _opsApplied); } TEST_F(SyncTailTest, SyncApplyBadOp) { const BSONObj op = BSON("op" << "x" << "ns" << "test.t"); ASSERT_THROWS( SyncTail::syncApply( _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps) .ignore(), ExceptionFor); ASSERT_EQUALS(0U, _opsApplied); } TEST_F(SyncTailTest, SyncApplyNoOpInitialSync) { const BSONObj op = BSON("op" << "n" << "ns" << "test.t"); bool applyOpCalled = false; SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, bool alwaysUpsert, OplogApplication::Mode oplogApplicationMode, stdx::function) { applyOpCalled = true; ASSERT_TRUE(opCtx); ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); 
ASSERT_FALSE(alwaysUpsert); ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); } TEST_F(SyncTailTest, SyncApplyNoOpNotInitialSync) { const BSONObj op = BSON("op" << "n" << "ns" << "test.t"); bool applyOpCalled = false; SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, bool alwaysUpsert, OplogApplication::Mode oplogApplicationMode, stdx::function) { applyOpCalled = true; ASSERT_TRUE(opCtx); ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); ASSERT(alwaysUpsert); ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kSecondary); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kSecondary, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); } TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) { ASSERT_THROWS_CODE(_testSyncApplyInsertDocument(ErrorCodes::OK), AssertionException, ErrorCodes::NamespaceNotFound); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) { const BSONObj op = BSON("op" << "d" << "ns" << "test.othername"); _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) { const NamespaceString nss("test.t"); createDatabase(_opCtx.get(), nss.db()); const BSONObj op = BSON("op" << "i" << "ns" << nss.getSisterNS("othername") << "ui" << UUID::gen()); 
ASSERT_THROWS_CODE(_testSyncApplyCrudOperation(ErrorCodes::OK, op, true), AssertionException, ErrorCodes::NamespaceNotFound); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) { const NamespaceString nss("test.t"); createDatabase(_opCtx.get(), nss.db()); const BSONObj op = BSON("op" << "d" << "ns" << nss.getSisterNS("othername") << "ui" << UUID::gen()); _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { { Lock::GlobalWrite globalLock(_opCtx.get()); bool justCreated = false; Database* db = dbHolder().openDb(_opCtx.get(), "test", &justCreated); ASSERT_TRUE(db); ASSERT_TRUE(justCreated); } // Even though the collection doesn't exist, this is handled in the actual application function, // which in the case of this test just ignores such errors. This tests mostly that we don't // implicitly create the collection and lock the database in MODE_X. _testSyncApplyInsertDocument(ErrorCodes::OK); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionMissing) { const NamespaceString nss("test.t"); createDatabase(_opCtx.get(), nss.db()); // Even though the collection doesn't exist, this is handled in the actual application function, // which in the case of this test just ignores such errors. This tests mostly that we don't // implicitly create the collection and lock the database in MODE_X. 
const BSONObj op = BSON("op" << "d" << "ns" << nss.ns()); _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionExists) { { Lock::GlobalWrite globalLock(_opCtx.get()); bool justCreated = false; Database* db = dbHolder().openDb(_opCtx.get(), "test", &justCreated); ASSERT_TRUE(db); ASSERT_TRUE(justCreated); WriteUnitOfWork wuow(_opCtx.get()); Collection* collection = db->createCollection(_opCtx.get(), "test.t"); wuow.commit(); ASSERT_TRUE(collection); } _testSyncApplyInsertDocument(ErrorCodes::OK); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionExists) { const NamespaceString nss("test.t"); createCollection(_opCtx.get(), nss, {}); const BSONObj op = BSON("op" << "d" << "ns" << nss.ns()); _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) { CollectionOptions options; options.uuid = UUID::gen(); { Lock::GlobalWrite globalLock(_opCtx.get()); bool justCreated; Database* db = dbHolder().openDb(_opCtx.get(), "test", &justCreated); ASSERT_TRUE(db); ASSERT_TRUE(justCreated); WriteUnitOfWork wuow(_opCtx.get()); Collection* collection = db->createCollection(_opCtx.get(), "test.t", options); wuow.commit(); ASSERT_TRUE(collection); } // Test that the collection to lock is determined by the UUID and not the 'ns' field. const BSONObj op = BSON("op" << "i" << "ns" << "test.othername" << "ui" << options.uuid.get()); _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) { const NamespaceString nss("test.t"); CollectionOptions options; options.uuid = UUID::gen(); createCollection(_opCtx.get(), nss, options); // Test that the collection to lock is determined by the UUID and not the 'ns' field. 
auto op = BSON("op" << "d" << "ns" << nss.getSisterNS("othername") << "ui" << options.uuid.get()); _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyIndexBuild) { const BSONObj op = BSON("op" << "i" << "ns" << "test.system.indexes"); bool applyOpCalled = false; SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, bool alwaysUpsert, OplogApplication::Mode oplogApplicationMode, stdx::function) { applyOpCalled = true; ASSERT_TRUE(opCtx); ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); ASSERT_FALSE(alwaysUpsert); ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); } TEST_F(SyncTailTest, SyncApplyCommand) { const BSONObj op = BSON("op" << "c" << "ns" << "test.t"); bool applyCmdCalled = false; SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, bool alwaysUpsert, OplogApplication::Mode oplogApplicationMode, stdx::function) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx, const BSONObj& theOperation, OplogApplication::Mode oplogApplicationMode) { applyCmdCalled = true; ASSERT_TRUE(opCtx); ASSERT_TRUE(opCtx->lockState()->isW()); ASSERT_TRUE(opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(opCtx)); ASSERT_BSONOBJ_EQ(op, theOperation); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); 
ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); ASSERT_OK(SyncTail::syncApply( _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps)); ASSERT_TRUE(applyCmdCalled); ASSERT_EQUALS(1U, _opsApplied); } TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { const BSONObj op = BSON("op" << "c" << "ns" << "test.t"); int applyCmdCalled = 0; SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, bool alwaysUpsert, OplogApplication::Mode oplogApplicationMode, stdx::function) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx, const BSONObj& theOperation, OplogApplication::Mode oplogApplicationMode) { applyCmdCalled++; if (applyCmdCalled < 5) { throw WriteConflictException(); } return Status::OK(); }; ASSERT_OK(SyncTail::syncApply( _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps)); ASSERT_EQUALS(5, applyCmdCalled); ASSERT_EQUALS(1U, _opsApplied); } TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullOperationContext) { auto writerPool = SyncTail::makeWriterPool(); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); auto status = multiApply(nullptr, writerPool.get(), {op}, noopApplyOperationFn).getStatus(); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_STRING_CONTAINS(status.reason(), "invalid operation context"); } TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullWriterPool) { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); auto status = multiApply(_opCtx.get(), nullptr, {op}, noopApplyOperationFn).getStatus(); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_STRING_CONTAINS(status.reason(), "invalid worker pool"); } TEST_F(SyncTailTest, MultiApplyReturnsEmptyArrayOperationWhenNoOperationsAreGiven) { auto writerPool = SyncTail::makeWriterPool(); auto status = multiApply(_opCtx.get(), 
writerPool.get(), {}, noopApplyOperationFn).getStatus(); ASSERT_EQUALS(ErrorCodes::EmptyArrayOperation, status); ASSERT_STRING_CONTAINS(status.reason(), "no operations provided to multiApply"); } TEST_F(SyncTailTest, MultiApplyReturnsBadValueOnNullApplyOperation) { auto writerPool = SyncTail::makeWriterPool(); MultiApplier::ApplyOperationFn nullApplyOperationFn; auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}); auto status = multiApply(_opCtx.get(), writerPool.get(), {op}, nullApplyOperationFn).getStatus(); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_STRING_CONTAINS(status.reason(), "invalid apply operation function"); } bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { auto writerPool = SyncTail::makeWriterPool(); MultiApplier::Operations operationsApplied; auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsToApply, WorkerMultikeyPathInfo*) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); } return Status::OK(); }; createCollection(opCtx, nss, options); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); ASSERT_FALSE(op.isForCappedCollection); auto lastOpTime = unittest::assertGet(multiApply(opCtx, writerPool.get(), {op}, applyOperationFn)); ASSERT_EQUALS(op.getOpTime(), lastOpTime); ASSERT_EQUALS(1U, operationsApplied.size()); const auto& opApplied = operationsApplied.front(); ASSERT_EQUALS(op, opApplied); // "isForCappedCollection" is not parsed from raw oplog entry document. return opApplied.isForCappedCollection; } TEST_F( SyncTailTest, MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) { NamespaceString nss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName()); ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_opCtx.get(), nss, CollectionOptions())); } TEST_F(SyncTailTest, MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); ASSERT_TRUE( _testOplogEntryIsForCappedCollection(_opCtx.get(), nss, createOplogCollectionOptions())); } TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceHash) { // This test relies on implementation details of how multiApply uses hashing to distribute ops // to threads. It is possible for this test to fail, even if the implementation of multiApply is // correct. If it fails, consider adjusting the namespace names (to adjust the hash values) or // the number of threads in the pool. NamespaceString nss1("test.t0"); NamespaceString nss2("test.t1"); auto writerPool = SyncTail::makeWriterPool(2); stdx::mutex mutex; std::vector operationsApplied; auto applyOperationFn = [&mutex, &operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsForWriterThreadToApply, WorkerMultikeyPathInfo*) -> Status { stdx::lock_guard lock(mutex); operationsApplied.emplace_back(); for (auto&& opPtr : *operationsForWriterThreadToApply) { operationsApplied.back().push_back(*opPtr); } return Status::OK(); }; auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); NamespaceString nssForInsert; std::vector operationsWrittenToOplog; _storageInterface->insertDocumentsFn = [&mutex, &nssForInsert, &operationsWrittenToOplog]( OperationContext* opCtx, const NamespaceString& nss, const std::vector& docs) { stdx::lock_guard lock(mutex); nssForInsert = nss; operationsWrittenToOplog = docs; return Status::OK(); }; auto lastOpTime = unittest::assertGet( multiApply(_opCtx.get(), 
writerPool.get(), {op1, op2}, applyOperationFn)); ASSERT_EQUALS(op2.getOpTime(), lastOpTime); // Each writer thread should be given exactly one operation to apply. std::vector seen; { stdx::lock_guard lock(mutex); ASSERT_EQUALS(operationsApplied.size(), 2U); for (auto&& operationsAppliedByThread : operationsApplied) { ASSERT_EQUALS(1U, operationsAppliedByThread.size()); const auto& oplogEntry = operationsAppliedByThread.front(); ASSERT_TRUE(std::find(seen.cbegin(), seen.cend(), oplogEntry.getOpTime()) == seen.cend()); ASSERT_TRUE(oplogEntry == op1 || oplogEntry == op2); seen.push_back(oplogEntry.getOpTime()); } } // Check ops in oplog. stdx::lock_guard lock(mutex); ASSERT_EQUALS(2U, operationsWrittenToOplog.size()); ASSERT_EQUALS(NamespaceString::kRsOplogNamespace, nssForInsert); ASSERT_EQUALS(op1, unittest::assertGet(OplogEntry::parse(operationsWrittenToOplog[0].doc))); ASSERT_EQUALS(op2, unittest::assertGet(OplogEntry::parse(operationsWrittenToOplog[1].doc))); } TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, nullptr, &pathInfo)); // Collection should be created after SyncTail::syncApply() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; ASSERT_OK(multiSyncApply(opCtx, &ops, nullptr, &pathInfo)); ASSERT_EQ(pathInfo.size(), numPaths); } TEST_F(SyncTailTest, MultiSyncApplyAddsWorkerMultikeyPathInfoOnInsert) { NamespaceString nss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName()); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc); testWorkerMultikeyPaths(_opCtx.get(), op, 1UL); } } TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("b" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto docA = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, nullptr, &pathInfo)); ASSERT_EQ(pathInfo.size(), 2UL); } } TEST_F(SyncTailTest, MultiSyncApplyDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) { NamespaceString nss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName()); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc); testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } } DEATH_TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID, "Attempted to create a new collection") { ASSERT_OK( ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_SECONDARY)); NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); MultiApplier::OperationPtrs ops = {&op}; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, nullptr, nullptr)); } TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_STARTUP2)); NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); MultiApplier::OperationPtrs ops = {&op}; ASSERT_EQUALS(ErrorCodes::InvalidOptions, multiInitialSyncApply(_opCtx.get(), &ops, nullptr, nullptr, nullptr)); } TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { NamespaceString nss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName()); auto syncApply = [](OperationContext* opCtx, const BSONObj&, OplogApplication::Mode oplogApplicationMode) { ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_EQUALS(OplogApplication::Mode::kSecondary, oplogApplicationMode); return Status::OK(); }; auto op = makeUpdateDocumentOplogEntry( {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); } TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeOplogEntry(nss); auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status { return {ErrorCodes::OperationFailed, ""}; }; MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); } TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { NamespaceString nss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeOplogEntry(nss); auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status { uasserted(ErrorCodes::OperationFailed, ""); MONGO_UNREACHABLE; }; MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); } TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) { auto op1 = makeOplogEntry("test.t1"); auto op2 = makeOplogEntry("test.t1"); auto op3 = makeOplogEntry("test.t2"); auto op4 = makeOplogEntry("test.t3"); MultiApplier::Operations operationsApplied; auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { operationsApplied.push_back(OplogEntry(op)); return Status::OK(); }; MultiApplier::OperationPtrs ops = {&op4, &op1, &op3, &op2}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); ASSERT_EQUALS(4U, operationsApplied.size()); ASSERT_EQUALS(op1, operationsApplied[0]); ASSERT_EQUALS(op2, operationsApplied[1]); ASSERT_EQUALS(op3, operationsApplied[2]); ASSERT_EQUALS(op4, operationsApplied[3]); } TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) { int seconds = 0; auto makeOp = [&seconds](const NamespaceString& nss) { return makeInsertDocumentOplogEntry( {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); }; NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); NamespaceString nss2("test." 
+ _agent.getSuiteName() + "_" + _agent.getTestName() + "_2"); auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1); auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2); auto insertOp1a = makeOp(nss1); auto insertOp1b = makeOp(nss1); auto insertOp2a = makeOp(nss2); auto insertOp2b = makeOp(nss2); std::vector operationsApplied; auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { operationsApplied.push_back(op.copy()); return Status::OK(); }; MultiApplier::OperationPtrs ops = { &createOp1, &createOp2, &insertOp1a, &insertOp2a, &insertOp1b, &insertOp2b}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); ASSERT_EQUALS(4U, operationsApplied.size()); ASSERT_EQUALS(createOp1, unittest::assertGet(OplogEntry::parse(operationsApplied[0]))); ASSERT_EQUALS(createOp2, unittest::assertGet(OplogEntry::parse(operationsApplied[1]))); // Check grouped insert operations in namespace "nss1". ASSERT_EQUALS(insertOp1a.getOpTime(), parseFromOplogEntryArray(operationsApplied[2], 0)); ASSERT_EQUALS(insertOp1a.getNamespace().ns(), operationsApplied[2]["ns"].valuestrsafe()); ASSERT_EQUALS(BSONType::Array, operationsApplied[2]["o"].type()); auto group1 = operationsApplied[2]["o"].Array(); ASSERT_EQUALS(2U, group1.size()); ASSERT_BSONOBJ_EQ(insertOp1a.getObject(), group1[0].Obj()); ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1].Obj()); // Check grouped insert operations in namespace "nss2". 
ASSERT_EQUALS(insertOp2a.getOpTime(), parseFromOplogEntryArray(operationsApplied[3], 0)); ASSERT_EQUALS(insertOp2a.getNamespace().ns(), operationsApplied[3]["ns"].valuestrsafe()); ASSERT_EQUALS(BSONType::Array, operationsApplied[3]["o"].type()); auto group2 = operationsApplied[3]["o"].Array(); ASSERT_EQUALS(2U, group2.size()); ASSERT_BSONOBJ_EQ(insertOp2a.getObject(), group2[0].Obj()); ASSERT_BSONOBJ_EQ(insertOp2b.getObject(), group2[1].Obj()); } TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation) { int seconds = 0; auto makeOp = [&seconds](const NamespaceString& nss) { return makeInsertDocumentOplogEntry( {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++)); }; NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1"); auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss); // Generate operations to apply: // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)} std::size_t limit = 64; MultiApplier::Operations insertOps; for (std::size_t i = 0; i < limit + 1; ++i) { insertOps.push_back(makeOp(nss)); } MultiApplier::Operations operationsToApply; operationsToApply.push_back(createOp); std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); std::vector operationsApplied; auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { operationsApplied.push_back(op.copy()); return Status::OK(); }; MultiApplier::OperationPtrs ops; for (auto&& op : operationsToApply) { ops.push_back(&op); } WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply)); // multiSyncApply should combine operations as follows: // {create}, {grouped_insert}, {insert_(limit+1)} ASSERT_EQUALS(3U, operationsApplied.size()); ASSERT_EQUALS(createOp, unittest::assertGet(OplogEntry::parse(operationsApplied[0]))); const auto& groupedInsertOp = 
    operationsApplied[1];
    // The grouped insert inherits the first insert's optime and namespace, and carries the
    // documents as a BSON array in "o".
    ASSERT_EQUALS(insertOps.front().getOpTime(), parseFromOplogEntryArray(groupedInsertOp, 0));
    ASSERT_EQUALS(insertOps.front().getNamespace().ns(), groupedInsertOp["ns"].valuestrsafe());
    ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
    auto groupedInsertDocuments = groupedInsertOp["o"].Array();
    ASSERT_EQUALS(limit, groupedInsertDocuments.size());
    for (std::size_t i = 0; i < limit; ++i) {
        const auto& insertOp = insertOps[i];
        ASSERT_BSONOBJ_EQ(insertOp.getObject(), groupedInsertDocuments[i].Obj());
    }

    // (limit + 1)-th insert operations should not be included in group of first (limit) inserts.
    ASSERT_EQUALS(insertOps.back(), unittest::assertGet(OplogEntry::parse(operationsApplied[2])));
}

// Create an 'insert' oplog operation of an approximate size in bytes. The '_id' of the oplog entry
// and its optime in seconds are given by the 'id' argument.
OplogEntry makeSizedInsertOp(const NamespaceString& nss, int size, int id) {
    return makeInsertDocumentOplogEntry({Timestamp(Seconds(id), 0), 1LL},
                                        nss,
                                        BSON("_id" << id << "data" << std::string(size, '*')));
};  // NOTE(review): redundant ';' after function body — harmless but should be removed.

// Verifies that insert grouping also caps the total byte size of a grouped insert
// (insertVectorMaxBytes): 3 ops sized to just fit one batch are grouped, and the 4th is
// applied individually.
TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations) {
    int seconds = 0;
    NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
    auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);

    // Create a sequence of insert ops that are too large to fit in one group.
    int maxBatchSize = insertVectorMaxBytes;
    int opsPerBatch = 3;
    int opSize = maxBatchSize / opsPerBatch - 500;  // Leave some room for other oplog fields.

    // Create the insert ops.
    MultiApplier::Operations insertOps;
    int numOps = 4;
    for (int i = 0; i < numOps; i++) {
        insertOps.push_back(makeSizedInsertOp(nss, opSize, seconds++));
    }
    MultiApplier::Operations operationsToApply;
    operationsToApply.push_back(createOp);
    std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
    MultiApplier::OperationPtrs ops;
    for (auto&& op : operationsToApply) {
        ops.push_back(&op);
    }

    // NOTE(review): missing element type (likely <BSONObj>) — extraction damage; TODO confirm.
    std::vector operationsApplied;
    auto syncApply = [&operationsApplied](
        OperationContext*, const BSONObj& op, OplogApplication::Mode) {
        operationsApplied.push_back(op.copy());
        return Status::OK();
    };

    // Apply the ops.
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply));

    // Applied ops should be as follows:
    // [ {create}, INSERT_GROUP{insert 1, insert 2, insert 3}, {insert 4} ]
    ASSERT_EQ(3U, operationsApplied.size());
    auto groupedInsertOp = operationsApplied[1];
    ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
    // Make sure the insert group was created correctly.
    for (int i = 0; i < opsPerBatch; ++i) {
        // NOTE(review): the .Array() call is loop-invariant and could be hoisted out of the loop.
        auto groupedInsertOpArray = groupedInsertOp["o"].Array();
        ASSERT_BSONOBJ_EQ(insertOps[i].getObject(), groupedInsertOpArray[i].Obj());
    }

    // Check that the last op was applied individually.
    ASSERT_EQUALS(insertOps[3], unittest::assertGet(OplogEntry::parse(operationsApplied[2])));
}

// Verifies that an insert whose document alone exceeds the batch-size cap is never grouped:
// all three ops below must be applied one-by-one, in order.
TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceedsBatchSize) {
    int seconds = 0;
    NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName());
    auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);

    int maxBatchSize = insertVectorMaxBytes;
    // Create an insert op that exceeds the maximum batch size by itself.
    auto insertOpLarge = makeSizedInsertOp(nss, maxBatchSize, seconds++);
    auto insertOpSmall = makeSizedInsertOp(nss, 100, seconds++);

    MultiApplier::Operations operationsToApply = {createOp, insertOpLarge, insertOpSmall};
    MultiApplier::OperationPtrs ops;
    for (auto&& op : operationsToApply) {
        ops.push_back(&op);
    }

    // NOTE(review): missing element type (likely <BSONObj>) — extraction damage; TODO confirm.
    std::vector operationsApplied;
    auto syncApply = [&operationsApplied](
        OperationContext*, const BSONObj& op, OplogApplication::Mode) {
        operationsApplied.push_back(op.copy());
        return Status::OK();
    };

    // Apply the ops.
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply));

    // Applied ops should be as follows:
    // [ {create}, {large insert} {small insert} ]
    ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
    ASSERT_EQUALS(createOp, unittest::assertGet(OplogEntry::parse(operationsApplied[0])));
    ASSERT_EQUALS(insertOpLarge, unittest::assertGet(OplogEntry::parse(operationsApplied[1])));
    ASSERT_EQUALS(insertOpSmall, unittest::assertGet(OplogEntry::parse(operationsApplied[2])));
}

// Verifies that inserts targeting different namespaces are never combined into a group.
TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreateGroupByNamespace) {
    int seconds = 0;
    auto makeOp = [&seconds](const NamespaceString& nss) {
        return makeInsertDocumentOplogEntry(
            {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
    };
    auto testNs = "test." + _agent.getSuiteName() + "_" + _agent.getTestName();

    // Create a sequence of 3 'insert' ops that can't be grouped because they are from different
    // namespaces.
    MultiApplier::Operations operationsToApply = {makeOp(NamespaceString(testNs + "_1")),
                                                  makeOp(NamespaceString(testNs + "_2")),
                                                  makeOp(NamespaceString(testNs + "_3"))};

    // NOTE(review): missing element type (likely <BSONObj>) — extraction damage; TODO confirm.
    std::vector operationsApplied;
    auto syncApply = [&operationsApplied](
        OperationContext*, const BSONObj& op, OplogApplication::Mode) {
        operationsApplied.push_back(op.copy());
        return Status::OK();
    };
    MultiApplier::OperationPtrs ops;
    for (auto&& op : operationsToApply) {
        ops.push_back(&op);
    }

    // Apply the ops.
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply));

    // Applied ops should be as follows i.e. no insert grouping:
    // [{insert 1}, {insert 2}, {insert 3}]
    ASSERT_EQ(operationsToApply.size(), operationsApplied.size());
    for (std::size_t i = 0; i < operationsToApply.size(); i++) {
        ASSERT_EQUALS(operationsToApply[i],
                      unittest::assertGet(OplogEntry::parse(operationsApplied[i])));
    }
}

// Verifies the fallback path: when applying a grouped insert fails, multiSyncApply retries the
// member inserts individually (and does not attempt to re-group the remainder).
TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) {
    int seconds = 0;
    auto makeOp = [&seconds](const NamespaceString& nss) {
        return makeInsertDocumentOplogEntry(
            {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
    };
    NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1");
    auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);

    // Generate operations to apply:
    // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)}
    std::size_t limit = 64;
    MultiApplier::Operations insertOps;
    for (std::size_t i = 0; i < limit + 1; ++i) {
        insertOps.push_back(makeOp(nss));
    }
    MultiApplier::Operations operationsToApply;
    operationsToApply.push_back(createOp);
    std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));

    std::size_t numFailedGroupedInserts = 0;
    MultiApplier::Operations operationsApplied;
    auto syncApply = [&numFailedGroupedInserts, &operationsApplied](
        OperationContext*, const BSONObj& op, OplogApplication::Mode) -> Status {
        // Reject grouped insert operations.
        // A grouped insert carries its documents as a BSON array in "o"; fail exactly those.
        if (op["o"].type() == BSONType::Array) {
            numFailedGroupedInserts++;
            return {ErrorCodes::OperationFailed, "grouped inserts not supported"};
        }
        operationsApplied.push_back(OplogEntry(op));
        return Status::OK();
    };
    MultiApplier::OperationPtrs ops;
    for (auto&& op : operationsToApply) {
        ops.push_back(&op);
    }
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, &pathInfo, syncApply));

    // On failing to apply the grouped insert operation, multiSyncApply should apply the operations
    // as given in "operationsToApply":
    // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)}
    ASSERT_EQUALS(limit + 2, operationsApplied.size());
    ASSERT_EQUALS(createOp, operationsApplied[0]);
    for (std::size_t i = 0; i < limit + 1; ++i) {
        const auto& insertOp = insertOps[i];
        ASSERT_EQUALS(insertOp, operationsApplied[i + 1]);
    }
    // Ensure that multiSyncApply does not attempt to group remaining operations in first failed
    // grouped insert operation.
    ASSERT_EQUALS(1U, numFailedGroupedInserts);
}

// Verifies that initial-sync application runs with document validation disabled; the
// SyncTailWithOperationContextChecker fixture performs that check and counts one fetch.
TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
    SyncTailWithOperationContextChecker syncTail;
    NamespaceString nss("test.t");
    createCollection(_opCtx.get(), nss, {});
    auto op = makeUpdateDocumentOplogEntry(
        {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
    MultiApplier::OperationPtrs ops = {&op};
    AtomicUInt32 fetchCount(0);
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
    ASSERT_EQUALS(fetchCount.load(), 1U);
}

// Verifies that an update whose target document cannot be fetched from the sync source (the
// fetcher is seeded with an empty document) is ignored without creating the collection.
TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) {
    BSONObj emptyDoc;
    SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
    NamespaceString nss("test.t");
    {
        // Open the database (without the collection) under the global write lock.
        Lock::GlobalWrite globalLock(_opCtx.get());
        bool justCreated = false;
        Database* db = dbHolder().openDb(_opCtx.get(), nss.db(), &justCreated);
        ASSERT_TRUE(db);
        ASSERT_TRUE(justCreated);
    }
    auto op =
        makeUpdateDocumentOplogEntry(
            {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
    MultiApplier::OperationPtrs ops = {&op};
    AtomicUInt32 fetchCount(0);
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));

    // Since the missing document is not found on the sync source, the collection referenced by
    // the failed operation should not be automatically created.
    ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
    ASSERT_EQUALS(fetchCount.load(), 1U);
}

// Verifies that an insert into a non-existent namespace is skipped during initial sync (no
// fetch attempted) while surrounding ops on the valid namespace still apply.
TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
    BSONObj emptyDoc;
    SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
    NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
    NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
    auto doc1 = BSON("_id" << 1);
    auto doc2 = BSON("_id" << 2);
    auto doc3 = BSON("_id" << 3);
    auto op0 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
    auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1);
    auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, badNss, doc2);
    auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
    MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
    AtomicUInt32 fetchCount(0);
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
    ASSERT_EQUALS(fetchCount.load(), 0U);

    // The reader iterates newest-first, hence doc3 before doc1; doc2 (bad namespace) is absent.
    OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
    auto iter = collectionReader.makeIterator();
    ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(iter->next()).first);
    ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(iter->next()).first);
    ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
}

// Verifies that index creation on a non-existent namespace is skipped during initial sync.
TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) {
    BSONObj
        emptyDoc;
    SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
    NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
    NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
    auto doc1 = BSON("_id" << 1);
    auto keyPattern = BSON("a" << 1);
    auto doc3 = BSON("_id" << 3);
    auto op0 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
    auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc1);
    // op2 targets 'badNss', which is never created — it must be skipped, not fetched.
    auto op2 =
        makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, badNss, "a_1", keyPattern);
    auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
    MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
    AtomicUInt32 fetchCount(0);
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
    ASSERT_EQUALS(fetchCount.load(), 0U);

    // The reader iterates newest-first, hence doc3 before doc1.
    OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
    auto iter = collectionReader.makeIterator();
    ASSERT_BSONOBJ_EQ(doc3, unittest::assertGet(iter->next()).first);
    ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(iter->next()).first);
    ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());

    // 'badNss' collection should not be implicitly created while attempting to create an index.
    ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection());
}

// Verifies the happy path of missing-document fetching: when the sync source has the document,
// the update's failure triggers a fetch and the document is inserted locally.
TEST_F(SyncTailTest,
       MultiInitialSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) {
    SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1));
    NamespaceString nss("test.t");
    createCollection(_opCtx.get(), nss, {});
    auto updatedDocument = BSON("_id" << 0 << "x" << 1);
    auto op = makeUpdateDocumentOplogEntry(
        {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument);
    MultiApplier::OperationPtrs ops = {&op};
    AtomicUInt32 fetchCount(0);
    WorkerMultikeyPathInfo pathInfo;
    ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
    ASSERT_EQUALS(fetchCount.load(), 1U);

    // The collection referenced by "ns" in the failed operation is automatically created to hold
    // the missing document fetched from the sync source. We verify the contents of the collection
    // with the OplogInterfaceLocal class.
    OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
    auto iter = collectionReader.makeIterator();
    ASSERT_BSONOBJ_EQ(updatedDocument, unittest::assertGet(iter->next()).first);
    ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
}

// Idempotency: a 2dsphere index build over a document whose 'loc' was made geo-invalid then
// fixed must be idempotent during initial sync (RS_RECOVERING) but fail with code 16755 when
// the same ops are re-run in steady state (RS_PRIMARY).
TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}"));
    auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}"));
    auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3));
    auto ops = {insertOp, updateOp, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 16755);
}

// Same as above but with the bad document inserted after the index is built and dropped.
TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnIndexing) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto indexOp = buildIndex(fromjson("{loc: '2dsphere'}"), BSON("2dsphereIndexVersion" << 3));
    auto dropIndexOp = dropIndex("loc_index");
    auto insertOp = insert(fromjson("{_id: 1, loc: 'hi'}"));
    auto ops = {indexOp, dropIndexOp, insertOp};
    testOpsAreIdempotent(ops);
    // Re-running in primary (steady-state) mode must surface the 2dsphere indexing error.
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 16755);
}

// Idempotency: 2d index over a malformed point — idempotent in initial sync, error 13068 in
// steady state.
TEST_F(IdempotencyTest, Geo2dIndex) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, loc: [1]}"));
    auto updateOp = update(1, fromjson("{$set: {loc: [1, 2]}}"));
    auto indexOp = buildIndex(fromjson("{loc: '2d'}"));
    auto ops = {insertOp, updateOp, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 13068);
}

// Idempotency: building a unique index over transiently-duplicate keys — idempotent in initial
// sync, DuplicateKey in steady state.
TEST_F(IdempotencyTest, UniqueKeyIndex) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, x: 5}"));
    auto updateOp = update(1, fromjson("{$set: {x: 6}}"));
    auto insertOp2 = insert(fromjson("{_id: 2, x: 5}"));
    auto indexOp = buildIndex(fromjson("{x: 1}"), fromjson("{unique: true}"));
    auto ops = {insertOp, updateOp, insertOp2, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), ErrorCodes::DuplicateKey);
}

// Idempotency: compound index over fields that were transiently parallel arrays — idempotent in
// initial sync, CannotIndexParallelArrays in steady state.
TEST_F(IdempotencyTest, ParallelArrayError) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1}"))));
    auto updateOp1 = update(1, fromjson("{$set: {x: [1, 2]}}"));
    auto updateOp2 = update(1, fromjson("{$set: {x: 1}}"));
    auto updateOp3 = update(1, fromjson("{$set: {y: [3, 4]}}"));
    auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"));
    auto ops = {updateOp1, updateOp2, updateOp3, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), ErrorCodes::CannotIndexParallelArrays);
}

// Idempotency: index key that transiently exceeded the key-size limit — idempotent in initial
// sync, KeyTooLong in steady state.
TEST_F(IdempotencyTest, IndexKeyTooLongError) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1}"))));
    // Key size limit is 1024 for ephemeral storage engine, so two 800 byte fields cannot
    // co-exist.
    std::string longStr(800, 'a');
    auto updateOp1 = update(1, BSON("$set" << BSON("x" << longStr)));
    auto updateOp2 = update(1, fromjson("{$set: {x: 1}}"));
    auto updateOp3 = update(1, BSON("$set" << BSON("y" << longStr)));
    auto indexOp = buildIndex(fromjson("{x: 1, y: 1}"));
    auto ops = {updateOp1, updateOp2, updateOp3, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), ErrorCodes::KeyTooLong);
}

// Idempotency: rebuilding an index with conflicting options after a drop — idempotent in
// initial sync, IndexOptionsConflict in steady state.
TEST_F(IdempotencyTest, IndexWithDifferentOptions) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    ASSERT_OK(runOpInitialSync(insert(fromjson("{_id: 1, x: 'hi'}"))));
    auto indexOp1 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'spanish'}"));
    auto dropIndexOp = dropIndex("x_index");
    auto indexOp2 = buildIndex(fromjson("{x: 'text'}"), fromjson("{default_language: 'english'}"));
    auto ops = {indexOp1, dropIndexOp, indexOp2};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), ErrorCodes::IndexOptionsConflict);
}

// Idempotency: text index over a document whose 'language' field was transiently non-string —
// idempotent in initial sync, error 17261 in steady state.
TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageField) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}"));
    auto updateOp = update(1, fromjson("{$unset: {language: 1}}"));
    auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj());
    auto ops = {insertOp, updateOp, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 17261);
}

// Same 17261 failure, but the bad document is inserted while a (since-dropped) text index
// transiently existed.
TEST_F(IdempotencyTest,
       InsertDocumentWithNonStringLanguageFieldWhenTextIndexExists) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj());
    auto dropIndexOp = dropIndex("x_index");
    auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 1}"));
    auto ops = {indexOp, dropIndexOp, insertOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 17261);
}

// As above, but the non-string field is the index's configured language_override field 'y'.
TEST_F(IdempotencyTest, TextIndexDocumentHasNonStringLanguageOverrideField) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}"));
    auto updateOp = update(1, fromjson("{$unset: {y: 1}}"));
    auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"));
    auto ops = {insertOp, updateOp, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 17261);
}

// language_override variant of the insert-while-index-transiently-exists case.
TEST_F(IdempotencyTest, InsertDocumentWithNonStringLanguageOverrideFieldWhenTextIndexExists) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto indexOp = buildIndex(fromjson("{x: 'text'}"), fromjson("{language_override: 'y'}"));
    auto dropIndexOp = dropIndex("x_index");
    auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', y: 1}"));
    auto ops = {indexOp, dropIndexOp, insertOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 17261);
}

// Idempotency: document with an unrecognized 'language' value — idempotent in initial sync,
// error 17262 in steady state.
TEST_F(IdempotencyTest, TextIndexDocumentHasUnknownLanguage) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    auto insertOp = insert(fromjson("{_id: 1, x: 'words to index', language: 'bad'}"));
    auto updateOp = update(1, fromjson("{$unset: {language: 1}}"));
    auto indexOp = buildIndex(fromjson("{x: 'text'}"), BSONObj());
    auto ops = {insertOp, updateOp, indexOp};
    testOpsAreIdempotent(ops);
    ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
    auto status = runOpsInitialSync(ops);
    ASSERT_EQ(status.code(), 17262);
}

// Idempotency: create/drop/create with different validators (same UUID) must leave the same
// collection state no matter how many times the sequence is replayed.
TEST_F(IdempotencyTest, CreateCollectionWithValidation) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    const BSONObj uuidObj = UUID::gen().toBSON();
    auto runOpsAndValidate = [this, uuidObj]() {
        auto options1 = fromjson("{'validator' : {'phone' : {'$type' : 'string' } } }");
        options1 = options1.addField(uuidObj.firstElement());
        auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1);
        auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
        auto options2 = fromjson("{'validator' : {'phone' : {'$type' : 'number' } } }");
        options2 = options2.addField(uuidObj.firstElement());
        auto createColl2 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options2);
        auto ops = {createColl1, dropColl, createColl2};
        ASSERT_OK(runOpsInitialSync(ops));
        auto state = validate();
        return state;
    };
    // Running the sequence twice must produce identical validation state.
    auto state1 = runOpsAndValidate();
    auto state2 = runOpsAndValidate();
    ASSERT_EQUALS(state1, state2);
}

// Idempotency: replay of writes followed by drop and re-create with a collation option must be
// stable across repeated application.
TEST_F(IdempotencyTest, CreateCollectionWithCollation) {
    ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
                  ->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    CollectionUUID uuid = UUID::gen();
    auto runOpsAndValidate = [this, uuid]() {
        auto insertOp1 = insert(fromjson("{ _id: 'foo' }"));
        auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }"));
        auto updateOp = update("foo", BSON("$set" << BSON("x" << 2)));
        auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
        // Case-insensitive, strength-1 "en" collation; 'Foo' and 'foo' collide under it.
        auto options = BSON("collation"
                            << BSON("locale"
                                    << "en"
                                    << "caseLevel" << false << "caseFirst"
                                    << "off"
                                    << "strength" << 1 << "numericOrdering" << false << "alternate"
                                    << "non-ignorable"
                                    << "maxVariable"
                                    << "punct"
                                    << "normalization" << false << "backwards" << false << "version"
                                    << "57.1")
                            << "uuid" << uuid);
        auto createColl = makeCreateCollectionOplogEntry(nextOpTime(), nss, options);
        auto ops = {insertOp1, insertOp2, updateOp, dropColl, createColl};
        ASSERT_OK(runOpsInitialSync(ops));
        auto state = validate();
        return state;
    };
    auto state1 = runOpsAndValidate();
    auto state2 = runOpsAndValidate();
    ASSERT_EQUALS(state1, state2);
}

// Idempotency: create with an explicit idIndex spec, insert a Decimal128 _id, then drop and
// re-create with the same UUID — state must be stable across replays.
TEST_F(IdempotencyTest, CreateCollectionWithIdIndex) {
    ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
                  ->setFollowerMode(MemberState::RS_RECOVERING));
    CollectionUUID uuid = UUID::gen();
    auto options1 = BSON("idIndex" << BSON("key" << fromjson("{_id: 1}") << "name"
                                                 << "_id_"
                                                 << "v" << 2 << "ns" << nss.ns())
                                   << "uuid" << uuid);
    auto createColl1 = makeCreateCollectionOplogEntry(nextOpTime(), nss, options1);
    ASSERT_OK(runOpInitialSync(createColl1));
    auto runOpsAndValidate = [this, uuid]() {
        auto insertOp = insert(BSON("_id" << Decimal128(1)));
        auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
        auto createColl2 = createCollection(uuid);
        auto ops = {insertOp, dropColl, createColl2};
        ASSERT_OK(runOpsInitialSync(ops));
        auto state = validate();
        return state;
    };
    auto state1 = runOpsAndValidate();
    auto state2 = runOpsAndValidate();
    ASSERT_EQUALS(state1, state2);
}

// Idempotency: inserting a view definition into system.views and then dropping the backing
// collection must be replay-safe.
TEST_F(IdempotencyTest, CreateCollectionWithView) {
    ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
                  ->setFollowerMode(MemberState::RS_RECOVERING));
    CollectionOptions
        options;
    options.uuid = UUID::gen();
    // Create data collection
    ASSERT_OK(runOpInitialSync(createCollection()));
    // Create "system.views" collection
    auto viewNss = NamespaceString(nss.db(), "system.views");
    ASSERT_OK(
        runOpInitialSync(makeCreateCollectionOplogEntry(nextOpTime(), viewNss, options.toBSON())));
    auto viewDoc = BSON("_id" << NamespaceString(nss.db(), "view").ns() << "viewOn" << nss.coll()
                              << "pipeline" << fromjson("[ { '$project' : { 'x' : 1 } } ]"));
    auto insertViewOp = makeInsertDocumentOplogEntry(nextOpTime(), viewNss, viewDoc);
    auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
    auto ops = {insertViewOp, dropColl};
    testOpsAreIdempotent(ops);
}

// Idempotency: collMod after the target collection has been dropped must be replay-safe.
TEST_F(IdempotencyTest, CollModNamespaceNotFound) {
    ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
                  ->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    ASSERT_OK(
        runOpInitialSync(buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600))));
    // NOTE(review): this JSON literal has a stray trailing '}' ("...4000}}"); it is a runtime
    // string so it is left untouched here, but it looks like a typo — confirm fromjson()
    // tolerates/ignores it and fix the literal.
    auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}");
    auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange);
    auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd);
    auto dropCollOp = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
    auto ops = {collModOp, dropCollOp};
    testOpsAreIdempotent(ops);
}

// Idempotency: collMod after the target index has been dropped must be replay-safe.
TEST_F(IdempotencyTest, CollModIndexNotFound) {
    ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
                  ->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT_OK(runOpInitialSync(createCollection()));
    ASSERT_OK(
        runOpInitialSync(buildIndex(BSON("createdAt" << 1), BSON("expireAfterSeconds" << 3600))));
    // NOTE(review): same stray trailing '}' as in CollModNamespaceNotFound above.
    auto indexChange = fromjson("{keyPattern: {createdAt:1}, expireAfterSeconds:4000}}");
    auto collModCmd = BSON("collMod" << nss.coll() << "index" << indexChange);
    auto collModOp = makeCommandOplogEntry(nextOpTime(), nss, collModCmd);
    auto dropIndexOp = dropIndex("createdAt_index");
    auto ops = {collModOp, dropIndexOp};
    testOpsAreIdempotent(ops);
}

// Dropping the feature-compatibility-version collection via oplog application must be rejected.
TEST_F(SyncTailTest, FailOnDropFCVCollection) {
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    auto fcvNS = NamespaceString(FeatureCompatibilityVersion::kCollection);
    auto cmd = BSON("drop" << fcvNS.coll());
    auto op = makeCommandOplogEntry(
        nextOpTime(), NamespaceString(FeatureCompatibilityVersion::kCollection), cmd);
    ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported);
}

// Inserting the FCV parameter document via oplog application must be rejected.
TEST_F(SyncTailTest, FailOnInsertFCVDocument) {
    auto fcvNS = NamespaceString(FeatureCompatibilityVersion::kCollection);
    ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions());
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    auto op = makeInsertDocumentOplogEntry(
        nextOpTime(), fcvNS, BSON("_id" << FeatureCompatibilityVersionParser::kParameterName));
    ASSERT_EQUALS(runOpInitialSync(op), ErrorCodes::OplogOperationUnsupported);
}

// Only the FCV parameter document is protected — other inserts to the FCV collection succeed.
TEST_F(IdempotencyTest, InsertToFCVCollectionBesidesFCVDocumentSucceeds) {
    auto fcvNS = NamespaceString(FeatureCompatibilityVersion::kCollection);
    ::mongo::repl::createCollection(_opCtx.get(), fcvNS, CollectionOptions());
    ASSERT_OK(
        ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING));
    auto op = makeInsertDocumentOplogEntry(nextOpTime(),
                                           fcvNS,
                                           BSON("_id"
                                                << "other"));
    ASSERT_OK(runOpInitialSync(op));
}

// dropDatabase applied via initial sync must succeed.
TEST_F(IdempotencyTest, DropDatabaseSucceeds) {
    // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped.
auto ns = NamespaceString("foo.system.profile"); ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); ASSERT_OK( ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); ASSERT_OK(runOpInitialSync(op)); } TEST_F(SyncTailTest, DropDatabaseSucceedsInRecovering) { // Choose `system.profile` so the storage engine doesn't expect the drop to be timestamped. auto ns = NamespaceString("foo.system.profile"); ::mongo::repl::createCollection(_opCtx.get(), ns, CollectionOptions()); ASSERT_OK( ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); auto op = makeCommandOplogEntry(nextOpTime(), ns, BSON("dropDatabase" << 1)); ASSERT_OK(runOpSteadyState(op)); } class SyncTailTxnTableTest : public SyncTailTest { public: void setUp() override { SyncTailTest::setUp(); SessionCatalog::create(_opCtx->getServiceContext()); SessionCatalog::get(_opCtx->getServiceContext())->onStepUp(_opCtx.get()); DBDirectClient client(_opCtx.get()); BSONObj result; ASSERT(client.runCommand(kNs.db().toString(), BSON("create" << kNs.coll()), result)); } void tearDown() override { SessionCatalog::reset_forTest(_opCtx->getServiceContext()); SyncTailTest::tearDown(); } /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. 
*/ repl::OplogEntry makeOplogEntry(const NamespaceString& ns, repl::OpTime opTime, repl::OpTypeEnum opType, BSONObj object, boost::optional object2, const OperationSessionInfo& sessionInfo, Date_t wallClockTime) { return repl::OplogEntry(opTime, // optime 0, // hash opType, // opType ns, // namespace boost::none, // uuid boost::none, // fromMigrate 0, // version object, // o object2, // o2 sessionInfo, // sessionInfo boost::none, // false wallClockTime, // wall clock time boost::none, // statement id boost::none, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } void checkTxnTable(const OperationSessionInfo& sessionInfo, const repl::OpTime& expectedOpTime, Date_t expectedWallClock) { invariant(sessionInfo.getSessionId()); invariant(sessionInfo.getTxnNumber()); DBDirectClient client(_opCtx.get()); auto result = client.findOne( NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())}); ASSERT_FALSE(result.isEmpty()); auto txnRecord = SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result); ASSERT_EQ(*sessionInfo.getTxnNumber(), txnRecord.getTxnNum()); ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime()); ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate()); } static const NamespaceString& nss() { return kNs; } private: static const NamespaceString kNs; }; const NamespaceString SyncTailTxnTableTest::kNs("test.foo"); TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { const auto sessionId = makeLogicalSessionIdForTest(); OperationSessionInfo sessionInfo; sessionInfo.setSessionId(sessionId); sessionInfo.setTxnNumber(3); const auto date = Date_t::now(); auto insertOp = makeOplogEntry(nss(), {Timestamp(1, 0), 1}, repl::OpTypeEnum::kInsert, BSON("_id" << 1), boost::none, sessionInfo, date); auto writerPool = SyncTail::makeWriterPool(); SyncTail syncTail(nullptr, multiSyncApply, 
writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); } TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { const auto sessionId = makeLogicalSessionIdForTest(); OperationSessionInfo sessionInfo; sessionInfo.setSessionId(sessionId); sessionInfo.setTxnNumber(3); const auto date = Date_t::now(); auto insertOp = makeOplogEntry(nss(), {Timestamp(1, 0), 1}, repl::OpTypeEnum::kInsert, BSON("_id" << 1), boost::none, sessionInfo, date); auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, {Timestamp(2, 0), 1}, repl::OpTypeEnum::kDelete, BSON("_id" << sessionInfo.getSessionId()->toBSON()), boost::none, {}, Date_t::now()); auto writerPool = SyncTail::makeWriterPool(); SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); DBDirectClient client(_opCtx.get()); auto result = client.findOne( NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())}); ASSERT_TRUE(result.isEmpty()); } TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectWriteToTxnTable) { const auto sessionId = makeLogicalSessionIdForTest(); OperationSessionInfo sessionInfo; sessionInfo.setSessionId(sessionId); sessionInfo.setTxnNumber(3); auto date = Date_t::now(); auto insertOp = makeOplogEntry(nss(), {Timestamp(1, 0), 1}, repl::OpTypeEnum::kInsert, BSON("_id" << 1), boost::none, sessionInfo, date); auto deleteOp = makeOplogEntry(NamespaceString::kSessionTransactionsTableNamespace, {Timestamp(2, 0), 1}, repl::OpTypeEnum::kDelete, BSON("_id" << sessionInfo.getSessionId()->toBSON()), boost::none, {}, Date_t::now()); date = Date_t::now(); sessionInfo.setTxnNumber(7); auto insertOp2 = makeOplogEntry(nss(), {Timestamp(3, 0), 2}, repl::OpTypeEnum::kInsert, BSON("_id" << 6), boost::none, sessionInfo, date); auto 
writerPool = SyncTail::makeWriterPool(); SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); } TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { NamespaceString ns0("test.0"); NamespaceString ns1("test.1"); NamespaceString ns2("test.2"); NamespaceString ns3("test.3"); DBDirectClient client(_opCtx.get()); BSONObj result; ASSERT(client.runCommand(ns0.db().toString(), BSON("create" << ns0.coll()), result)); ASSERT(client.runCommand(ns1.db().toString(), BSON("create" << ns1.coll()), result)); ASSERT(client.runCommand(ns2.db().toString(), BSON("create" << ns2.coll()), result)); ASSERT(client.runCommand(ns3.db().toString(), BSON("create" << ns3.coll()), result)); // Entries with a session id and a txnNumber update the transaction table. auto lsidSingle = makeLogicalSessionIdForTest(); auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(1), 0), 1LL}, ns0, BSON("_id" << 0), lsidSingle, 5LL, 0); // For entries with the same session, the entry with a larger txnNumber is saved. auto lsidDiffTxn = makeLogicalSessionIdForTest(); auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(2), 0), 1LL}, ns1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1); auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(3), 0), 1LL}, ns1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1); // For entries with the same session and txnNumber, the later optime is saved. 
auto lsidSameTxn = makeLogicalSessionIdForTest(); auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(6), 0), 1LL}, ns2, BSON("_id" << 0), lsidSameTxn, 30LL, 0); auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(5), 0), 1LL}, ns2, BSON("_id" << 1), lsidSameTxn, 30LL, 1); // Entries with a session id but no txnNumber do not lead to updates. auto lsidNoTxn = makeLogicalSessionIdForTest(); OperationSessionInfo info; info.setSessionId(lsidNoTxn); auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo( {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); auto writerPool = SyncTail::makeWriterPool(); SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply( _opCtx.get(), {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); // The txnNum and optime of the only write were saved. auto resultSingleDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON())); ASSERT_TRUE(!resultSingleDoc.isEmpty()); auto resultSingle = SessionTxnRecord::parse(IDLParserErrorContext("resultSingleDoc test"), resultSingleDoc); ASSERT_EQ(resultSingle.getTxnNum(), 5LL); ASSERT_EQ(resultSingle.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(1), 0), 1)); // The txnNum and optime of the write with the larger txnNum were saved. 
auto resultDiffTxnDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON())); ASSERT_TRUE(!resultDiffTxnDoc.isEmpty()); auto resultDiffTxn = SessionTxnRecord::parse(IDLParserErrorContext("resultDiffTxnDoc test"), resultDiffTxnDoc); ASSERT_EQ(resultDiffTxn.getTxnNum(), 20LL); ASSERT_EQ(resultDiffTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(3), 0), 1)); // The txnNum and optime of the write with the later optime were saved. auto resultSameTxnDoc = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON())); ASSERT_TRUE(!resultSameTxnDoc.isEmpty()); auto resultSameTxn = SessionTxnRecord::parse(IDLParserErrorContext("resultSameTxnDoc test"), resultSameTxnDoc); ASSERT_EQ(resultSameTxn.getTxnNum(), 30LL); ASSERT_EQ(resultSameTxn.getLastWriteOpTime(), repl::OpTime(Timestamp(Seconds(6), 0), 1)); // There is no entry for the write with no txnNumber. auto resultNoTxn = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON())); ASSERT_TRUE(resultNoTxn.isEmpty()); } } // namespace } // namespace repl } // namespace mongo