/**
 *    Copyright 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/platform/basic.h" #include #include "mongo/client/connection_pool.h" #include "mongo/client/dbclientinterface.h" #include "mongo/client/dbclientmockcursor.h" #include "mongo/db/client.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/oplog_interface_mock.h" #include "mongo/db/repl/oplog_interface_remote.h" #include "mongo/db/repl/roll_back_local_operations.h" #include "mongo/unittest/unittest.h" namespace { using namespace mongo; using namespace mongo::repl; BSONObj makeOp(long long seconds, long long hash) { auto uuid = unittest::assertGet(UUID::parse("b4c66a44-c1ca-4d86-8d25-12e82fa2de5b")); return BSON("ts" << Timestamp(seconds, seconds) << "h" << hash << "t" << seconds << "op" << "n" << "o" << BSONObj() << "ns" << "roll_back_local_operations.test" << "ui" << uuid); } int recordId = 0; OplogInterfaceMock::Operation makeOpAndRecordId(long long seconds, long long hash) { return std::make_pair(makeOp(seconds, hash), RecordId(++recordId)); } TEST(RollBackLocalOperationsTest, InvalidLocalOplogIterator) { class InvalidOplogInterface : public OplogInterface { public: std::string toString() const override { return ""; } std::unique_ptr makeIterator() const override { return std::unique_ptr(); } HostAndPort hostAndPort() const override { return {}; } } invalidOplog; ASSERT_THROWS_CODE( RollBackLocalOperations(invalidOplog, [](const BSONObj&) { return Status::OK(); }), AssertionException, ErrorCodes::BadValue); } TEST(RollBackLocalOperationsTest, InvalidRollbackOperationFunction) { ASSERT_THROWS_CODE(RollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}), RollBackLocalOperations::RollbackOperationFn()), AssertionException, ErrorCodes::BadValue); } TEST(RollBackLocalOperationsTest, EmptyLocalOplog) { OplogInterfaceMock localOplog; RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); auto result = finder.onRemoteOperation(makeOp(1, 0)); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, 
result.getStatus().code()); } TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(5, 1), makeOpAndRecordId(4, 1), makeOpAndRecordId(3, 1), makeOpAndRecordId(2, 1), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto i = localOperations.cbegin(); auto rollbackOperation = [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(i->first, operation); i++; return Status::OK(); }; RollBackLocalOperations finder(localOplog, rollbackOperation); auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); i++; ASSERT_TRUE(i == localOperations.cend()); } TEST(RollBackLocalOperationsTest, RollbackOperationFailed) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(2, 1), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto rollbackOperation = [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }; RollBackLocalOperations finder(localOplog, rollbackOperation); auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); } TEST(RollBackLocalOperationsTest, EndOfLocalOplog) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(2, 1), }); OplogInterfaceMock localOplog(localOperations); RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, 
result.getStatus().code()); } TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(5, 1), makeOpAndRecordId(4, 1), makeOpAndRecordId(2, 1), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto i = localOperations.cbegin(); auto rollbackOperation = [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(i->first, operation); i++; return Status::OK(); }; RollBackLocalOperations finder(localOplog, rollbackOperation); { auto result = finder.onRemoteOperation(makeOp(6, 1)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(i == localOperations.cbegin()); } { auto result = finder.onRemoteOperation(makeOp(3, 1)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); i++; ASSERT_TRUE(i == localOperations.cend()); } TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(1, 5), makeOpAndRecordId(1, 3), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto i = localOperations.cbegin(); auto rollbackOperation = [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(i->first, operation); i++; return Status::OK(); }; RollBackLocalOperations finder(localOplog, rollbackOperation); { auto result = finder.onRemoteOperation(makeOp(1, 4)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 
1); } { auto result = finder.onRemoteOperation(makeOp(1, 2)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); i++; ASSERT_TRUE(i == localOperations.cend()); } TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesRollbackOperationFailed) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(1, 3), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto rollbackOperation = [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }; RollBackLocalOperations finder(localOplog, rollbackOperation); auto result = finder.onRemoteOperation(makeOp(1, 2)); ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); } TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesEndOfLocalOplog) { OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(1, 3), }); OplogInterfaceMock localOplog(localOperations); RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); auto result = finder.onRemoteOperation(makeOp(1, 2)); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); } TEST(SyncRollBackLocalOperationsTest, OplogStartMissing) { ASSERT_EQUALS(ErrorCodes::OplogStartMissing, syncRollBackLocalOperations(OplogInterfaceMock(), OplogInterfaceMock({makeOpAndRecordId(1, 0)}), [](const BSONObj&) { return Status::OK(); }) .getStatus() .code()); } TEST(SyncRollBackLocalOperationsTest, RemoteOplogMissing) { ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, 
syncRollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}), OplogInterfaceMock(), [](const BSONObj&) { return Status::OK(); }) .getStatus() .code()); } TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ makeOpAndRecordId(3, 1), makeOpAndRecordId(2, 1), commonOperation, }); auto i = localOperations.cbegin(); auto result = syncRollBackLocalOperations(OplogInterfaceMock(localOperations), OplogInterfaceMock({commonOperation}), [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(i->first, operation); i++; return Status::OK(); }); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); i++; ASSERT_TRUE(i == localOperations.cend()); } TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { auto commonOperation = makeOpAndRecordId(1, 1); auto remoteOperation = makeOpAndRecordId(2, 1); auto result = syncRollBackLocalOperations(OplogInterfaceMock({commonOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), [&](const BSONObj& operation) { FAIL("should not reach here"); return Status::OK(); }); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); } TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(1, 2); auto remoteOperation = makeOpAndRecordId(1, 3); bool called = false; auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), [&](const BSONObj& 
operation) { ASSERT_BSONOBJ_EQ(localOperation.first, operation); called = true; return Status::OK(); }); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().getOpTime()); ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); ASSERT_TRUE(called); } TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfLocalOplog) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(1, 2); auto remoteOperation = makeOpAndRecordId(1, 3); bool called = false; auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(localOperation.first, operation); called = true; return Status::OK(); }); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of local oplog"); ASSERT_TRUE(called); } TEST(SyncRollBackLocalOperationsTest, SameTimestampRollbackOperationFailed) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(1, 2); auto remoteOperation = makeOpAndRecordId(1, 3); auto result = syncRollBackLocalOperations( OplogInterfaceMock({localOperation, commonOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }); ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); } TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfRemoteOplog) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(1, 2); auto remoteOperation = makeOpAndRecordId(1, 3); bool called = false; auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), OplogInterfaceMock({remoteOperation}), [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(localOperation.first, operation); 
called = true; return Status::OK(); }); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of remote oplog"); ASSERT_TRUE(called); } TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfLocalOplog) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(3, 1); auto remoteOperation = makeOpAndRecordId(2, 1); bool called = false; auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), [&](const BSONObj& operation) { ASSERT_BSONOBJ_EQ(localOperation.first, operation); called = true; return Status::OK(); }); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of local oplog"); ASSERT_TRUE(called); } TEST(SyncRollBackLocalOperationsTest, DifferentTimestampRollbackOperationFailed) { auto localOperation = makeOpAndRecordId(3, 1); auto remoteOperation = makeOpAndRecordId(2, 1); auto result = syncRollBackLocalOperations( OplogInterfaceMock({localOperation}), OplogInterfaceMock({remoteOperation}), [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }); ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); } TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfRemoteOplog) { auto commonOperation = makeOpAndRecordId(1, 1); auto localOperation = makeOpAndRecordId(2, 1); auto remoteOperation = makeOpAndRecordId(3, 1); auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), OplogInterfaceMock({remoteOperation}), [&](const BSONObj& operation) { FAIL("Should not reach here"); return Status::OK(); }); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of remote oplog"); } class 
DBClientConnectionForTest : public DBClientConnection { public: DBClientConnectionForTest(int numInitFailures) : _initFailuresLeft(numInitFailures) {} using DBClientConnection::query; std::unique_ptr query(const std::string& ns, Query query, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, int batchSize) override { if (_initFailuresLeft > 0) { _initFailuresLeft--; unittest::log() << "Throwing DBException on DBClientCursorForTest::query(). Failures left: " << _initFailuresLeft; uasserted(50852, "Simulated network error"); MONGO_UNREACHABLE; } unittest::log() << "Returning success on DBClientCursorForTest::query()"; BSONArrayBuilder builder; builder.append(makeOp(1, 1)); builder.append(makeOp(2, 2)); return std::make_unique(this, builder.arr()); } private: int _initFailuresLeft; }; void checkRemoteIterator(int numNetworkFailures, bool expectedToSucceed) { DBClientConnectionForTest conn(numNetworkFailures); auto getConnection = [&]() -> DBClientBase* { return &conn; }; auto localOperation = makeOpAndRecordId(1, 1); OplogInterfaceRemote remoteOplogMock( HostAndPort("229w43rd", 10036), getConnection, "somecollection", 0); auto result = Status::OK(); try { result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation}), remoteOplogMock, [&](const BSONObj&) { return Status::OK(); }) .getStatus(); } catch (...) { // For the failure scenario. ASSERT_FALSE(expectedToSucceed); return; } // For the success scenario. 
ASSERT_TRUE(expectedToSucceed); ASSERT_OK(result); } TEST(SyncRollBackLocalOperationsTest, RemoteOplogMakeIteratorSucceedsWithNoNetworkFailures) { checkRemoteIterator(0, true); } TEST(SyncRollBackLocalOperationsTest, RemoteOplogMakeIteratorSucceedsWithOneNetworkFailure) { checkRemoteIterator(1, true); } TEST(SyncRollBackLocalOperationsTest, RemoteOplogMakeIteratorSucceedsWithTwoNetworkFailures) { checkRemoteIterator(2, true); } TEST(SyncRollBackLocalOperationsTest, RemoteOplogMakeIteratorFailsWithTooManyNetworkFailures) { checkRemoteIterator(3, false); } } // namespace