diff options
author | Benety Goh <benety@mongodb.com> | 2015-06-03 06:41:29 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-06-05 10:23:50 -0400 |
commit | e8bff54a2234cc292d9faefeaa6fedc2d593611f (patch) | |
tree | 61d3b8e21b56587a51a54ac62110208537346046 | |
parent | ed3122d7e32fe928d40a8003b2fa88c93942230b (diff) | |
download | mongo-e8bff54a2234cc292d9faefeaa6fedc2d593611f.tar.gz |
SERVER-18037 added applyUntilAndPause to applier
-rw-r--r-- | src/mongo/db/repl/applier.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/applier.h | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/applier_test.cpp | 254 |
3 files changed, 343 insertions, 10 deletions
diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp index e9fe83b908c..05e5b0ca1f0 100644 --- a/src/mongo/db/repl/applier.cpp +++ b/src/mongo/db/repl/applier.cpp @@ -30,6 +30,8 @@ #include "mongo/db/repl/applier.h" +#include <algorithm> + #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/util/assert_util.h" @@ -166,5 +168,71 @@ namespace repl { _condition.notify_all(); } +namespace { + + void pauseBeforeCompletion( + const StatusWith<Timestamp>& result, + const Applier::Operations& operationsOnCompletion, + const PauseDataReplicatorFn& pauseDataReplicator, + const Applier::CallbackFn& onCompletion) { + + if (result.isOK()) { + pauseDataReplicator(); + } + onCompletion(result, operationsOnCompletion); + }; + +} // namespace + + StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause( + ReplicationExecutor* executor, + const Applier::Operations& operations, + const Applier::ApplyOperationFn& applyOperation, + const Timestamp& lastTimestampToApply, + const PauseDataReplicatorFn& pauseDataReplicator, + const Applier::CallbackFn& onCompletion) { + + try { + auto comp = [](const BSONObj& left, const BSONObj& right) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << left, + left.hasField("ts")); + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << left, + right.hasField("ts")); + return left["ts"].timestamp() < right["ts"].timestamp(); + }; + auto wrapped = BSON("ts" << lastTimestampToApply); + auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); + bool found = i != operations.cend() && !comp(wrapped, *i); + auto j = found ? i+1 : i; + Applier::Operations operationsInRange(operations.cbegin(), j); + Applier::Operations operationsNotInRange(j, operations.cend()); + if (!found) { + return std::make_pair( + std::unique_ptr<Applier>( + new Applier(executor, operationsInRange, applyOperation, onCompletion)), + operationsNotInRange); + } + + return std::make_pair( + std::unique_ptr<Applier>(new Applier( + executor, + operationsInRange, + applyOperation, + stdx::bind(pauseBeforeCompletion, + stdx::placeholders::_1, + stdx::placeholders::_2, + pauseDataReplicator, + onCompletion))), + operationsNotInRange); + } + catch (...) { + return exceptionToStatus(); + } + MONGO_UNREACHABLE; + return Status(ErrorCodes::InternalError, "unreachable"); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h index 43ace1dfa23..2753837f862 100644 --- a/src/mongo/db/repl/applier.h +++ b/src/mongo/db/repl/applier.h @@ -28,13 +28,15 @@ #pragma once +#include <memory> #include <string> +#include <utility> #include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" -#include "mongo/bson/bsonobj.h" +#include "mongo/db/jsobj.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" @@ -47,6 +49,9 @@ namespace repl { MONGO_DISALLOW_COPYING(Applier); public: + /** + * Operations sorted by timestamp in ascending order. + */ using Operations = std::vector<BSONObj>; /** @@ -140,7 +145,29 @@ namespace repl { bool _active; ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle; -}; + }; + + + /** + * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'. + * If 'lastTimestampToApply' is found in 'operations': + * - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply'). + * - On success, the applier will invoke the 'pause' function just before reporting + * completion status. + * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to + * the applier and the 'pause' function will be ignored. + * If the applier is successfully created, returns the applier and a list of operations that + * are skipped (operations with 'ts' field value after 'lastTimestampToApply). + */ + using PauseDataReplicatorFn = stdx::function<void ()>; + + StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause( + ReplicationExecutor* executor, + const Applier::Operations& operations, + const Applier::ApplyOperationFn& applyOperation, + const Timestamp& lastTimestampToApply, + const PauseDataReplicatorFn& pauseDataReplicator, + const Applier::CallbackFn& onCompletion); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp index 690a785b846..b60212f9a2e 100644 --- a/src/mongo/db/repl/applier_test.cpp +++ b/src/mongo/db/repl/applier_test.cpp @@ -103,24 +103,40 @@ namespace { auto callback = [](const StatusWith<Timestamp>& status, const Operations& operations) { }; // Null executor. - ASSERT_THROWS(Applier(nullptr, operations, apply, callback), UserException); + ASSERT_THROWS_CODE( + Applier(nullptr, operations, apply, callback), + UserException, + ErrorCodes::BadValue); // Empty list of operations. - ASSERT_THROWS(Applier(&getExecutor(), {}, apply, callback), UserException); + ASSERT_THROWS_CODE( + Applier(&getExecutor(), {}, apply, callback), + UserException, + ErrorCodes::BadValue); // Last operation missing timestamp field. - ASSERT_THROWS(Applier(&getExecutor(), {BSONObj()}, apply, callback), UserException); + ASSERT_THROWS_CODE( + Applier(&getExecutor(), {BSONObj()}, apply, callback), + UserException, + ErrorCodes::FailedToParse); // "ts" field in last operation not a timestamp. - ASSERT_THROWS(Applier(&getExecutor(), {BSON("ts" << 99)}, apply, callback), UserException); + ASSERT_THROWS_CODE( + Applier(&getExecutor(), {BSON("ts" << 99)}, apply, callback), + UserException, + ErrorCodes::TypeMismatch); // Invalid apply operation function. - ASSERT_THROWS(Applier(&getExecutor(), operations, Applier::ApplyOperationFn(), callback), - UserException); + ASSERT_THROWS_CODE( + Applier(&getExecutor(), operations, Applier::ApplyOperationFn(), callback), + UserException, + ErrorCodes::BadValue); // Invalid callback function. - ASSERT_THROWS(Applier(&getExecutor(), operations, apply, Applier::CallbackFn()), - UserException); + ASSERT_THROWS_CODE( + Applier(&getExecutor(), operations, apply, Applier::CallbackFn()), + UserException, + ErrorCodes::BadValue); } TEST_F(ApplierTest, GetDiagnosticString) { @@ -431,4 +447,226 @@ namespace { }); } + class ApplyUntilAndPauseTest : public ApplierTest {}; + + TEST_F(ApplyUntilAndPauseTest, EmptyOperations) { + auto result = + applyUntilAndPause( + &getExecutor(), + {}, + [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + Timestamp(Seconds(123), 0), + [] {}, + [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); + ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code()); + } + + TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) { + auto result = + applyUntilAndPause( + &getExecutor(), + { + BSON("ts" << Timestamp(Seconds(456), 0)), + BSON("ts" << Timestamp(Seconds(789), 0)), + }, + [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + Timestamp(Seconds(123), 0), + [] {}, + [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); + ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code()); + } + + TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) { + auto result = + applyUntilAndPause( + &getExecutor(), + {BSONObj()}, + [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + Timestamp(Seconds(123), 0), + [] {}, + [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); + ASSERT_EQUALS(ErrorCodes::FailedToParse, result.getStatus().code()); + } + + TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) { + Timestamp ts(Seconds(123), 0); + const Operations operationsToApply{BSON("ts" << ts)}; + stdx::mutex mutex; + StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); + bool pauseCalled = false; + Applier::Operations operationsOnCompletion; + auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + auto pause = [&] { + stdx::lock_guard<stdx::mutex> lock(mutex); + pauseCalled = true; + }; + auto callback = [&](const StatusWith<Timestamp>& theResult, + const Operations& theOperations) { + stdx::lock_guard<stdx::mutex> lock(mutex); + completionResult = theResult; + operationsOnCompletion = theOperations; + }; + + auto result = + applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback); + ASSERT_OK(result.getStatus()); + _applier = std::move(result.getValue().first); + ASSERT_TRUE(_applier); + + const Applier::Operations& operationsDiscarded = result.getValue().second; + ASSERT_TRUE(operationsDiscarded.empty()); + + _applier->start(); + _applier->wait(); + + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_TRUE(pauseCalled); + ASSERT_OK(completionResult.getStatus()); + ASSERT_EQUALS(ts, completionResult.getValue()); + ASSERT_TRUE(operationsOnCompletion.empty()); + } + + TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOperations) { + Timestamp ts(Seconds(123), 0); + const Operations operationsToApply{BSON("ts" << ts)}; + stdx::mutex mutex; + StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); + bool pauseCalled = false; + Applier::Operations operationsOnCompletion; + auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + auto pause = [&] { + stdx::lock_guard<stdx::mutex> lock(mutex); + pauseCalled = true; + }; + auto callback = [&](const StatusWith<Timestamp>& theResult, + const Operations& theOperations) { + stdx::lock_guard<stdx::mutex> lock(mutex); + completionResult = theResult; + operationsOnCompletion = theOperations; + }; + + Timestamp ts2(Seconds(456), 0); + auto result = + applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts2, pause, callback); + ASSERT_OK(result.getStatus()); + _applier = std::move(result.getValue().first); + ASSERT_TRUE(_applier); + + const Applier::Operations& operationsDiscarded = result.getValue().second; + ASSERT_TRUE(operationsDiscarded.empty()); + + _applier->start(); + _applier->wait(); + + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_FALSE(pauseCalled); + ASSERT_OK(completionResult.getStatus()); + ASSERT_EQUALS(ts, completionResult.getValue()); + ASSERT_TRUE(operationsOnCompletion.empty()); + } + + TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationAppliedFailed) { + Timestamp ts(Seconds(123), 0); + const Operations operationsToApply{BSON("ts" << ts)}; + stdx::mutex mutex; + StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); + bool pauseCalled = false; + Applier::Operations operationsOnCompletion; + auto apply = [](OperationContext* txn, const BSONObj& operation) { + return Status(ErrorCodes::OperationFailed, ""); + }; + auto pause = [&] { + stdx::lock_guard<stdx::mutex> lock(mutex); + pauseCalled = true; + }; + auto callback = [&](const StatusWith<Timestamp>& theResult, + const Operations& theOperations) { + stdx::lock_guard<stdx::mutex> lock(mutex); + completionResult = theResult; + operationsOnCompletion = theOperations; + }; + + auto result = + applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback); + ASSERT_OK(result.getStatus()); + _applier = std::move(result.getValue().first); + ASSERT_TRUE(_applier); + + const Applier::Operations& operationsDiscarded = result.getValue().second; + ASSERT_TRUE(operationsDiscarded.empty()); + + _applier->start(); + _applier->wait(); + + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_FALSE(pauseCalled); + ASSERT_NOT_OK(completionResult.getStatus()); + ASSERT_FALSE(operationsOnCompletion.empty()); + } + + void _testApplyUntilAndPauseDiscardOperations(ReplicationExecutor* executor, + const Timestamp& ts, + bool expectedPauseCalled) { + + Applier::Operations operationsToApply{ + BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)), + BSON("op" << "b" << "ts" << Timestamp(Seconds(456), 0)), + BSON("op" << "c" << "ts" << Timestamp(Seconds(789), 0)), + }; + stdx::mutex mutex; + StatusWith<Timestamp> completionResult = + ApplyUntilAndPauseTest::getDetectableErrorStatus(); + bool pauseCalled = false; + Applier::Operations operationsApplied; + Applier::Operations operationsOnCompletion; + auto apply = [&](OperationContext* txn, const BSONObj& operation) { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationsApplied.push_back(operation); + return Status::OK(); + }; + auto pause = [&] { + stdx::lock_guard<stdx::mutex> lock(mutex); + pauseCalled = true; + }; + auto callback = [&](const StatusWith<Timestamp>& theResult, + const Operations& theOperations) { + stdx::lock_guard<stdx::mutex> lock(mutex); + completionResult = theResult; + operationsOnCompletion = theOperations; + }; + + auto result = + applyUntilAndPause(executor, operationsToApply, apply, ts, pause, callback); + ASSERT_OK(result.getStatus()); + ASSERT_TRUE(result.getValue().first); + Applier& applier = *result.getValue().first; + + const Applier::Operations& operationsDiscarded = result.getValue().second; + ASSERT_EQUALS(1U, operationsDiscarded.size()); + ASSERT_EQUALS(operationsToApply[2], operationsDiscarded[0]); + + applier.start(); + applier.wait(); + + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_EQUALS(2U, operationsApplied.size()); + ASSERT_EQUALS(operationsToApply[0], operationsApplied[0]); + ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]); + ASSERT_EQUALS(expectedPauseCalled, pauseCalled); + ASSERT_OK(completionResult.getStatus()); + ASSERT_TRUE(operationsOnCompletion.empty()); + } + + TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampInOperations) { + _testApplyUntilAndPauseDiscardOperations(&getExecutor(), + Timestamp(Seconds(456), 0), + true); + } + + TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampNotInOperations) { + _testApplyUntilAndPauseDiscardOperations(&getExecutor(), + Timestamp(Seconds(500), 0), + false); + } + } // namespace |