/**
* Copyright 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"

#include <initializer_list>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface.h"
#include "mongo/db/repl/oplog_interface_mock.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
// Index version written into the "v" field of the index spec built by RollbackCreateIndexCommand.
const auto kIndexVersion = IndexDescriptor::IndexVersion::kV2;
// Default-constructed operation list, passed to OplogInterfaceMock wherever a test needs an
// oplog that contains no entries.
const OplogInterfaceMock::Operations kEmptyMockOperations;
/**
 * Builds the replication settings handed to the mock replication coordinators in this file:
 * an oplog size of 5 * 1024 * 1024 bytes and the replica set string "mySet/node1:12345".
 */
ReplSettings createReplSettings() {
    ReplSettings replSettings;
    replSettings.setOplogSizeBytes(5 * 1024 * 1024);
    replSettings.setReplSetString("mySet/node1:12345");
    return replSettings;
}
/**
 * Replication coordinator mock installed by the RSRollbackTest fixture.
 * It is constructed with the settings from createReplSettings() and replaces
 * resetLastOpTimesFromOplog() with a no-op.
 */
class ReplicationCoordinatorRollbackMock : public ReplicationCoordinatorMock {
public:
    ReplicationCoordinatorRollbackMock() : ReplicationCoordinatorMock(createReplSettings()) {}

    // Intentionally empty: the override does nothing when invoked.
    void resetLastOpTimesFromOplog(OperationContext* txn) override {}
};
class RollbackSourceMock : public RollbackSource {
public:
RollbackSourceMock(std::unique_ptr oplog);
int getRollbackId() const override;
const OplogInterface& getOplog() const override;
BSONObj getLastOperation() const override;
BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override;
void copyCollectionFromRemote(OperationContext* txn, const NamespaceString& nss) const override;
StatusWith getCollectionInfo(const NamespaceString& nss) const override;
private:
std::unique_ptr _oplog;
};
RollbackSourceMock::RollbackSourceMock(std::unique_ptr oplog)
: _oplog(std::move(oplog)) {}
const OplogInterface& RollbackSourceMock::getOplog() const {
return *_oplog;
}
int RollbackSourceMock::getRollbackId() const {
return 0;
}
BSONObj RollbackSourceMock::getLastOperation() const {
auto iter = _oplog->makeIterator();
auto result = iter->next();
ASSERT_OK(result.getStatus());
return result.getValue().first;
}
BSONObj RollbackSourceMock::findOne(const NamespaceString& nss, const BSONObj& filter) const {
return BSONObj();
}
void RollbackSourceMock::copyCollectionFromRemote(OperationContext* txn,
const NamespaceString& nss) const {}
StatusWith RollbackSourceMock::getCollectionInfo(const NamespaceString& nss) const {
return BSON("name" << nss.ns() << "options" << BSONObj());
}
class RSRollbackTest : public ServiceContextMongoDTest {
protected:
ServiceContext::UniqueOperationContext _txn;
// Owned by service context
ReplicationCoordinator* _coordinator;
private:
void setUp() override;
void tearDown() override;
};
void RSRollbackTest::setUp() {
ServiceContextMongoDTest::setUp();
_txn = cc().makeOperationContext();
_coordinator = new ReplicationCoordinatorRollbackMock();
auto serviceContext = getServiceContext();
ReplicationCoordinator::set(serviceContext,
std::unique_ptr(_coordinator));
StorageInterface::set(serviceContext, stdx::make_unique());
setOplogCollectionName();
repl::StorageInterface::get(_txn.get())->setAppliedThrough(_txn.get(), OpTime{});
repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), OpTime{});
}
void RSRollbackTest::tearDown() {
_txn.reset();
ServiceContextMongoDTest::tearDown();
setGlobalReplicationCoordinator(nullptr);
}
/**
 * Sleep callback passed to syncRollback() by every test; it returns immediately without
 * sleeping.
 */
void noSleep(Seconds /*seconds*/) {}
/**
 * With appliedThrough stored as Timestamp(0, 0) and minValid stored as Timestamp(1, 0),
 * syncRollback() is expected to return UnrecoverableRollbackError at location 18752.
 */
TEST_F(RSRollbackTest, InconsistentMinValid) {
    repl::StorageInterface::get(_txn.get())
        ->setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0));
    repl::StorageInterface::get(_txn.get())
        ->setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
    // Both the local and the remote oplog are empty.
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock(kEmptyMockOperations),
                               RollbackSourceMock(std::unique_ptr<OplogInterface>(
                                   new OplogInterfaceMock(kEmptyMockOperations))),
                               _coordinator,
                               noSleep);
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
}
/**
 * When the replication coordinator's setFollowerMode() returns false, syncRollback() is
 * expected to return OperationFailed.
 */
TEST_F(RSRollbackTest, SetFollowerModeFailed) {
    // Coordinator that reports RS_DOWN as its state and rejects every follower mode change.
    class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock {
    public:
        ReplicationCoordinatorSetFollowerModeMock()
            : ReplicationCoordinatorMock(createReplSettings()) {}
        MemberState getMemberState() const override {
            return MemberState::RS_DOWN;
        }
        bool setFollowerMode(const MemberState& newState) override {
            return false;
        }
    };
    // Replace the coordinator installed by the fixture with the failing one.
    _coordinator = new ReplicationCoordinatorSetFollowerModeMock();
    setGlobalReplicationCoordinator(_coordinator);
    ASSERT_EQUALS(ErrorCodes::OperationFailed,
                  syncRollback(_txn.get(),
                               OplogInterfaceMock(kEmptyMockOperations),
                               RollbackSourceMock(std::unique_ptr<OplogInterface>(
                                   new OplogInterfaceMock(kEmptyMockOperations))),
                               _coordinator,
                               noSleep)
                      .code());
}
/**
 * With an empty local oplog and a single entry in the remote oplog, syncRollback() is
 * expected to return OplogStartMissing.
 */
TEST_F(RSRollbackTest, OplogStartMissing) {
    OpTime ts(Timestamp(Seconds(1), 0), 0);
    auto operation =
        std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
    ASSERT_EQUALS(
        ErrorCodes::OplogStartMissing,
        syncRollback(_txn.get(),
                     OplogInterfaceMock(kEmptyMockOperations),
                     RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                         operation,
                     }))),
                     _coordinator,
                     noSleep)
            .code());
}
/**
 * With a single entry in the local oplog and an empty remote oplog, syncRollback() is
 * expected to return UnrecoverableRollbackError at location 18752.
 */
TEST_F(RSRollbackTest, NoRemoteOpLog) {
    OpTime ts(Timestamp(Seconds(1), 0), 0);
    auto operation =
        std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock({operation}),
                               RollbackSourceMock(std::unique_ptr<OplogInterface>(
                                   new OplogInterfaceMock(kEmptyMockOperations))),
                               _coordinator,
                               noSleep);
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
}
/**
 * When the rollback source's getRollbackId() throws, syncRollback() is expected to let the
 * UserException (code UnknownError) propagate to the caller.
 */
TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
    OpTime ts(Timestamp(Seconds(1), 0), 0);
    auto operation =
        std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)) {}
        int getRollbackId() const override {
            // The condition is always false, so this call always raises; there is
            // deliberately no return statement after it.
            uassert(ErrorCodes::UnknownError, "getRollbackId() failed", false);
        }
    };
    ASSERT_THROWS_CODE(syncRollback(_txn.get(),
                                    OplogInterfaceMock({operation}),
                                    RollbackSourceLocal(std::unique_ptr<OplogInterface>(
                                        new OplogInterfaceMock(kEmptyMockOperations))),
                                    _coordinator,
                                    noSleep),
                       UserException,
                       ErrorCodes::UnknownError);
}
/**
 * When the local and remote oplogs each contain only the same single entry,
 * syncRollback() is expected to succeed.
 */
TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
    createOplog(_txn.get());
    OpTime ts(Timestamp(Seconds(1), 0), 1);
    auto operation =
        std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1));
    ASSERT_OK(
        syncRollback(_txn.get(),
                     OplogInterfaceMock({operation}),
                     RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                         operation,
                     }))),
                     _coordinator,
                     noSleep));
}
/**
 * Creates the test collection 'nss' with the given options and returns it.
 *
 * Runs under an exclusive database lock inside a single write unit of work. Any existing
 * collection with the same namespace is dropped first, so the returned collection is always
 * freshly created.
 */
Collection* _createCollection(OperationContext* txn,
                              const NamespaceString& nss,
                              const CollectionOptions& options) {
    Lock::DBLock exclusiveDbLock(txn->lockState(), nss.db(), MODE_X);
    mongo::WriteUnitOfWork writeUnit(txn);

    auto database = dbHolder().openDb(txn, nss.db());
    ASSERT_TRUE(database);

    // Start from a clean slate before creating the collection.
    database->dropCollection(txn, nss.ns());
    auto collection = database->createCollection(txn, nss.ns(), options);
    ASSERT_TRUE(collection);

    writeUnit.commit();
    return collection;
}
/**
 * Convenience overload: converts the namespace string 'nss' to a NamespaceString and
 * forwards to the NamespaceString overload.
 */
Collection* _createCollection(OperationContext* txn,
                              const std::string& nss,
                              const CollectionOptions& options) {
    const NamespaceString parsedNss(nss);
    return _createCollection(txn, parsedNss, options);
}
/**
 * Test function to roll back a delete operation.
 *
 * The local oplog holds a delete of {_id: 0} in "test.t" on top of a common entry; the
 * remote oplog holds only the common entry. 'documentAtSource' is what the rollback source
 * returns from findOne().
 *
 * Asserts that syncRollback() succeeds and that the rollback source's findOne() was called.
 * Returns number of records in collection after rolling back delete operation.
 * If collection does not exist after rolling back, returns -1.
 */
int _testRollbackDelete(OperationContext* txn,
                        ReplicationCoordinator* coordinator,
                        const BSONObj& documentAtSource) {
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto deleteOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "d"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("_id" << 0)),
                       RecordId(2));
    // Rollback source that returns 'documentAtSource' from findOne() and records the call.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(const BSONObj& documentAtSource,
                            std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)),
              called(false),
              _documentAtSource(documentAtSource) {}
        BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override {
            called = true;
            return _documentAtSource;
        }
        // Mutable because findOne() is const.
        mutable bool called;

    private:
        BSONObj _documentAtSource;
    };
    RollbackSourceLocal rollbackSource(documentAtSource,
                                       std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                                           commonOperation,
                                       })));
    ASSERT_OK(syncRollback(txn,
                           OplogInterfaceMock({deleteOperation, commonOperation}),
                           rollbackSource,
                           coordinator,
                           noSleep));
    ASSERT_TRUE(rollbackSource.called);

    // Inspect the collection under shared locks to count what the rollback left behind.
    Lock::DBLock dbLock(txn->lockState(), "test", MODE_S);
    Lock::CollectionLock collLock(txn->lockState(), "test.t", MODE_S);
    auto db = dbHolder().get(txn, "test");
    ASSERT_TRUE(db);
    auto collection = db->getCollection("test.t");
    if (!collection) {
        return -1;
    }
    return collection->getRecordStore()->numRecords(txn);
}
/**
 * Rolling back a delete when the source has no matching document and "test.t" was never
 * created locally: _testRollbackDelete() is expected to report -1 (collection missing).
 */
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionDoesNotExist) {
    auto* const txn = _txn.get();
    createOplog(txn);
    const int recordCount = _testRollbackDelete(txn, _coordinator, BSONObj());
    ASSERT_EQUALS(-1, recordCount);
}
/**
 * Rolling back a delete when the source has no matching document and "test.t" exists as a
 * regular (non-capped) collection: the collection is expected to end up with 0 records.
 */
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsNonCapped) {
    auto* const txn = _txn.get();
    createOplog(txn);
    _createCollection(txn, "test.t", CollectionOptions());
    // NOTE(review): the rollback is executed twice and only the second result is checked;
    // confirm whether the first, unchecked run is intentional.
    _testRollbackDelete(txn, _coordinator, BSONObj());
    const int recordCount = _testRollbackDelete(txn, _coordinator, BSONObj());
    ASSERT_EQUALS(0, recordCount);
}
/**
 * Rolling back a delete when the source has no matching document and "test.t" exists as a
 * capped collection: the collection is expected to end up with 0 records.
 */
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsCapped) {
    auto* const txn = _txn.get();
    createOplog(txn);

    CollectionOptions cappedOptions;
    cappedOptions.capped = true;
    _createCollection(txn, "test.t", cappedOptions);

    const int recordCount = _testRollbackDelete(txn, _coordinator, BSONObj());
    ASSERT_EQUALS(0, recordCount);
}
/**
 * Rolling back a delete when the source still has the document {_id: 0, a: 1}: the
 * collection is expected to end up with exactly 1 record.
 */
TEST_F(RSRollbackTest, RollbackDeleteRestoreDocument) {
    auto* const txn = _txn.get();
    createOplog(txn);
    _createCollection(txn, "test.t", CollectionOptions());
    BSONObj sourceDocument = BSON("_id" << 0 << "a" << 1);
    // NOTE(review): the rollback is executed twice and only the second result is checked;
    // confirm whether the first, unchecked run is intentional.
    _testRollbackDelete(txn, _coordinator, sourceDocument);
    const int recordCount = _testRollbackDelete(txn, _coordinator, sourceDocument);
    ASSERT_EQUALS(1, recordCount);
}
/**
 * Rolling back an insert whose document has no _id field is expected to fail with
 * UnrecoverableRollbackError (location 18752), log the "cannot rollback op with no _id"
 * message once, and never query the rollback source via findOne().
 */
TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("a" << 1)),
                       RecordId(2));
    // Rollback source that records whether findOne() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override {
            called = true;
            return BSONObj();
        }
        // Mutable because findOne() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    startCapturingLogMessages();
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                               rollbackSource,
                               _coordinator,
                               noSleep);
    stopCapturingLogMessages();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
    ASSERT_EQUALS(1, countLogLinesContaining("cannot rollback op with no _id. ns: test.t,"));
    ASSERT_FALSE(rollbackSource.called);
}
/**
 * Rolling back the creation of index "a_1" on "test.t" is expected to drop that index
 * (leaving one ready index), log the drop exactly once even though the local oplog contains
 * the index creation twice, and never call copyCollectionFromRemote().
 */
TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
    createOplog(_txn.get());
    auto collection = _createCollection(_txn.get(), "test.t", CollectionOptions());
    auto indexSpec = BSON("ns"
                          << "test.t"
                          << "key"
                          << BSON("a" << 1)
                          << "name"
                          << "a_1"
                          << "v"
                          << static_cast<int>(kIndexVersion));
    {
        // Build the index locally so that there is something to roll back.
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
        MultiIndexBlock indexer(_txn.get(), collection);
        ASSERT_OK(indexer.init(indexSpec).getStatus());
        WriteUnitOfWork wunit(_txn.get());
        indexer.commit();
        wunit.commit();
        auto indexCatalog = collection->getIndexCatalog();
        ASSERT(indexCatalog);
        ASSERT_EQUALS(2, indexCatalog->numIndexesReady(_txn.get()));
    }
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.system.indexes"
                                 << "o"
                                 << indexSpec),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    // Repeat index creation operation and confirm that rollback attempts to drop index just once.
    // This can happen when an index is re-created with different options.
    startCapturingLogMessages();
    ASSERT_OK(syncRollback(
        _txn.get(),
        OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}),
        rollbackSource,
        _coordinator,
        noSleep));
    stopCapturingLogMessages();
    ASSERT_EQUALS(1,
                  countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
    ASSERT_FALSE(rollbackSource.called);
    {
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S);
        auto indexCatalog = collection->getIndexCatalog();
        ASSERT(indexCatalog);
        ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get()));
    }
}
/**
 * Rolling back the creation of an index that was never built locally is expected to
 * succeed, log both the drop attempt and the "failed to drop index" message once each,
 * leave the index catalog unchanged (one ready index), and never call
 * copyCollectionFromRemote().
 */
TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
    createOplog(_txn.get());
    auto collection = _createCollection(_txn.get(), "test.t", CollectionOptions());
    auto indexSpec = BSON("ns"
                          << "test.t"
                          << "key"
                          << BSON("a" << 1)
                          << "name"
                          << "a_1");
    // Skip index creation to trigger warning during rollback.
    {
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S);
        auto indexCatalog = collection->getIndexCatalog();
        ASSERT(indexCatalog);
        ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get()));
    }
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.system.indexes"
                                 << "o"
                                 << indexSpec),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    startCapturingLogMessages();
    ASSERT_OK(syncRollback(_txn.get(),
                           OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                           rollbackSource,
                           _coordinator,
                           noSleep));
    stopCapturingLogMessages();
    ASSERT_EQUALS(1,
                  countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
    ASSERT_EQUALS(1, countLogLinesContaining("rollback failed to drop index a_1 in test.t"));
    ASSERT_FALSE(rollbackSource.called);
    {
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S);
        auto indexCatalog = collection->getIndexCatalog();
        ASSERT(indexCatalog);
        ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_txn.get()));
    }
}
/**
 * A system.indexes insert whose index spec has no "ns" field is expected to make
 * syncRollback() fail with UnrecoverableRollbackError (location 18752), log the
 * "Missing collection namespace" message once, and never call copyCollectionFromRemote().
 */
TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.system.indexes"
                                 << "o"
                                 << BSON("key" << BSON("a" << 1) << "name"
                                               << "a_1")),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    startCapturingLogMessages();
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                               rollbackSource,
                               _coordinator,
                               noSleep);
    stopCapturingLogMessages();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
    ASSERT_EQUALS(
        1, countLogLinesContaining("Missing collection namespace in system.indexes operation,"));
    ASSERT_FALSE(rollbackSource.called);
}
/**
 * A system.indexes insert whose index spec names the namespace "test." is expected to make
 * syncRollback() fail with UnrecoverableRollbackError (location 18752), log the
 * "Invalid collection namespace" message once, and never call copyCollectionFromRemote().
 */
TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.system.indexes"
                                 << "o"
                                 << BSON("ns"
                                         << "test."
                                         << "key"
                                         << BSON("a" << 1)
                                         << "name"
                                         << "a_1")),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    startCapturingLogMessages();
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                               rollbackSource,
                               _coordinator,
                               noSleep);
    stopCapturingLogMessages();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
    ASSERT_EQUALS(
        1, countLogLinesContaining("Invalid collection namespace in system.indexes operation,"));
    ASSERT_FALSE(rollbackSource.called);
}
/**
 * A system.indexes insert whose index spec has no "name" field is expected to make
 * syncRollback() fail with UnrecoverableRollbackError (location 18752), log the
 * "Missing index name" message once, and never call copyCollectionFromRemote().
 */
TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto insertDocumentOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "i"
                                 << "ns"
                                 << "test.system.indexes"
                                 << "o"
                                 << BSON("ns"
                                         << "test.t"
                                         << "key"
                                         << BSON("a" << 1))),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    startCapturingLogMessages();
    auto status = syncRollback(_txn.get(),
                               OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                               rollbackSource,
                               _coordinator,
                               noSleep);
    stopCapturingLogMessages();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18752, status.location());
    ASSERT_EQUALS(1, countLogLinesContaining("Missing index name in system.indexes operation,"));
    ASSERT_FALSE(rollbackSource.called);
}
/**
 * A command oplog entry with the unrecognized command name "unknown_command" is expected
 * to make syncRollback() fail with UnrecoverableRollbackError at location 18751.
 */
TEST_F(RSRollbackTest, RollbackUnknownCommand) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto unknownCommandOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "c"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("unknown_command"
                                         << "t")),
                       RecordId(2));
    {
        // Make sure "test.t" exists locally before the rollback runs.
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
        mongo::WriteUnitOfWork wuow(_txn.get());
        auto db = dbHolder().openDb(_txn.get(), "test");
        ASSERT_TRUE(db);
        ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
        wuow.commit();
    }
    auto status =
        syncRollback(_txn.get(),
                     OplogInterfaceMock({unknownCommandOperation, commonOperation}),
                     RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                         commonOperation,
                     }))),
                     _coordinator,
                     noSleep);
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18751, status.location());
}
/**
 * Rolling back a "drop" command on "test.t" is expected to succeed and to call
 * copyCollectionFromRemote() on the rollback source.
 */
TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto dropCollectionOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "c"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("drop"
                                         << "t")),
                       RecordId(2));
    // Rollback source that records whether copyCollectionFromRemote() was called.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        void copyCollectionFromRemote(OperationContext* txn,
                                      const NamespaceString& nss) const override {
            called = true;
        }
        // Mutable because copyCollectionFromRemote() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    _createCollection(_txn.get(), "test.t", CollectionOptions());
    ASSERT_OK(syncRollback(_txn.get(),
                           OplogInterfaceMock({dropCollectionOperation, commonOperation}),
                           rollbackSource,
                           _coordinator,
                           noSleep));
    ASSERT_TRUE(rollbackSource.called);
}
/**
 * Builds an applyOps command oplog entry of the form
 *   {ts: <ts>, h: 1, op: "c", ns: "admin", o: {applyOps: [<ops>...]}}
 * where 'ops' are appended to the applyOps array in the order given.
 */
BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) {
    BSONObjBuilder entry;
    entry << "ts" << ts << "h" << 1LL << "op"
          << "c"
          << "ns"
          << "admin";
    {
        // Scope the sub-builders so that both are finished (array first, then the enclosing
        // "o" object) before entry.obj() is called.
        BSONObjBuilder cmd(entry.subobjStart("o"));
        // The array is a field of the "o" sub-object, so it is started on 'cmd' rather than
        // on the parent builder.
        BSONArrayBuilder subops(cmd.subarrayStart("applyOps"));
        for (const auto& op : ops) {
            subops << op;
        }
    }
    return entry.obj();
}
/**
 * Extracts an OpTime from an oplog entry.
 *
 * Asserts that "ts" is a timestamp, that "h" is numeric, and that "t" is either absent or
 * numeric. The term of the returned OpTime is taken from "t" when that field is present and
 * from "h" otherwise.
 */
OpTime getOpTimeFromOplogEntry(const BSONObj& entry) {
    const BSONElement timestampField = entry["ts"];
    const BSONElement termField = entry["t"];
    const BSONElement hashField = entry["h"];

    ASSERT_EQUALS(bsonTimestamp, timestampField.type()) << entry;
    ASSERT_TRUE(hashField.isNumber()) << entry;
    ASSERT_TRUE(termField.eoo() || termField.isNumber()) << entry;

    // Fall back to the hash value when the entry carries no explicit term.
    const long long term = termField.eoo() ? hashField.numberLong() : termField.numberLong();
    return OpTime(timestampField.timestamp(), term);
}
/**
 * Rolling back an applyOps entry containing two updates (_id 1 and 2), a delete (_id 3) and
 * an insert (_id 4) on "test.t".
 *
 * Expected outcome: the rollback source is queried exactly once for each of the four _ids;
 * documents 1, 2 and 3 end up with the values the source returned (v = 1, 3, 5), and
 * document 4, which the source does not have, is absent locally.
 */
TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
    createOplog(_txn.get());
    {
        AutoGetOrCreateDb autoDb(_txn.get(), "test", MODE_X);
        mongo::WriteUnitOfWork wuow(_txn.get());
        auto coll = autoDb.getDb()->getCollection("test.t");
        if (!coll) {
            coll = autoDb.getDb()->createCollection(_txn.get(), "test.t");
        }
        ASSERT(coll);
        OpDebug* const nullOpDebug = nullptr;
        ASSERT_OK(
            coll->insertDocument(_txn.get(), BSON("_id" << 1 << "v" << 2), nullOpDebug, false));
        ASSERT_OK(
            coll->insertDocument(_txn.get(), BSON("_id" << 2 << "v" << 4), nullOpDebug, false));
        ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 4), nullOpDebug, false));
        wuow.commit();
    }
    const auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    const auto applyOpsOperation =
        std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0),
                                              {BSON("op"
                                                    << "u"
                                                    << "ns"
                                                    << "test.t"
                                                    << "o2"
                                                    << BSON("_id" << 1)
                                                    << "o"
                                                    << BSON("_id" << 1 << "v" << 2)),
                                               BSON("op"
                                                    << "u"
                                                    << "ns"
                                                    << "test.t"
                                                    << "o2"
                                                    << BSON("_id" << 2)
                                                    << "o"
                                                    << BSON("_id" << 2 << "v" << 4)),
                                               BSON("op"
                                                    << "d"
                                                    << "ns"
                                                    << "test.t"
                                                    << "o"
                                                    << BSON("_id" << 3)),
                                               BSON("op"
                                                    << "i"
                                                    << "ns"
                                                    << "test.t"
                                                    << "o"
                                                    << BSON("_id" << 4))}),
                       RecordId(2));
    // Rollback source whose findOne() checks that the filter has exactly one field named
    // "_id", remembers every _id it was asked for, and returns a canned document per _id.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)) {}
        BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override {
            int numFields = 0;
            for (const auto element : filter) {
                ++numFields;
                ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter;
            }
            ASSERT_EQUALS(1, numFields) << filter;
            searchedIds.insert(filter.firstElement().numberInt());
            switch (filter.firstElement().numberInt()) {
                case 1:
                    return BSON("_id" << 1 << "v" << 1);
                case 2:
                    return BSON("_id" << 2 << "v" << 3);
                case 3:
                    return BSON("_id" << 3 << "v" << 5);
                case 4:
                    // The source has no document with _id 4.
                    return {};
            }
            FAIL("Unexpected findOne request") << filter;
            return {};  // Unreachable; why doesn't compiler know?
        }
        // A multiset so that repeated lookups of the same _id would be detected by the
        // count() assertions below. Mutable because findOne() is const.
        mutable std::multiset<int> searchedIds;
    } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation})));
    // NOTE(review): _createCollection() drops "test.t" before re-creating it, which
    // discards the three documents inserted at the top of this test — confirm this is
    // intended.
    _createCollection(_txn.get(), "test.t", CollectionOptions());
    ASSERT_OK(syncRollback(_txn.get(),
                           OplogInterfaceMock({applyOpsOperation, commonOperation}),
                           rollbackSource,
                           _coordinator,
                           noSleep));
    ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
    ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1));
    ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2));
    ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(3));
    ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(4));
    AutoGetCollectionForRead acr(_txn.get(), "test.t");
    BSONObj result;
    ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 1), result));
    ASSERT_EQUALS(1, result["v"].numberInt()) << result;
    ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 2), result));
    ASSERT_EQUALS(3, result["v"].numberInt()) << result;
    ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 3), result));
    ASSERT_EQUALS(5, result["v"].numberInt()) << result;
    ASSERT_FALSE(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 4), result))
        << result;
}
/**
 * Rolling back a "create" command on "test.t" is expected to succeed and to leave the
 * database without a "test.t" collection.
 */
TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto createCollectionOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "c"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("create"
                                         << "t")),
                       RecordId(2));
    RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    _createCollection(_txn.get(), "test.t", CollectionOptions());
    ASSERT_OK(syncRollback(_txn.get(),
                           OplogInterfaceMock({createCollectionOperation, commonOperation}),
                           rollbackSource,
                           _coordinator,
                           noSleep));
    {
        Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S);
        auto db = dbHolder().get(_txn.get(), "test");
        ASSERT_TRUE(db);
        ASSERT_FALSE(db->getCollection("test.t"));
    }
}
/**
 * Rolling back a "collMod" command on "test.t" is expected to succeed, to call
 * getCollectionInfo() on the rollback source, and not to log the
 * "ignoring op with no _id during rollback" message for test.t.
 */
TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto collectionModificationOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "c"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("collMod"
                                         << "t"
                                         << "noPadding"
                                         << false)),
                       RecordId(2));
    // Rollback source that records whether getCollectionInfo() was called and otherwise
    // defers to the base mock.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)), called(false) {}
        StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override {
            called = true;
            return RollbackSourceMock::getCollectionInfo(nss);
        }
        // Mutable because getCollectionInfo() is const.
        mutable bool called;
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    _createCollection(_txn.get(), "test.t", CollectionOptions());
    startCapturingLogMessages();
    ASSERT_OK(syncRollback(_txn.get(),
                           OplogInterfaceMock({collectionModificationOperation, commonOperation}),
                           rollbackSource,
                           _coordinator,
                           noSleep));
    stopCapturingLogMessages();
    ASSERT_TRUE(rollbackSource.called);
    for (const auto& message : getCapturedLogMessages()) {
        ASSERT_TRUE(message.find("ignoring op with no _id during rollback. ns: test.t") ==
                    std::string::npos);
    }
}
/**
 * When the rollback source reports collection info whose "options" field is the number
 * 12345 instead of a document, rolling back a "collMod" command is expected to fail with
 * UnrecoverableRollbackError at location 18753.
 */
TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOptions) {
    createOplog(_txn.get());
    auto commonOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
    auto collectionModificationOperation =
        std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
                                 << "c"
                                 << "ns"
                                 << "test.t"
                                 << "o"
                                 << BSON("collMod"
                                         << "t"
                                         << "noPadding"
                                         << false)),
                       RecordId(2));
    // Rollback source whose collection info carries a non-document "options" value.
    class RollbackSourceLocal : public RollbackSourceMock {
    public:
        RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
            : RollbackSourceMock(std::move(oplog)) {}
        StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override {
            return BSON("name" << nss.ns() << "options" << 12345);
        }
    };
    RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
        commonOperation,
    })));
    _createCollection(_txn.get(), "test.t", CollectionOptions());
    auto status =
        syncRollback(_txn.get(),
                     OplogInterfaceMock({collectionModificationOperation, commonOperation}),
                     rollbackSource,
                     _coordinator,
                     noSleep);
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
    ASSERT_EQUALS(18753, status.location());
}
} // namespace