/** * Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" #include #include #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface.h" #include "mongo/db/repl/oplog_interface_mock.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/rollback_source.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" namespace { using namespace mongo; using namespace mongo::repl; const auto kIndexVersion = IndexDescriptor::IndexVersion::kV2; const OplogInterfaceMock::Operations kEmptyMockOperations; ReplSettings createReplSettings() { ReplSettings settings; settings.setOplogSizeBytes(5 * 1024 * 1024); settings.setReplSetString("mySet/node1:12345"); return settings; } class ReplicationCoordinatorRollbackMock : public ReplicationCoordinatorMock { public: ReplicationCoordinatorRollbackMock(); void resetLastOpTimesFromOplog(OperationContext* txn) override; }; ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollbackMock() : ReplicationCoordinatorMock(createReplSettings()) {} void ReplicationCoordinatorRollbackMock::resetLastOpTimesFromOplog(OperationContext* txn) {} class RollbackSourceMock : public RollbackSource { public: RollbackSourceMock(std::unique_ptr oplog); int getRollbackId() const override; const OplogInterface& getOplog() const override; BSONObj getLastOperation() const override; BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override; void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override; StatusWith getCollectionInfo(const NamespaceString& nss) const override; private: std::unique_ptr _oplog; }; RollbackSourceMock::RollbackSourceMock(std::unique_ptr oplog) : _oplog(std::move(oplog)) {} const OplogInterface& RollbackSourceMock::getOplog() const { return *_oplog; } int RollbackSourceMock::getRollbackId() const { return 0; } BSONObj RollbackSourceMock::getLastOperation() const { auto iter = _oplog->makeIterator(); auto result = iter->next(); ASSERT_OK(result.getStatus()); return result.getValue().first; } BSONObj RollbackSourceMock::findOne(const NamespaceString& nss, const BSONObj& filter) const { return BSONObj(); } void RollbackSourceMock::copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const {} StatusWith RollbackSourceMock::getCollectionInfo(const NamespaceString& nss) const { return BSON("name" << nss.ns() << "options" << BSONObj()); } class RSRollbackTest : public ServiceContextMongoDTest { protected: ServiceContext::UniqueOperationContext _txn; // Owned by service context ReplicationCoordinator* _coordinator; private: void setUp() override; void tearDown() override; }; void RSRollbackTest::setUp() { ServiceContextMongoDTest::setUp(); _txn = cc().makeOperationContext(); _coordinator = new ReplicationCoordinatorRollbackMock(); auto serviceContext = getServiceContext(); ReplicationCoordinator::set(serviceContext, std::unique_ptr(_coordinator)); StorageInterface::set(serviceContext, stdx::make_unique()); setOplogCollectionName(); repl::StorageInterface::get(_txn.get())->setAppliedThrough(_txn.get(), OpTime{}); repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), OpTime{}); } void RSRollbackTest::tearDown() { _txn.reset(); ServiceContextMongoDTest::tearDown(); setGlobalReplicationCoordinator(nullptr); } void noSleep(Seconds seconds) {} TEST_F(RSRollbackTest, InconsistentMinValid) { repl::StorageInterface::get(_txn.get()) ->setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0)); repl::StorageInterface::get(_txn.get()) ->setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); auto status = syncRollback(_txn.get(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), _coordinator, noSleep); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); } TEST_F(RSRollbackTest, SetFollowerModeFailed) { class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock { public: ReplicationCoordinatorSetFollowerModeMock() : ReplicationCoordinatorMock(createReplSettings()) {} MemberState getMemberState() const override { return MemberState::RS_DOWN; } bool setFollowerMode(const MemberState& newState) override { return false; } }; _coordinator = new ReplicationCoordinatorSetFollowerModeMock(); setGlobalReplicationCoordinator(_coordinator); ASSERT_EQUALS(ErrorCodes::OperationFailed, syncRollback(_txn.get(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), _coordinator, noSleep) .code()); } TEST_F(RSRollbackTest, OplogStartMissing) { OpTime ts(Timestamp(Seconds(1), 0), 0); auto operation = std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); ASSERT_EQUALS( ErrorCodes::OplogStartMissing, syncRollback(_txn.get(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ operation, }))), _coordinator, noSleep) .code()); } TEST_F(RSRollbackTest, NoRemoteOpLog) { OpTime ts(Timestamp(Seconds(1), 0), 0); auto operation = std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); auto status = syncRollback(_txn.get(), OplogInterfaceMock({operation}), RollbackSourceMock(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), _coordinator, noSleep); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); } TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { OpTime ts(Timestamp(Seconds(1), 0), 0); auto operation = std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)) {} int getRollbackId() const override { uassert(ErrorCodes::UnknownError, "getRollbackId() failed", false); } }; ASSERT_THROWS_CODE(syncRollback(_txn.get(), OplogInterfaceMock({operation}), RollbackSourceLocal(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), _coordinator, noSleep), UserException, ErrorCodes::UnknownError); } TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { createOplog(_txn.get()); OpTime ts(Timestamp(Seconds(1), 0), 1); auto operation = std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1)); ASSERT_OK( syncRollback(_txn.get(), OplogInterfaceMock({operation}), RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ operation, }))), _coordinator, noSleep)); } /** * Create test collection. * Returns collection. */ Collection* _createCollection(OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options) { Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); mongo::WriteUnitOfWork wuow(txn); auto db = dbHolder().openDb(txn, nss.db()); ASSERT_TRUE(db); db->dropCollection(txn, nss.ns()); auto coll = db->createCollection(txn, nss.ns(), options); ASSERT_TRUE(coll); wuow.commit(); return coll; } Collection* _createCollection(OperationContext* txn, const std::string& nss, const CollectionOptions& options) { return _createCollection(txn, NamespaceString(nss), options); } /** * Test function to roll back a delete operation. * Returns number of records in collection after rolling back delete operation. * If collection does not exist after rolling back, returns -1. */ int _testRollbackDelete(OperationContext* txn, ReplicationCoordinator* coordinator, const BSONObj& documentAtSource) { auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto deleteOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "d" << "ns" << "test.t" << "o" << BSON("_id" << 0)), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(const BSONObj& documentAtSource, std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false), _documentAtSource(documentAtSource) {} BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const { called = true; return _documentAtSource; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(documentAtSource, std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); ASSERT_OK(syncRollback(txn, OplogInterfaceMock({deleteOperation, commonOperation}), rollbackSource, coordinator, noSleep)); ASSERT_TRUE(rollbackSource.called); Lock::DBLock dbLock(txn->lockState(), "test", MODE_S); Lock::CollectionLock collLock(txn->lockState(), "test.t", MODE_S); auto db = dbHolder().get(txn, "test"); ASSERT_TRUE(db); auto collection = db->getCollection("test.t"); if (!collection) { return -1; } return collection->getRecordStore()->numRecords(txn); } TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionDoesNotExist) { createOplog(_txn.get()); ASSERT_EQUALS(-1, _testRollbackDelete(_txn.get(), _coordinator, BSONObj())); } TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsNonCapped) { createOplog(_txn.get()); _createCollection(_txn.get(), "test.t", CollectionOptions()); _testRollbackDelete(_txn.get(), _coordinator, BSONObj()); ASSERT_EQUALS(0, _testRollbackDelete(_txn.get(), _coordinator, BSONObj())); } TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsCapped) { createOplog(_txn.get()); CollectionOptions options; options.capped = true; _createCollection(_txn.get(), "test.t", options); ASSERT_EQUALS(0, _testRollbackDelete(_txn.get(), _coordinator, BSONObj())); } TEST_F(RSRollbackTest, RollbackDeleteRestoreDocument) { createOplog(_txn.get()); _createCollection(_txn.get(), "test.t", CollectionOptions()); BSONObj doc = BSON("_id" << 0 << "a" << 1); _testRollbackDelete(_txn.get(), _coordinator, doc); ASSERT_EQUALS(1, _testRollbackDelete(_txn.get(), _coordinator, doc)); } TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.t" << "o" << BSON("a" << 1)), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const { called = true; return BSONObj(); } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); ASSERT_EQUALS(1, countLogLinesContaining("cannot rollback op with no _id. ns: test.t,")); ASSERT_FALSE(rollbackSource.called); } TEST_F(RSRollbackTest, RollbackCreateIndexCommand) { createOplog(_txn.get()); auto collection = _createCollection(_txn.get(), "test.t", CollectionOptions()); auto indexSpec = BSON("ns" << "test.t" << "key" << BSON("a" << 1) << "name" << "a_1" << "v" << static_cast(kIndexVersion)); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); MultiIndexBlock indexer(_txn.get(), collection); ASSERT_OK(indexer.init(indexSpec).getStatus()); WriteUnitOfWork wunit(_txn.get()); indexer.commit(); wunit.commit(); auto indexCatalog = collection->getIndexCatalog(); ASSERT(indexCatalog); ASSERT_EQUALS(2, indexCatalog->numIndexesReady(_txn.get())); } auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.system.indexes" << "o" << indexSpec), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); // Repeat index creation operation and confirm that rollback attempts to drop index just once. // This can happen when an index is re-created with different options. startCapturingLogMessages(); ASSERT_OK(syncRollback( _txn.get(), OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("rollback drop index: collection: test.t. index: a_1")); ASSERT_FALSE(rollbackSource.called); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S); auto indexCatalog = collection->getIndexCatalog(); ASSERT(indexCatalog); ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get())); } } TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) { createOplog(_txn.get()); auto collection = _createCollection(_txn.get(), "test.t", CollectionOptions()); auto indexSpec = BSON("ns" << "test.t" << "key" << BSON("a" << 1) << "name" << "a_1"); // Skip index creation to trigger warning during rollback. { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S); auto indexCatalog = collection->getIndexCatalog(); ASSERT(indexCatalog); ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get())); } auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.system.indexes" << "o" << indexSpec), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); startCapturingLogMessages(); ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("rollback drop index: collection: test.t. index: a_1")); ASSERT_EQUALS(1, countLogLinesContaining("rollback failed to drop index a_1 in test.t")); ASSERT_FALSE(rollbackSource.called); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S); auto indexCatalog = collection->getIndexCatalog(); ASSERT(indexCatalog); ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get())); } } TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.system.indexes" << "o" << BSON("key" << BSON("a" << 1) << "name" << "a_1")), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); ASSERT_EQUALS( 1, countLogLinesContaining("Missing collection namespace in system.indexes operation,")); ASSERT_FALSE(rollbackSource.called); } TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.system.indexes" << "o" << BSON("ns" << "test." << "key" << BSON("a" << 1) << "name" << "a_1")), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); ASSERT_EQUALS( 1, countLogLinesContaining("Invalid collection namespace in system.indexes operation,")); ASSERT_FALSE(rollbackSource.called); } TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "i" << "ns" << "test.system.indexes" << "o" << BSON("ns" << "test.t" << "key" << BSON("a" << 1))), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; private: BSONObj _documentAtSource; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); startCapturingLogMessages(); auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, _coordinator, noSleep); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); ASSERT_EQUALS(1, countLogLinesContaining("Missing index name in system.indexes operation,")); ASSERT_FALSE(rollbackSource.called); } TEST_F(RSRollbackTest, RollbackUnknownCommand) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto unknownCommandOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "c" << "ns" << "test.t" << "o" << BSON("unknown_command" << "t")), RecordId(2)); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); mongo::WriteUnitOfWork wuow(_txn.get()); auto db = dbHolder().openDb(_txn.get(), "test"); ASSERT_TRUE(db); ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); wuow.commit(); } auto status = syncRollback(_txn.get(), OplogInterfaceMock({unknownCommandOperation, commonOperation}), RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))), _coordinator, noSleep); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18751, status.location()); } TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto dropCollectionOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "c" << "ns" << "test.t" << "o" << BSON("drop" << "t")), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override { called = true; } mutable bool called; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({dropCollectionOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); ASSERT_TRUE(rollbackSource.called); } BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list ops) { BSONObjBuilder entry; entry << "ts" << ts << "h" << 1LL << "op" << "c" << "ns" << "admin"; { BSONObjBuilder cmd(entry.subobjStart("o")); BSONArrayBuilder subops(entry.subarrayStart("applyOps")); for (const auto& op : ops) { subops << op; } } return entry.obj(); } OpTime getOpTimeFromOplogEntry(const BSONObj& entry) { const BSONElement tsElement = entry["ts"]; const BSONElement termElement = entry["t"]; const BSONElement hashElement = entry["h"]; ASSERT_EQUALS(bsonTimestamp, tsElement.type()) << entry; ASSERT_TRUE(hashElement.isNumber()) << entry; ASSERT_TRUE(termElement.eoo() || termElement.isNumber()) << entry; long long term = hashElement.numberLong(); if (!termElement.eoo()) { term = termElement.numberLong(); } return OpTime(tsElement.timestamp(), term); } TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { createOplog(_txn.get()); { AutoGetOrCreateDb autoDb(_txn.get(), "test", MODE_X); mongo::WriteUnitOfWork wuow(_txn.get()); auto coll = autoDb.getDb()->getCollection("test.t"); if (!coll) { coll = autoDb.getDb()->createCollection(_txn.get(), "test.t"); } ASSERT(coll); OpDebug* const nullOpDebug = nullptr; ASSERT_OK( coll->insertDocument(_txn.get(), BSON("_id" << 1 << "v" << 2), nullOpDebug, false)); ASSERT_OK( coll->insertDocument(_txn.get(), BSON("_id" << 2 << "v" << 4), nullOpDebug, false)); ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 4), nullOpDebug, false)); wuow.commit(); } const auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); const auto applyOpsOperation = std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0), {BSON("op" << "u" << "ns" << "test.t" << "o2" << BSON("_id" << 1) << "o" << BSON("_id" << 1 << "v" << 2)), BSON("op" << "u" << "ns" << "test.t" << "o2" << BSON("_id" << 2) << "o" << BSON("_id" << 2 << "v" << 4)), BSON("op" << "d" << "ns" << "test.t" << "o" << BSON("_id" << 3)), BSON("op" << "i" << "ns" << "test.t" << "o" << BSON("_id" << 4))}), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)) {} BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override { int numFields = 0; for (const auto element : filter) { ++numFields; ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter; } ASSERT_EQUALS(1, numFields) << filter; searchedIds.insert(filter.firstElement().numberInt()); switch (filter.firstElement().numberInt()) { case 1: return BSON("_id" << 1 << "v" << 1); case 2: return BSON("_id" << 2 << "v" << 3); case 3: return BSON("_id" << 3 << "v" << 5); case 4: return {}; } FAIL("Unexpected findOne request") << filter; return {}; // Unreachable; why doesn't compiler know? } mutable std::multiset searchedIds; } rollbackSource(std::unique_ptr(new OplogInterfaceMock({commonOperation}))); _createCollection(_txn.get(), "test.t", CollectionOptions()); ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({applyOpsOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); ASSERT_EQUALS(4U, rollbackSource.searchedIds.size()); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1)); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2)); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(3)); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(4)); AutoGetCollectionForRead acr(_txn.get(), "test.t"); BSONObj result; ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 1), result)); ASSERT_EQUALS(1, result["v"].numberInt()) << result; ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 2), result)); ASSERT_EQUALS(3, result["v"].numberInt()) << result; ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 3), result)); ASSERT_EQUALS(5, result["v"].numberInt()) << result; ASSERT_FALSE(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 4), result)) << result; } TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto createCollectionOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "c" << "ns" << "test.t" << "o" << BSON("create" << "t")), RecordId(2)); RollbackSourceMock rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({createCollectionOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S); auto db = dbHolder().get(_txn.get(), "test"); ASSERT_TRUE(db); ASSERT_FALSE(db->getCollection("test.t")); } } TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto collectionModificationOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "c" << "ns" << "test.t" << "o" << BSON("collMod" << "t" << "noPadding" << false)), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)), called(false) {} StatusWith getCollectionInfo(const NamespaceString& nss) const { called = true; return RollbackSourceMock::getCollectionInfo(nss); } mutable bool called; }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); startCapturingLogMessages(); ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, _coordinator, noSleep)); stopCapturingLogMessages(); ASSERT_TRUE(rollbackSource.called); for (const auto& message : getCapturedLogMessages()) { ASSERT_TRUE(message.find("ignoring op with no _id during rollback. ns: test.t") == std::string::npos); } } TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOptions) { createOplog(_txn.get()); auto commonOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); auto collectionModificationOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" << "c" << "ns" << "test.t" << "o" << BSON("collMod" << "t" << "noPadding" << false)), RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr oplog) : RollbackSourceMock(std::move(oplog)) {} StatusWith getCollectionInfo(const NamespaceString& nss) const { return BSON("name" << nss.ns() << "options" << 12345); } }; RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))); _createCollection(_txn.get(), "test.t", CollectionOptions()); auto status = syncRollback(_txn.get(), OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, _coordinator, noSleep); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18753, status.location()); } } // namespace