summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Benety Goh <benety@mongodb.com> 2015-06-03 06:41:29 -0400
committer Benety Goh <benety@mongodb.com> 2015-06-05 10:23:50 -0400
commit e8bff54a2234cc292d9faefeaa6fedc2d593611f (patch)
tree 61d3b8e21b56587a51a54ac62110208537346046
parent ed3122d7e32fe928d40a8003b2fa88c93942230b (diff)
download mongo-e8bff54a2234cc292d9faefeaa6fedc2d593611f.tar.gz
SERVER-18037 added applyUntilAndPause to applier
-rw-r--r-- src/mongo/db/repl/applier.cpp 68
-rw-r--r-- src/mongo/db/repl/applier.h 31
-rw-r--r-- src/mongo/db/repl/applier_test.cpp 254
3 files changed, 343 insertions, 10 deletions
diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp
index e9fe83b908c..05e5b0ca1f0 100644
--- a/src/mongo/db/repl/applier.cpp
+++ b/src/mongo/db/repl/applier.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/repl/applier.h"
+#include <algorithm>
+
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/util/assert_util.h"
@@ -166,5 +168,71 @@ namespace repl {
_condition.notify_all();
}
+namespace {
+
+    /**
+     * Completion callback wrapper installed by applyUntilAndPause().
+     * Calls 'pauseDataReplicator' only when 'result' is OK, then always forwards 'result'
+     * and 'operationsOnCompletion' to 'onCompletion'.
+     */
+    void pauseBeforeCompletion(
+        const StatusWith<Timestamp>& result,
+        const Applier::Operations& operationsOnCompletion,
+        const PauseDataReplicatorFn& pauseDataReplicator,
+        const Applier::CallbackFn& onCompletion) {
+
+        // Pause first so that it has already happened by the time the caller is notified.
+        if (result.isOK()) {
+            pauseDataReplicator();
+        }
+        onCompletion(result, operationsOnCompletion);
+    }
+
+} // namespace
+
+    /**
+     * Creates an applier for the leading operations in 'operations' whose 'ts' value is less
+     * than or equal to 'lastTimestampToApply'.
+     *
+     * 'operations' must be sorted by 'ts' in ascending order because the cut-off position is
+     * located with a binary search.
+     *
+     * If an operation with 'ts' equal to 'lastTimestampToApply' is found, the applier's
+     * completion callback is wrapped so that 'pauseDataReplicator' runs (on success only)
+     * before 'onCompletion'. Otherwise 'onCompletion' is passed to the applier unchanged and
+     * 'pauseDataReplicator' is never called.
+     *
+     * On success, returns the new applier together with the operations that were left out
+     * (those positioned after the cut-off). Any exception thrown while comparing operations
+     * or constructing the applier is converted to a non-OK status.
+     */
+    StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause(
+        ReplicationExecutor* executor,
+        const Applier::Operations& operations,
+        const Applier::ApplyOperationFn& applyOperation,
+        const Timestamp& lastTimestampToApply,
+        const PauseDataReplicatorFn& pauseDataReplicator,
+        const Applier::CallbackFn& onCompletion) {
+
+        try {
+            // std::lower_bound() calls comp(element, value) while the equality check below
+            // calls comp(value, element), so both arguments are validated and each failure
+            // message reports the operation that is actually missing the field.
+            auto comp = [](const BSONObj& left, const BSONObj& right) {
+                uassert(ErrorCodes::FailedToParse,
+                        str::stream() << "Operation missing 'ts' field': " << left,
+                        left.hasField("ts"));
+                uassert(ErrorCodes::FailedToParse,
+                        str::stream() << "Operation missing 'ts' field': " << right,
+                        right.hasField("ts"));
+                return left["ts"].timestamp() < right["ts"].timestamp();
+            };
+            // Wrap the target timestamp in a document so it can be compared like an operation.
+            auto wrapped = BSON("ts" << lastTimestampToApply);
+            // 'i' is the first operation whose 'ts' is not less than 'lastTimestampToApply'.
+            auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
+            // Exact match when 'lastTimestampToApply' is also not less than that operation.
+            bool found = i != operations.cend() && !comp(wrapped, *i);
+            // Include the matching operation in the range handed to the applier.
+            auto j = found ? i+1 : i;
+            Applier::Operations operationsInRange(operations.cbegin(), j);
+            Applier::Operations operationsNotInRange(j, operations.cend());
+            if (!found) {
+                // No exact match: complete normally without pausing.
+                return std::make_pair(
+                    std::unique_ptr<Applier>(
+                        new Applier(executor, operationsInRange, applyOperation, onCompletion)),
+                    operationsNotInRange);
+            }
+
+            return std::make_pair(
+                std::unique_ptr<Applier>(new Applier(
+                    executor,
+                    operationsInRange,
+                    applyOperation,
+                    stdx::bind(pauseBeforeCompletion,
+                               stdx::placeholders::_1,
+                               stdx::placeholders::_2,
+                               pauseDataReplicator,
+                               onCompletion))),
+                operationsNotInRange);
+        }
+        catch (...) {
+            return exceptionToStatus();
+        }
+        MONGO_UNREACHABLE;
+        return Status(ErrorCodes::InternalError, "unreachable");
+    }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h
index 43ace1dfa23..2753837f862 100644
--- a/src/mongo/db/repl/applier.h
+++ b/src/mongo/db/repl/applier.h
@@ -28,13 +28,15 @@
#pragma once
+#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
-#include "mongo/bson/bsonobj.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
@@ -47,6 +49,9 @@ namespace repl {
MONGO_DISALLOW_COPYING(Applier);
public:
+ /**
+ * Operations sorted by timestamp in ascending order.
+ */
using Operations = std::vector<BSONObj>;
/**
@@ -140,7 +145,29 @@ namespace repl {
bool _active;
ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle;
-};
+ };
+
+
+    /**
+     * Function taking no arguments and returning nothing. The applier created by
+     * applyUntilAndPause() invokes it just before reporting a successful completion status.
+     */
+    using PauseDataReplicatorFn = stdx::function<void ()>;
+
+    /**
+     * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'.
+     * If 'lastTimestampToApply' is found in 'operations':
+     *     - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply').
+     *     - On success, the applier will invoke the 'pauseDataReplicator' function just before
+     *       reporting completion status.
+     * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to
+     * the applier and the 'pauseDataReplicator' function will be ignored.
+     * If the applier is successfully created, returns the applier and a list of operations that
+     * are skipped (operations with 'ts' field value after 'lastTimestampToApply').
+     */
+    StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause(
+        ReplicationExecutor* executor,
+        const Applier::Operations& operations,
+        const Applier::ApplyOperationFn& applyOperation,
+        const Timestamp& lastTimestampToApply,
+        const PauseDataReplicatorFn& pauseDataReplicator,
+        const Applier::CallbackFn& onCompletion);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp
index 690a785b846..b60212f9a2e 100644
--- a/src/mongo/db/repl/applier_test.cpp
+++ b/src/mongo/db/repl/applier_test.cpp
@@ -103,24 +103,40 @@ namespace {
auto callback = [](const StatusWith<Timestamp>& status, const Operations& operations) { };
// Null executor.
- ASSERT_THROWS(Applier(nullptr, operations, apply, callback), UserException);
+ ASSERT_THROWS_CODE(
+ Applier(nullptr, operations, apply, callback),
+ UserException,
+ ErrorCodes::BadValue);
// Empty list of operations.
- ASSERT_THROWS(Applier(&getExecutor(), {}, apply, callback), UserException);
+ ASSERT_THROWS_CODE(
+ Applier(&getExecutor(), {}, apply, callback),
+ UserException,
+ ErrorCodes::BadValue);
// Last operation missing timestamp field.
- ASSERT_THROWS(Applier(&getExecutor(), {BSONObj()}, apply, callback), UserException);
+ ASSERT_THROWS_CODE(
+ Applier(&getExecutor(), {BSONObj()}, apply, callback),
+ UserException,
+ ErrorCodes::FailedToParse);
// "ts" field in last operation not a timestamp.
- ASSERT_THROWS(Applier(&getExecutor(), {BSON("ts" << 99)}, apply, callback), UserException);
+ ASSERT_THROWS_CODE(
+ Applier(&getExecutor(), {BSON("ts" << 99)}, apply, callback),
+ UserException,
+ ErrorCodes::TypeMismatch);
// Invalid apply operation function.
- ASSERT_THROWS(Applier(&getExecutor(), operations, Applier::ApplyOperationFn(), callback),
- UserException);
+ ASSERT_THROWS_CODE(
+ Applier(&getExecutor(), operations, Applier::ApplyOperationFn(), callback),
+ UserException,
+ ErrorCodes::BadValue);
// Invalid callback function.
- ASSERT_THROWS(Applier(&getExecutor(), operations, apply, Applier::CallbackFn()),
- UserException);
+ ASSERT_THROWS_CODE(
+ Applier(&getExecutor(), operations, apply, Applier::CallbackFn()),
+ UserException,
+ ErrorCodes::BadValue);
}
TEST_F(ApplierTest, GetDiagnosticString) {
@@ -431,4 +447,226 @@ namespace {
});
}
+ // Fixture for the applyUntilAndPause() tests; adds no members of its own to ApplierTest.
+ class ApplyUntilAndPauseTest : public ApplierTest {};
+
+ // An empty list of operations is expected to produce a BadValue status.
+ TEST_F(ApplyUntilAndPauseTest, EmptyOperations) {
+ auto result =
+ applyUntilAndPause(
+ &getExecutor(),
+ {},
+ [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ Timestamp(Seconds(123), 0),
+ [] {},
+ [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
+ ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code());
+ }
+
+ // Every operation (456, 789) is later than the cut-off timestamp (123), so no operation is
+ // in range; the call is expected to produce a BadValue status.
+ TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) {
+ auto result =
+ applyUntilAndPause(
+ &getExecutor(),
+ {
+ BSON("ts" << Timestamp(Seconds(456), 0)),
+ BSON("ts" << Timestamp(Seconds(789), 0)),
+ },
+ [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ Timestamp(Seconds(123), 0),
+ [] {},
+ [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
+ ASSERT_EQUALS(ErrorCodes::BadValue, result.getStatus().code());
+ }
+
+ // An operation without a 'ts' field is expected to produce a FailedToParse status.
+ TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) {
+ auto result =
+ applyUntilAndPause(
+ &getExecutor(),
+ {BSONObj()},
+ [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ Timestamp(Seconds(123), 0),
+ [] {},
+ [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
+ ASSERT_EQUALS(ErrorCodes::FailedToParse, result.getStatus().code());
+ }
+
+ // The cut-off timestamp equals the timestamp of the only operation: nothing is discarded,
+ // the pause function is called, and the completion callback receives an OK result carrying
+ // that timestamp together with an empty operations list.
+ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) {
+ Timestamp ts(Seconds(123), 0);
+ const Operations operationsToApply{BSON("ts" << ts)};
+ stdx::mutex mutex;
+ StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
+ bool pauseCalled = false;
+ Applier::Operations operationsOnCompletion;
+ auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ auto pause = [&] {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ pauseCalled = true;
+ };
+ auto callback = [&](const StatusWith<Timestamp>& theResult,
+ const Operations& theOperations) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ completionResult = theResult;
+ operationsOnCompletion = theOperations;
+ };
+
+ auto result =
+ applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback);
+ ASSERT_OK(result.getStatus());
+ _applier = std::move(result.getValue().first);
+ ASSERT_TRUE(_applier);
+
+ const Applier::Operations& operationsDiscarded = result.getValue().second;
+ ASSERT_TRUE(operationsDiscarded.empty());
+
+ _applier->start();
+ _applier->wait();
+
+ // The callbacks write these variables under 'mutex', so read them under it as well.
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_TRUE(pauseCalled);
+ ASSERT_OK(completionResult.getStatus());
+ ASSERT_EQUALS(ts, completionResult.getValue());
+ ASSERT_TRUE(operationsOnCompletion.empty());
+ }
+
+ // The cut-off timestamp (456) is later than the only operation (123) and matches no
+ // operation: nothing is discarded, the pause function is NOT called, and the completion
+ // callback receives an OK result carrying the operation's timestamp.
+ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOperations) {
+ Timestamp ts(Seconds(123), 0);
+ const Operations operationsToApply{BSON("ts" << ts)};
+ stdx::mutex mutex;
+ StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
+ bool pauseCalled = false;
+ Applier::Operations operationsOnCompletion;
+ auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ auto pause = [&] {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ pauseCalled = true;
+ };
+ auto callback = [&](const StatusWith<Timestamp>& theResult,
+ const Operations& theOperations) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ completionResult = theResult;
+ operationsOnCompletion = theOperations;
+ };
+
+ Timestamp ts2(Seconds(456), 0);
+ auto result =
+ applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts2, pause, callback);
+ ASSERT_OK(result.getStatus());
+ _applier = std::move(result.getValue().first);
+ ASSERT_TRUE(_applier);
+
+ const Applier::Operations& operationsDiscarded = result.getValue().second;
+ ASSERT_TRUE(operationsDiscarded.empty());
+
+ _applier->start();
+ _applier->wait();
+
+ // The callbacks write these variables under 'mutex', so read them under it as well.
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_FALSE(pauseCalled);
+ ASSERT_OK(completionResult.getStatus());
+ ASSERT_EQUALS(ts, completionResult.getValue());
+ ASSERT_TRUE(operationsOnCompletion.empty());
+ }
+
+ // The apply function returns OperationFailed for the only operation: the pause function is
+ // NOT called, and the completion callback receives a non-OK status together with a
+ // non-empty operations list.
+ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationAppliedFailed) {
+ Timestamp ts(Seconds(123), 0);
+ const Operations operationsToApply{BSON("ts" << ts)};
+ stdx::mutex mutex;
+ StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
+ bool pauseCalled = false;
+ Applier::Operations operationsOnCompletion;
+ auto apply = [](OperationContext* txn, const BSONObj& operation) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
+ auto pause = [&] {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ pauseCalled = true;
+ };
+ auto callback = [&](const StatusWith<Timestamp>& theResult,
+ const Operations& theOperations) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ completionResult = theResult;
+ operationsOnCompletion = theOperations;
+ };
+
+ auto result =
+ applyUntilAndPause(&getExecutor(), operationsToApply, apply, ts, pause, callback);
+ ASSERT_OK(result.getStatus());
+ _applier = std::move(result.getValue().first);
+ ASSERT_TRUE(_applier);
+
+ const Applier::Operations& operationsDiscarded = result.getValue().second;
+ ASSERT_TRUE(operationsDiscarded.empty());
+
+ _applier->start();
+ _applier->wait();
+
+ // The callbacks write these variables under 'mutex', so read them under it as well.
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_FALSE(pauseCalled);
+ ASSERT_NOT_OK(completionResult.getStatus());
+ ASSERT_FALSE(operationsOnCompletion.empty());
+ }
+
+ // Shared body for the "discard operations" tests.
+ // Runs applyUntilAndPause() on three operations (ts 123, 456, 789) using 'ts' as the
+ // cut-off and expects: the third operation to be returned as discarded, the first two to
+ // be applied in order, the completion result to be OK, and the pause function to have been
+ // called exactly when 'expectedPauseCalled' is true.
+ void _testApplyUntilAndPauseDiscardOperations(ReplicationExecutor* executor,
+ const Timestamp& ts,
+ bool expectedPauseCalled) {
+
+ Applier::Operations operationsToApply{
+ BSON("op" << "a" << "ts" << Timestamp(Seconds(123), 0)),
+ BSON("op" << "b" << "ts" << Timestamp(Seconds(456), 0)),
+ BSON("op" << "c" << "ts" << Timestamp(Seconds(789), 0)),
+ };
+ stdx::mutex mutex;
+ StatusWith<Timestamp> completionResult =
+ ApplyUntilAndPauseTest::getDetectableErrorStatus();
+ bool pauseCalled = false;
+ Applier::Operations operationsApplied;
+ Applier::Operations operationsOnCompletion;
+ // Records every operation handed to the apply function so the order can be checked.
+ auto apply = [&](OperationContext* txn, const BSONObj& operation) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationsApplied.push_back(operation);
+ return Status::OK();
+ };
+ auto pause = [&] {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ pauseCalled = true;
+ };
+ auto callback = [&](const StatusWith<Timestamp>& theResult,
+ const Operations& theOperations) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ completionResult = theResult;
+ operationsOnCompletion = theOperations;
+ };
+
+ auto result =
+ applyUntilAndPause(executor, operationsToApply, apply, ts, pause, callback);
+ ASSERT_OK(result.getStatus());
+ ASSERT_TRUE(result.getValue().first);
+ // The applier stays owned by 'result'; only a reference is taken here.
+ Applier& applier = *result.getValue().first;
+
+ const Applier::Operations& operationsDiscarded = result.getValue().second;
+ ASSERT_EQUALS(1U, operationsDiscarded.size());
+ ASSERT_EQUALS(operationsToApply[2], operationsDiscarded[0]);
+
+ applier.start();
+ applier.wait();
+
+ // The callbacks write these variables under 'mutex', so read them under it as well.
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_EQUALS(2U, operationsApplied.size());
+ ASSERT_EQUALS(operationsToApply[0], operationsApplied[0]);
+ ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]);
+ ASSERT_EQUALS(expectedPauseCalled, pauseCalled);
+ ASSERT_OK(completionResult.getStatus());
+ ASSERT_TRUE(operationsOnCompletion.empty());
+ }
+
+ // Cut-off (456) equals the second operation's timestamp, so the pause function is
+ // expected to be called.
+ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampInOperations) {
+ _testApplyUntilAndPauseDiscardOperations(&getExecutor(),
+ Timestamp(Seconds(456), 0),
+ true);
+ }
+
+ // Cut-off (500) lies between the second (456) and third (789) operations and matches
+ // neither, so the pause function is expected NOT to be called.
+ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseDiscardOperationsTimestampNotInOperations) {
+ _testApplyUntilAndPauseDiscardOperations(&getExecutor(),
+ Timestamp(Seconds(500), 0),
+ false);
+ }
+
} // namespace