/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#include "mongo/db/op_observer_impl.h"

#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker_noop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/keys_collection_client_sharded.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/s/config_server_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/util/clock_source_mock.h"

namespace mongo {

using repl::OplogEntry;
using unittest::assertGet;

namespace {

class OpObserverTest : public ServiceContextMongoDTest {
public:
    void setUp() override {
        // Set up mongod.
        ServiceContextMongoDTest::setUp();

        auto service = getServiceContext();
        auto opCtx = cc().makeOperationContext();

        // Set up ReplicationCoordinator and create oplog.
        repl::ReplicationCoordinator::set(
            service,
            stdx::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings()));
        repl::setOplogCollectionName(service);
        repl::createOplog(opCtx.get());

        // Ensure that we are primary.
        auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
        ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
    }

protected:
    // Assert that oplog only has a single entry and return that oplog entry.
    BSONObj getSingleOplogEntry(OperationContext* opCtx) {
        repl::OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
        auto oplogIter = oplogInterface.makeIterator();
        auto opEntry = unittest::assertGet(oplogIter->next());
        ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
        return opEntry.first;
    }

private:
    // Creates a reasonable set of ReplSettings for most tests.
    // We need to be able to override this to create a larger oplog.
    virtual repl::ReplSettings createReplSettings() {
        repl::ReplSettings settings;
        settings.setOplogSizeBytes(5 * 1024 * 1024);
        settings.setReplSetString("mySet/node1:12345");
        return settings;
    }
};

TEST_F(OpObserverTest, CollModWithCollectionOptionsAndTTLInfo) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    auto uuid = CollectionUUID::gen();

    // Create 'collMod' command.
    NamespaceString nss("test.coll");
    BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
                                        << "off"
                                        << "validationAction"
                                        << "warn"
                                        // We verify that 'onCollMod' ignores this field.
                                        << "index"
                                        << "indexData");

    CollectionOptions oldCollOpts;
    oldCollOpts.validationLevel = "strict";
    oldCollOpts.validationAction = "error";
    oldCollOpts.flags = 2;
    oldCollOpts.flagsSet = true;

    TTLCollModInfo ttlInfo;
    ttlInfo.expireAfterSeconds = Seconds(10);
    ttlInfo.oldExpireAfterSeconds = Seconds(5);
    ttlInfo.indexName = "name_of_index";

    // Write to the oplog.
    {
        AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, ttlInfo);
        wunit.commit();
    }

    auto oplogEntry = getSingleOplogEntry(opCtx.get());

    // Ensure that collMod fields were properly added to the oplog entry.
    auto o = oplogEntry.getObjectField("o");
    auto oExpected =
        BSON("collMod" << nss.coll() << "validationLevel"
                       << "off"
                       << "validationAction"
                       << "warn"
                       << "index"
                       << BSON("name" << ttlInfo.indexName << "expireAfterSeconds"
                                      << durationCount<Seconds>(ttlInfo.expireAfterSeconds)));
    ASSERT_BSONOBJ_EQ(oExpected, o);

    // Ensure that the old collection metadata was saved.
    auto o2 = oplogEntry.getObjectField("o2");
    auto o2Expected =
        BSON("collectionOptions_old"
             << BSON("flags" << oldCollOpts.flags << "validationLevel"
                             << oldCollOpts.validationLevel
                             << "validationAction"
                             << oldCollOpts.validationAction)
             << "expireAfterSeconds_old"
             << durationCount<Seconds>(ttlInfo.oldExpireAfterSeconds));
    ASSERT_BSONOBJ_EQ(o2Expected, o2);
}

TEST_F(OpObserverTest, CollModWithOnlyCollectionOptions) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    auto uuid = CollectionUUID::gen();

    // Create 'collMod' command.
    NamespaceString nss("test.coll");
    BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
                                        << "off"
                                        << "validationAction"
                                        << "warn");

    CollectionOptions oldCollOpts;
    oldCollOpts.validationLevel = "strict";
    oldCollOpts.validationAction = "error";

    // Write to the oplog.
    {
        AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, boost::none);
        wunit.commit();
    }

    auto oplogEntry = getSingleOplogEntry(opCtx.get());

    // Ensure that collMod fields were properly added to oplog entry.
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = collModCmd;
    ASSERT_BSONOBJ_EQ(oExpected, o);

    // Ensure that the old collection metadata was saved and that TTL info is not present.
    auto o2 = oplogEntry.getObjectField("o2");
    auto o2Expected = BSON("collectionOptions_old"
                           << BSON("validationLevel" << oldCollOpts.validationLevel
                                                     << "validationAction"
                                                     << oldCollOpts.validationAction));
    ASSERT_BSONOBJ_EQ(o2Expected, o2);
}

TEST_F(OpObserverTest, OnDropCollectionReturnsDropOpTime) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    auto uuid = CollectionUUID::gen();

    // Create 'drop' command.
    NamespaceString nss("test.coll");
    auto dropCmd = BSON("drop" << nss.coll());

    // Write to the oplog.
    repl::OpTime dropOpTime;
    {
        AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        opObserver.onDropCollection(opCtx.get(), nss, uuid);
        dropOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
        wunit.commit();
    }

    auto oplogEntry = getSingleOplogEntry(opCtx.get());

    // Ensure that drop fields were properly added to oplog entry.
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = dropCmd;
    ASSERT_BSONOBJ_EQ(oExpected, o);

    // Ensure that the drop optime returned is the same as the last optime in the ReplClientInfo.
    ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), dropOpTime);
}

TEST_F(OpObserverTest, OnRenameCollectionReturnsRenameOpTime) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();

    auto uuid = CollectionUUID::gen();
    auto dropTargetUuid = CollectionUUID::gen();
    auto stayTemp = false;
    NamespaceString sourceNss("test.foo");
    NamespaceString targetNss("test.bar");

    // Write to the oplog.
    repl::OpTime renameOpTime;
    {
        AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        opObserver.onRenameCollection(
            opCtx.get(), sourceNss, targetNss, uuid, dropTargetUuid, stayTemp);
        renameOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
        wunit.commit();
    }

    auto oplogEntry = getSingleOplogEntry(opCtx.get());

    // Ensure that renameCollection fields were properly added to oplog entry.
    ASSERT_EQUALS(uuid, unittest::assertGet(UUID::parse(oplogEntry["ui"])));
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = BSON("renameCollection" << sourceNss.ns() << "to" << targetNss.ns()
                                             << "stayTemp"
                                             << stayTemp
                                             << "dropTarget"
                                             << dropTargetUuid);
    ASSERT_BSONOBJ_EQ(oExpected, o);

    // Ensure that the rename optime returned is the same as the last optime in the ReplClientInfo.
    ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), renameOpTime);
}

TEST_F(OpObserverTest, OnRenameCollectionOmitsDropTargetFieldIfDropTargetUuidIsNull) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();

    auto uuid = CollectionUUID::gen();
    auto stayTemp = true;
    NamespaceString sourceNss("test.foo");
    NamespaceString targetNss("test.bar");

    // Write to the oplog.
    {
        AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        opObserver.onRenameCollection(opCtx.get(), sourceNss, targetNss, uuid, {}, stayTemp);
        wunit.commit();
    }

    auto oplogEntry = getSingleOplogEntry(opCtx.get());

    // Ensure that renameCollection fields were properly added to oplog entry.
    ASSERT_EQUALS(uuid, unittest::assertGet(UUID::parse(oplogEntry["ui"])));
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = BSON("renameCollection" << sourceNss.ns() << "to" << targetNss.ns()
                                             << "stayTemp"
                                             << stayTemp);
    ASSERT_BSONOBJ_EQ(oExpected, o);
}

/**
 * Test fixture for testing OpObserver behavior specific to the SessionCatalog.
 */
class OpObserverSessionCatalogTest : public OpObserverTest {
public:
    void setUp() override {
        OpObserverTest::setUp();
        auto opCtx = cc().makeOperationContext();
        auto sessionCatalog = SessionCatalog::get(getServiceContext());
        sessionCatalog->reset_forTest();
        sessionCatalog->onStepUp(opCtx.get());
    }

    /**
     * Simulate a new write occurring on given session with the given transaction number and
     * statement id.
     */
    void simulateSessionWrite(OperationContext* opCtx,
                              ScopedSession session,
                              NamespaceString nss,
                              TxnNumber txnNum,
                              StmtId stmtId) {
        session->beginOrContinueTxn(opCtx, txnNum);

        {
            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
            WriteUnitOfWork wuow(opCtx);
            auto opTime = repl::OpTime(Timestamp(10, 1), 1);  // Dummy timestamp.
            session->onWriteOpCompletedOnPrimary(
                opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none);
            wuow.commit();
        }
    }
};

TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();

    const NamespaceString nss("testDB", "testColl");

    // Create a session.
    auto sessionCatalog = SessionCatalog::get(getServiceContext());
    auto sessionId = makeLogicalSessionIdForTest();
    auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
    session->refreshFromStorageIfNeeded(opCtx.get());

    // Simulate a write occurring on that session.
    const TxnNumber txnNum = 0;
    const StmtId stmtId = 1000;
    simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);

    // Check that the statement executed.
    ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));

    // The OpObserver should invalidate in-memory session state, so the check after this should
    // fail.
    OpObserver::RollbackObserverInfo rbInfo;
    rbInfo.rollbackSessionIds = {UUID::gen()};
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    ASSERT_THROWS_CODE(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
                       DBException,
                       ErrorCodes::ConflictingOperationInProgress);
}

TEST_F(OpObserverSessionCatalogTest,
       OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();

    const NamespaceString nss("testDB", "testColl");

    // Create a session.
    auto sessionCatalog = SessionCatalog::get(getServiceContext());
    auto sessionId = makeLogicalSessionIdForTest();
    auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
    session->refreshFromStorageIfNeeded(opCtx.get());

    // Simulate a write occurring on that session.
    const TxnNumber txnNum = 0;
    const StmtId stmtId = 1000;
    simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);

    // Check that the statement executed.
    ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));

    // The OpObserver should not invalidate the in-memory session state, so the check after this
    // should still succeed.
    OpObserver::RollbackObserverInfo rbInfo;
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
}

/**
 * Test fixture with sessions and an extra-large oplog for testing large transactions.
 */
class OpObserverLargeTransactionTest : public OpObserverSessionCatalogTest {
private:
    repl::ReplSettings createReplSettings() override {
        repl::ReplSettings settings;
        // We need an oplog comfortably large enough to hold an oplog entry that exceeds the BSON
        // size limit. Otherwise we will get the wrong error code when trying to write one.
        settings.setOplogSizeBytes(BSONObjMaxInternalSize + 2 * 1024 * 1024);
        settings.setReplSetString("mySet/node1:12345");
        return settings;
    }
};

// Tests that a transaction aborts if it becomes too large only during the commit.
TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();

    const NamespaceString nss("testDB", "testColl");

    // Create a session.
    auto sessionCatalog = SessionCatalog::get(getServiceContext());
    auto sessionId = makeLogicalSessionIdForTest();
    auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
    auto uuid = CollectionUUID::gen();

    // Simulate adding transaction data to a session.
    const TxnNumber txnNum = 0;
    opCtx->setLogicalSessionId(sessionId);
    opCtx->setTxnNumber(txnNum);

    OperationContextSessionMongod opSession(opCtx.get(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx.get());
    txnParticipant->unstashTransactionResources(opCtx.get(), "insert");

    // This size is crafted such that two operations of this size are not too big to fit in a
    // single oplog entry, but two operations plus oplog overhead are too big to fit in a single
    // oplog entry.
    constexpr size_t kHalfTransactionSize = BSONObjMaxInternalSize / 2 - 175;
    std::unique_ptr<uint8_t[]> halfTransactionData(new uint8_t[kHalfTransactionSize]());
    auto operation = repl::OplogEntry::makeInsertOperation(
        nss,
        uuid,
        BSON("_id" << 0 << "data"
                   << BSONBinData(
                          halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)));
    txnParticipant->addTransactionOperation(opCtx.get(), operation);
    txnParticipant->addTransactionOperation(opCtx.get(), operation);
    ASSERT_THROWS_CODE(opObserver.onTransactionCommit(opCtx.get(), false),
                       AssertionException,
                       ErrorCodes::TransactionTooLarge);
}

TEST_F(OpObserverTest, OnRollbackInvalidatesAuthCacheWhenAuthNamespaceRolledBack) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    auto authMgr = AuthorizationManager::get(getServiceContext());
    auto initCacheGen = authMgr->getCacheGeneration();

    // Verify that the rollback op observer invalidates the user cache for each auth namespace by
    // checking that the cache generation changes after a call to the rollback observer method.
    auto nss = AuthorizationManager::rolesCollectionNamespace;
    OpObserver::RollbackObserverInfo rbInfo;
    rbInfo.rollbackNamespaces = {AuthorizationManager::rolesCollectionNamespace};
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());

    initCacheGen = authMgr->getCacheGeneration();
    rbInfo.rollbackNamespaces = {AuthorizationManager::usersCollectionNamespace};
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());

    initCacheGen = authMgr->getCacheGeneration();
    rbInfo.rollbackNamespaces = {AuthorizationManager::versionCollectionNamespace};
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());
}

TEST_F(OpObserverTest, OnRollbackDoesntInvalidateAuthCacheWhenNoAuthNamespaceRolledBack) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    auto authMgr = AuthorizationManager::get(getServiceContext());
    auto initCacheGen = authMgr->getCacheGeneration();

    // Verify that the rollback op observer doesn't invalidate the user cache.
    auto nss = AuthorizationManager::rolesCollectionNamespace;
    OpObserver::RollbackObserverInfo rbInfo;
    opObserver.onReplicationRollback(opCtx.get(), rbInfo);
    auto newCacheGen = authMgr->getCacheGeneration();
    ASSERT_EQ(newCacheGen, initCacheGen);
}

// Repeated aboutToDelete()/onDelete() pairs on the same OperationContext must be allowed.
TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) {
    OpObserverImpl opObserver;
    auto opCtx = cc().makeOperationContext();
    NamespaceString nss = {"test", "coll"};
    AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
    WriteUnitOfWork wunit(opCtx.get());
    opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
    opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
    opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
    opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
}

/**
 * Test fixture for testing OpObserver behavior specific to multi-document transactions.
 */
class OpObserverTransactionTest : public OpObserverSessionCatalogTest {
public:
    void setUp() override {
        OpObserverSessionCatalogTest::setUp();

        // Create a session.
        _opCtx = cc().makeOperationContext();
        auto sessionCatalog = SessionCatalog::get(getServiceContext());
        auto sessionId = makeLogicalSessionIdForTest();
        _session = sessionCatalog->getOrCreateSession(opCtx(), sessionId);
        _session->get()->refreshFromStorageIfNeeded(opCtx());
        opCtx()->setLogicalSessionId(sessionId);

        _opObserver.emplace();
        _times.emplace(opCtx());
    }

    void tearDown() override {
        _times.reset();
        _opCtx.reset();
        OpObserverSessionCatalogTest::tearDown();
    }

protected:
    void checkCommonFields(const BSONObj& oplogEntry) {
        ASSERT_EQ("c"_sd, oplogEntry.getStringField("op"));
        ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getStringField("ns"));
        ASSERT_BSONOBJ_EQ(session()->getSessionId().toBSON(), oplogEntry.getObjectField("lsid"));
        ASSERT_EQ(*opCtx()->getTxnNumber(), oplogEntry.getField("txnNumber").safeNumberLong());
        ASSERT_EQ(0, oplogEntry.getIntField("stmtId"));
    }

    OperationContext* opCtx() {
        return _opCtx.get();
    }

    Session* session() {
        return _session->get();
    }

    OpObserverImpl& opObserver() {
        return *_opObserver;
    }

private:
    class ExposeOpObserverTimes : public OpObserver {
    public:
        typedef OpObserver::ReservedTimes ReservedTimes;
    };

    ServiceContext::UniqueOperationContext _opCtx;
    boost::optional<OpObserverImpl> _opObserver;
    boost::optional<ExposeOpObserverTimes::ReservedTimes> _times;
    boost::optional<ScopedSession> _session;
};

TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
    const NamespaceString nss1("testDB", "testColl");
    const NamespaceString nss2("testDB2", "testColl2");
    auto uuid1 = CollectionUUID::gen();
    auto uuid2 = CollectionUUID::gen();
    const TxnNumber txnNum = 2;
    opCtx()->setTxnNumber(txnNum);

    OperationContextSessionMongod opSession(opCtx(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx());
    txnParticipant->unstashTransactionResources(opCtx(), "insert");

    WriteUnitOfWork wuow(opCtx());
    AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
    AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);

    std::vector<InsertStatement> inserts1;
    inserts1.emplace_back(0, BSON("_id" << 0 << "data" << "x"));
    inserts1.emplace_back(1, BSON("_id" << 1 << "data" << "y"));
    opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);

    OplogUpdateEntryArgs update2;
    update2.nss = nss2;
    update2.uuid = uuid2;
    update2.stmtId = 1;
    update2.updatedDoc = BSON("_id" << 0 << "data" << "y");
    update2.update = BSON("$set" << BSON("data" << "y"));
    update2.criteria = BSON("_id" << 0);
    opObserver().onUpdate(opCtx(), update2);

    opObserver().aboutToDelete(opCtx(), nss1, BSON("_id" << 0 << "data" << "x"));
    opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none);
    txnParticipant->transitionToPreparedforTest();
    {
        WriteUnitOfWork wuow(opCtx());
        OplogSlot slot = repl::getNextOpTime(opCtx());
        opObserver().onTransactionPrepare(opCtx(), slot);
        opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
    }

    auto oplogEntryObj = getSingleOplogEntry(opCtx());
    checkCommonFields(oplogEntryObj);
    OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
    auto o = oplogEntry.getObject();
    auto oExpected = BSON(
        "applyOps" << BSON_ARRAY(
            BSON("op" << "i" << "ns" << nss1.toString() << "ui" << uuid1 << "o"
                      << BSON("_id" << 0 << "data" << "x"))
            << BSON("op" << "i" << "ns" << nss1.toString() << "ui" << uuid1 << "o"
                         << BSON("_id" << 1 << "data" << "y"))
            << BSON("op" << "u" << "ns" << nss2.toString() << "ui" << uuid2 << "o"
                         << BSON("$set" << BSON("data" << "y")) << "o2" << BSON("_id" << 0))
            << BSON("op" << "d" << "ns" << nss1.toString() << "ui" << uuid1 << "o"
                         << BSON("_id" << 0))));
    ASSERT_BSONOBJ_EQ(oExpected, o);
    ASSERT(oplogEntry.getPrepare());
    ASSERT(oplogEntry.getPrepare().get());
    ASSERT_EQ(oplogEntry.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
}

TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
    const TxnNumber txnNum = 2;
    opCtx()->setTxnNumber(txnNum);

    OperationContextSessionMongod opSession(opCtx(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx());
    txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
    txnParticipant->transitionToPreparedforTest();

    {
        WriteUnitOfWork wuow(opCtx());
        OplogSlot slot = repl::getNextOpTime(opCtx());
        opObserver().onTransactionPrepare(opCtx(), slot);
        opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
    }

    auto oplogEntryObj = getSingleOplogEntry(opCtx());
    checkCommonFields(oplogEntryObj);
    OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
    auto o = oplogEntry.getObject();
    auto oExpected = BSON("applyOps" << BSONArray());
    ASSERT_BSONOBJ_EQ(oExpected, o);
    ASSERT(oplogEntry.getPrepare());
    ASSERT(oplogEntry.getPrepare().get());
    ASSERT_EQ(oplogEntry.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
}

TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
    const NamespaceString nss1("testDB", "testColl");
    const NamespaceString nss2("testDB2", "testColl2");
    auto uuid1 = CollectionUUID::gen();
    auto uuid2 = CollectionUUID::gen();
    const TxnNumber txnNum = 2;
    opCtx()->setTxnNumber(txnNum);

    OperationContextSessionMongod opSession(opCtx(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx());
    txnParticipant->unstashTransactionResources(opCtx(), "insert");

    std::vector<InsertStatement> inserts1;
    inserts1.emplace_back(0, BSON("_id" << 0 << "data" << "x"));
    inserts1.emplace_back(1, BSON("_id" << 1 << "data" << "y"));
    std::vector<InsertStatement> inserts2;
    inserts2.emplace_back(0, BSON("_id" << 2 << "data" << "z"));
    inserts2.emplace_back(1, BSON("_id" << 3 << "data" << "w"));

    WriteUnitOfWork wuow(opCtx());
    AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
    AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
    opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
    opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
    opObserver().onTransactionCommit(opCtx(), false);

    auto oplogEntryObj = getSingleOplogEntry(opCtx());
    checkCommonFields(oplogEntryObj);
    OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
    auto o = oplogEntry.getObject();
    auto oExpected = BSON(
        "applyOps" << BSON_ARRAY(
            BSON("op" << "i" << "ns" <<
                      nss1.toString() << "ui" << uuid1 << "o"
                      << BSON("_id" << 0 << "data" << "x"))
            << BSON("op" << "i" << "ns" << nss1.toString() << "ui" << uuid1 << "o"
                         << BSON("_id" << 1 << "data" << "y"))
            << BSON("op" << "i" << "ns" << nss2.toString() << "ui" << uuid2 << "o"
                         << BSON("_id" << 2 << "data" << "z"))
            << BSON("op" << "i" << "ns" << nss2.toString() << "ui" << uuid2 << "o"
                         << BSON("_id" << 3 << "data" << "w"))));
    ASSERT_BSONOBJ_EQ(oExpected, o);
    ASSERT(!oplogEntry.getPrepare());
    ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
}

TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
    const NamespaceString nss1("testDB", "testColl");
    const NamespaceString nss2("testDB2", "testColl2");
    auto uuid1 = CollectionUUID::gen();
    auto uuid2 = CollectionUUID::gen();
    const TxnNumber txnNum = 3;
    opCtx()->setTxnNumber(txnNum);

    OperationContextSessionMongod opSession(opCtx(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx());
    txnParticipant->unstashTransactionResources(opCtx(), "update");

    OplogUpdateEntryArgs update1;
    update1.nss = nss1;
    update1.uuid = uuid1;
    update1.stmtId = 0;
    update1.updatedDoc = BSON("_id" << 0 << "data" << "x");
    update1.update = BSON("$set" << BSON("data" << "x"));
    update1.criteria = BSON("_id" << 0);

    OplogUpdateEntryArgs update2;
    update2.nss = nss2;
    update2.uuid = uuid2;
    update2.stmtId = 1;
    update2.updatedDoc = BSON("_id" << 1 << "data" << "y");
    update2.update = BSON("$set" << BSON("data" << "y"));
    update2.criteria = BSON("_id" << 1);

    WriteUnitOfWork wuow(opCtx());
    AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
    AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
    opObserver().onUpdate(opCtx(), update1);
    opObserver().onUpdate(opCtx(), update2);
    opObserver().onTransactionCommit(opCtx(), false);

    auto oplogEntry = getSingleOplogEntry(opCtx());
    checkCommonFields(oplogEntry);
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = BSON(
        "applyOps" << BSON_ARRAY(
            BSON("op" << "u" << "ns" << nss1.toString() << "ui" << uuid1 << "o"
                      << BSON("$set" << BSON("data" << "x")) << "o2" << BSON("_id" << 0))
            << BSON("op" << "u" << "ns" << nss2.toString() << "ui" << uuid2 << "o"
                         << BSON("$set" << BSON("data" << "y")) << "o2" << BSON("_id" << 1))));
    ASSERT_BSONOBJ_EQ(oExpected, o);
    ASSERT_FALSE(oplogEntry.hasField("prepare"));
    ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
}

TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
    const NamespaceString nss1("testDB", "testColl");
    const NamespaceString nss2("testDB2", "testColl2");
    auto uuid1 = CollectionUUID::gen();
    auto uuid2 = CollectionUUID::gen();
    const TxnNumber txnNum = 3;
    opCtx()->setTxnNumber(txnNum);

    OperationContextSessionMongod sessionTxnState(opCtx(), true, false, true);
    auto txnParticipant = TransactionParticipant::get(opCtx());
    txnParticipant->unstashTransactionResources(opCtx(), "delete");

    WriteUnitOfWork wuow(opCtx());
    AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
    AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
    opObserver().aboutToDelete(opCtx(), nss1, BSON("_id" << 0 << "data" << "x"));
    opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none);
    opObserver().aboutToDelete(opCtx(), nss2, BSON("_id" << 1 << "data" << "y"));
    opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
    opObserver().onTransactionCommit(opCtx(), false);

    auto oplogEntry = getSingleOplogEntry(opCtx());
    checkCommonFields(oplogEntry);
    auto o = oplogEntry.getObjectField("o");
    auto oExpected = BSON(
        "applyOps" << BSON_ARRAY(
            BSON("op" << "d" << "ns" << nss1.toString() << "ui" << uuid1 << "o" <<
BSON("_id" << 0)) << BSON("op" << "d" << "ns" << nss2.toString() << "ui" << uuid2 << "o" << BSON("_id" << 1)))); ASSERT_BSONOBJ_EQ(oExpected, o); ASSERT_FALSE(oplogEntry.hasField("prepare")); ASSERT_FALSE(oplogEntry.getBoolField("prepare")); } DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { OpObserverImpl opObserver; auto opCtx = cc().makeOperationContext(); opCtx->swapLockState(stdx::make_unique()); NamespaceString nss = {"test", "coll"}; opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); } DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { OpObserverImpl opObserver; auto opCtx = cc().makeOperationContext(); opCtx->swapLockState(stdx::make_unique()); NamespaceString nss = {"test", "coll"}; opObserver.aboutToDelete(opCtx.get(), nss, {}); opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); } DEATH_TEST_F(OpObserverTest, NodeCrashesIfShardIdentityDocumentRolledBack, "Fatal Assertion 50712") { OpObserverImpl opObserver; auto opCtx = cc().makeOperationContext(); OpObserver::RollbackObserverInfo rbInfo; rbInfo.shardIdentityRolledBack = true; opObserver.onReplicationRollback(opCtx.get(), rbInfo); } } // namespace } // namespace mongo