/** * Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/applier.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/platform/compiler.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/barrier.h" namespace { using namespace mongo; using namespace mongo::repl; using Operations = Applier::Operations; class ApplierTest : public ReplicationExecutorTest { public: Applier* getApplier() const; protected: void setUp() override; void tearDown() override; /** * Test function to check behavior when we fail to apply one of the operations. */ void _testApplyOperationFailed(size_t opIndex, stdx::function fail); std::unique_ptr _applier; std::unique_ptr _barrier; }; void ApplierTest::setUp() { ReplicationExecutorTest::setUp(); launchExecutorThread(); auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; _applier.reset(new Applier(&getExecutor(), {BSON("ts" << Timestamp(Seconds(123), 0))}, apply, [this](const StatusWith&, const Operations&) { if (_barrier.get()) { _barrier->countDownAndWait(); } })); } void ApplierTest::tearDown() { ReplicationExecutorTest::tearDown(); _applier.reset(); _barrier.reset(); } Applier* ApplierTest::getApplier() const { return _applier.get(); } TEST_F(ApplierTest, InvalidConstruction) { const Operations operations{BSON("ts" << Timestamp(Seconds(123), 0))}; auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; auto callback = [](const StatusWith& status, const Operations& operations) { }; // Null executor. ASSERT_THROWS_CODE( Applier(nullptr, operations, apply, callback), UserException, ErrorCodes::BadValue); // Empty list of operations. ASSERT_THROWS_CODE( Applier(&getExecutor(), {}, apply, callback), UserException, ErrorCodes::BadValue); // Last operation missing timestamp field. ASSERT_THROWS_CODE( Applier(&getExecutor(), {BSONObj()}, apply, callback), UserException, ErrorCodes::FailedToParse); // "ts" field in last operation not a timestamp. ASSERT_THROWS_CODE( Applier(&getExecutor(), {BSON("ts" << 99)}, apply, callback), UserException, ErrorCodes::TypeMismatch); // Invalid apply operation function. ASSERT_THROWS_CODE( Applier(&getExecutor(), operations, Applier::ApplyOperationFn(), callback), UserException, ErrorCodes::BadValue); // Invalid callback function. ASSERT_THROWS_CODE( Applier(&getExecutor(), operations, apply, Applier::CallbackFn()), UserException, ErrorCodes::BadValue); } TEST_F(ApplierTest, GetDiagnosticString) { ASSERT_FALSE(getApplier()->getDiagnosticString().empty()); } TEST_F(ApplierTest, IsActiveAfterStart) { // Use a barrier to ensure that the callback blocks while // we check isActive(). _barrier.reset(new unittest::Barrier(2U)); ASSERT_FALSE(getApplier()->isActive()); ASSERT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); _barrier->countDownAndWait(); } TEST_F(ApplierTest, StartWhenActive) { // Use a barrier to ensure that the callback blocks while // we check isActive(). _barrier.reset(new unittest::Barrier(2U)); ASSERT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); ASSERT_NOT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); _barrier->countDownAndWait(); } TEST_F(ApplierTest, CancelWithoutStart) { ASSERT_FALSE(getApplier()->isActive()); getApplier()->cancel(); ASSERT_FALSE(getApplier()->isActive()); } TEST_F(ApplierTest, WaitWithoutStart) { ASSERT_FALSE(getApplier()->isActive()); getApplier()->wait(); ASSERT_FALSE(getApplier()->isActive()); } TEST_F(ApplierTest, ShutdownBeforeStart) { getExecutor().shutdown(); ASSERT_NOT_OK(getApplier()->start()); ASSERT_FALSE(getApplier()->isActive()); } TEST_F(ApplierTest, CancelBeforeStartingDBWork) { // Schedule a blocking DB work item before the applier to allow us to cancel the applier // work item before the executor runs it. unittest::Barrier barrier(2U); using CallbackData = ReplicationExecutor::CallbackArgs; getExecutor().scheduleDBWork([&](const CallbackData& cbd) { barrier.countDownAndWait(); // generation 0 }); const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0)); stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset(new Applier( &getExecutor(), {operation}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operations = theOperations; })); getApplier()->start(); getApplier()->cancel(); ASSERT_TRUE(getApplier()->isActive()); barrier.countDownAndWait(); // generation 0 getApplier()->wait(); ASSERT_FALSE(getApplier()->isActive()); stdx::lock_guard lock(mutex); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, result.getStatus().code()); ASSERT_EQUALS(1U, operations.size()); ASSERT_EQUALS(operation, operations.front()); } TEST_F(ApplierTest, DestroyBeforeStartingDBWork) { // Schedule a blocking DB work item before the applier to allow us to destroy the applier // before the executor runs the work item. unittest::Barrier barrier(2U); using CallbackData = ReplicationExecutor::CallbackArgs; getExecutor().scheduleDBWork([&](const CallbackData& cbd) { barrier.countDownAndWait(); // generation 0 // Give the main thread a head start in invoking the applier destructor. sleepmillis(1); }); const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0)); stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset(new Applier( &getExecutor(), {operation}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operations = theOperations; })); getApplier()->start(); ASSERT_TRUE(getApplier()->isActive()); barrier.countDownAndWait(); // generation 0 // It is possible the executor may have invoked the callback before we // destroy the applier. Therefore both OK and CallbackCanceled are acceptable // statuses. _applier.reset(); stdx::lock_guard lock(mutex); if (result.isOK()) { ASSERT_TRUE(operations.empty()); } else { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, result.getStatus().code()); ASSERT_EQUALS(1U, operations.size()); ASSERT_EQUALS(operation, operations.front()); } } TEST_F(ApplierTest, WaitForCompletion) { const Timestamp timestamp(Seconds(123), 0); stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset(new Applier( &getExecutor(), {BSON("ts" << timestamp)}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operations = theOperations; })); getApplier()->start(); getApplier()->wait(); ASSERT_FALSE(getApplier()->isActive()); stdx::lock_guard lock(mutex); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(timestamp, result.getValue()); ASSERT_TRUE(operations.empty()); } TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) { const Timestamp timestamp(Seconds(123), 0); unittest::Barrier barrier(2U); stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset(new Applier( &getExecutor(), {BSON("ts" << timestamp)}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operations = theOperations; barrier.countDownAndWait(); })); getApplier()->start(); barrier.countDownAndWait(); _applier.reset(); stdx::lock_guard lock(mutex); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(timestamp, result.getValue()); ASSERT_TRUE(operations.empty()); } TEST_F(ApplierTest, ApplyOperationSuccessful) { // Bogus operations codes. Applier::Operations operationsToApply{ BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)), BSON("op" << "b" << "ts" << Timestamp(Seconds(456), 0)), BSON("op" << "c" << "ts" << Timestamp(Seconds(789), 0)), }; stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); bool areWritesReplicationOnOperationContext = true; bool isLockBatchWriter = false; Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; auto apply = [&](OperationContext* txn, const BSONObj& operation) { stdx::lock_guard lock(mutex); areWritesReplicationOnOperationContext = txn->writesAreReplicated(); isLockBatchWriter = txn->lockState()->isBatchWriter(); operationsApplied.push_back(operation); return Status::OK(); }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operationsOnCompletion = theOperations; }; _applier.reset(new Applier(&getExecutor(), operationsToApply, apply, callback)); _applier->start(); _applier->wait(); stdx::lock_guard lock(mutex); ASSERT_FALSE(areWritesReplicationOnOperationContext); ASSERT_TRUE(isLockBatchWriter); ASSERT_EQUALS(operationsToApply.size(), operationsApplied.size()); ASSERT_EQUALS(operationsToApply[0], operationsApplied[0]); ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]); ASSERT_EQUALS(operationsToApply[2], operationsApplied[2]); ASSERT_OK(result.getStatus()); ASSERT_EQUALS(operationsToApply[2]["ts"].timestamp(), result.getValue()); ASSERT_TRUE(operationsOnCompletion.empty()); } void ApplierTest::_testApplyOperationFailed(size_t opIndex, stdx::function fail) { // Bogus operations codes. Applier::Operations operationsToApply{ BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)), BSON("op" << "b" << "ts" << Timestamp(Seconds(456), 0)), BSON("op" << "c" << "ts" << Timestamp(Seconds(789), 0)), }; stdx::mutex mutex; StatusWith result = getDetectableErrorStatus(); Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; auto apply = [&](OperationContext* txn, const BSONObj& operation) { stdx::lock_guard lock(mutex); if (operationsApplied.size() == opIndex) { return fail(); } operationsApplied.push_back(operation); return Status::OK(); }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); result = theResult; operationsOnCompletion = theOperations; }; _applier.reset(new Applier(&getExecutor(), operationsToApply, apply, callback)); _applier->start(); _applier->wait(); stdx::lock_guard lock(mutex); ASSERT_EQUALS(opIndex, operationsApplied.size()); size_t i = 0; for (const auto& operation : operationsApplied) { ASSERT_EQUALS(operationsToApply[i], operation); i++; } ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); ASSERT_EQUALS(operationsToApply.size() - opIndex, operationsOnCompletion.size()); ASSERT_EQUALS(opIndex, i); for (const auto& operation : operationsOnCompletion) { ASSERT_EQUALS(operationsToApply[i], operation); i++; } } TEST_F(ApplierTest, ApplyOperationFailedOnFirstOperation) { _testApplyOperationFailed(0U, []() { return Status(ErrorCodes::OperationFailed, ""); }); } TEST_F(ApplierTest, ApplyOperationThrowsExceptionOnFirstOperation) { _testApplyOperationFailed(0U, []() { uasserted(ErrorCodes::OperationFailed, ""); MONGO_UNREACHABLE; return Status(ErrorCodes::InternalError, "unreachable"); }); } TEST_F(ApplierTest, ApplyOperationFailedOnSecondOperation) { _testApplyOperationFailed(1U, []() { return Status(ErrorCodes::OperationFailed, ""); }); } TEST_F(ApplierTest, ApplyOperationThrowsExceptionOnSecondOperation) { _testApplyOperationFailed(1U, []() { uasserted(ErrorCodes::OperationFailed, ""); MONGO_UNREACHABLE; return Status(ErrorCodes::InternalError, "unreachable"); }); } TEST_F(ApplierTest, ApplyOperationFailedOnLastOperation) { _testApplyOperationFailed(2U, []() { return Status(ErrorCodes::OperationFailed, ""); }); } TEST_F(ApplierTest, ApplyOperationThrowsExceptionOnLastOperation) { _testApplyOperationFailed(2U, []() { uasserted(ErrorCodes::OperationFailed, ""); MONGO_UNREACHABLE; return Status(ErrorCodes::InternalError, "unreachable"); }); } class ApplyUntilAndPauseTest : public ApplierTest {}; TEST_F(ApplyUntilAndPauseTest, EmptyOperations) { auto result = applyUntilAndPause( &getExecutor(), {}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith& theResult, const Operations& theOperations) {}); ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code()); } TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) { auto result = applyUntilAndPause( &getExecutor(), { BSON("ts" << Timestamp(Seconds(456), 0)), BSON("ts" << Timestamp(Seconds(789), 0)), }, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith& theResult, const Operations& theOperations) {}); ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code()); } TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) { auto result = applyUntilAndPause( &getExecutor(), {BSONObj()}, [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith& theResult, const Operations& theOperations) {}); ASSERT_EQUALS(ErrorCodes::FailedToParse, result.getStatus().code()); } TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) { Timestamp ts(Seconds(123), 0); const Operations operationsToApply{BSON("ts" << ts)}; stdx::mutex mutex; StatusWith completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; auto pause = [&] { stdx::lock_guard lock(mutex); pauseCalled = true; }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); completionResult = theResult; operationsOnCompletion = theOperations; }; auto result = applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback); ASSERT_OK(result.getStatus()); _applier = std::move(result.getValue().first); ASSERT_TRUE(_applier); const Applier::Operations& operationsDiscarded = result.getValue().second; ASSERT_TRUE(operationsDiscarded.empty()); _applier->start(); _applier->wait(); stdx::lock_guard lock(mutex); ASSERT_TRUE(pauseCalled); ASSERT_OK(completionResult.getStatus()); ASSERT_EQUALS(ts, completionResult.getValue()); ASSERT_TRUE(operationsOnCompletion.empty()); } TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOperations) { Timestamp ts(Seconds(123), 0); const Operations operationsToApply{BSON("ts" << ts)}; stdx::mutex mutex; StatusWith completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; auto pause = [&] { stdx::lock_guard lock(mutex); pauseCalled = true; }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); completionResult = theResult; operationsOnCompletion = theOperations; }; Timestamp ts2(Seconds(456), 0); auto result = applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts2, pause, callback); ASSERT_OK(result.getStatus()); _applier = std::move(result.getValue().first); ASSERT_TRUE(_applier); const Applier::Operations& operationsDiscarded = result.getValue().second; ASSERT_TRUE(operationsDiscarded.empty()); _applier->start(); _applier->wait(); stdx::lock_guard lock(mutex); ASSERT_FALSE(pauseCalled); ASSERT_OK(completionResult.getStatus()); ASSERT_EQUALS(ts, completionResult.getValue()); ASSERT_TRUE(operationsOnCompletion.empty()); } TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationAppliedFailed) { Timestamp ts(Seconds(123), 0); const Operations operationsToApply{BSON("ts" << ts)}; stdx::mutex mutex; StatusWith completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }; auto pause = [&] { stdx::lock_guard lock(mutex); pauseCalled = true; }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); completionResult = theResult; operationsOnCompletion = theOperations; }; auto result = applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback); ASSERT_OK(result.getStatus()); _applier = std::move(result.getValue().first); ASSERT_TRUE(_applier); const Applier::Operations& operationsDiscarded = result.getValue().second; ASSERT_TRUE(operationsDiscarded.empty()); _applier->start(); _applier->wait(); stdx::lock_guard lock(mutex); ASSERT_FALSE(pauseCalled); ASSERT_NOT_OK(completionResult.getStatus()); ASSERT_FALSE(operationsOnCompletion.empty()); } void _testApplyUntilAndPauseDiscardOperations(ReplicationExecutor* executor, const Timestamp& ts, bool expectedPauseCalled) { Applier::Operations operationsToApply{ BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)), BSON("op" << "b" << "ts" << Timestamp(Seconds(456), 0)), BSON("op" << "c" << "ts" << Timestamp(Seconds(789), 0)), }; stdx::mutex mutex; StatusWith completionResult = ApplyUntilAndPauseTest::getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; auto apply = [&](OperationContext* txn, const BSONObj& operation) { stdx::lock_guard lock(mutex); operationsApplied.push_back(operation); return Status::OK(); }; auto pause = [&] { stdx::lock_guard lock(mutex); pauseCalled = true; }; auto callback = [&](const StatusWith& theResult, const Operations& theOperations) { stdx::lock_guard lock(mutex); completionResult = theResult; operationsOnCompletion = theOperations; }; auto result = applyUntilAndPause(executor, operationsToApply, apply, ts, pause, callback); ASSERT_OK(result.getStatus()); ASSERT_TRUE(result.getValue().first); Applier& applier = *result.getValue().first; const Applier::Operations& operationsDiscarded = result.getValue().second; ASSERT_EQUALS(1U, operationsDiscarded.size()); ASSERT_EQUALS(operationsToApply[2], operationsDiscarded[0]); applier.start(); applier.wait(); stdx::lock_guard lock(mutex); ASSERT_EQUALS(2U, operationsApplied.size()); ASSERT_EQUALS(operationsToApply[0], operationsApplied[0]); ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]); ASSERT_EQUALS(expectedPauseCalled, pauseCalled); ASSERT_OK(completionResult.getStatus()); ASSERT_TRUE(operationsOnCompletion.empty()); } TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampInOperations) { _testApplyUntilAndPauseDiscardOperations(&getExecutor(), Timestamp(Seconds(456), 0), true); } TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampNotInOperations) { _testApplyUntilAndPauseDiscardOperations(&getExecutor(), Timestamp(Seconds(500), 0), false); } } // namespace