summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/applier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/applier.cpp')
-rw-r--r--src/mongo/db/repl/applier.cpp318
1 files changed, 153 insertions, 165 deletions
diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp
index f1d6942e87a..9000e0ebf65 100644
--- a/src/mongo/db/repl/applier.cpp
+++ b/src/mongo/db/repl/applier.cpp
@@ -40,199 +40,187 @@
namespace mongo {
namespace repl {
- Applier::Applier(ReplicationExecutor* executor,
- const Operations& operations,
- const ApplyOperationFn& applyOperation,
- const CallbackFn& onCompletion)
- : _executor(executor),
- _operations(operations),
- _applyOperation(applyOperation),
- _onCompletion(onCompletion),
- _active(false) {
-
- uassert(ErrorCodes::BadValue, "null replication executor", executor);
- uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "last operation missing 'ts' field: " << operations.back(),
- operations.back().hasField("ts"));
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "'ts' in last operation not a timestamp: " << operations.back(),
- BSONType::bsonTimestamp == operations.back().getField("ts").type());
- uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+Applier::Applier(ReplicationExecutor* executor,
+ const Operations& operations,
+ const ApplyOperationFn& applyOperation,
+ const CallbackFn& onCompletion)
+ : _executor(executor),
+ _operations(operations),
+ _applyOperation(applyOperation),
+ _onCompletion(onCompletion),
+ _active(false) {
+ uassert(ErrorCodes::BadValue, "null replication executor", executor);
+ uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "last operation missing 'ts' field: " << operations.back(),
+ operations.back().hasField("ts"));
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "'ts' in last operation not a timestamp: " << operations.back(),
+ BSONType::bsonTimestamp == operations.back().getField("ts").type());
+ uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+}
+
+Applier::~Applier() {
+ DESTRUCTOR_GUARD(cancel(); wait(););
+}
+
+std::string Applier::getDiagnosticString() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "Applier";
+ output << " executor: " << _executor->getDiagnosticString();
+ output << " active: " << _active;
+ return output;
+}
+
+bool Applier::isActive() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _active;
+}
+
+Status Applier::start() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_active) {
+ return Status(ErrorCodes::IllegalOperation, "applier already started");
}
- Applier::~Applier() {
- DESTRUCTOR_GUARD(
- cancel();
- wait();
- );
+ auto scheduleResult =
+ _executor->scheduleDBWork(stdx::bind(&Applier::_callback, this, stdx::placeholders::_1));
+ if (!scheduleResult.isOK()) {
+ return scheduleResult.getStatus();
}
- std::string Applier::getDiagnosticString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "Applier";
- output << " executor: " << _executor->getDiagnosticString();
- output << " active: " << _active;
- return output;
- }
+ _active = true;
+ _dbWorkCallbackHandle = scheduleResult.getValue();
- bool Applier::isActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
+ return Status::OK();
+}
- Status Applier::start() {
+void Applier::cancel() {
+ ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "applier already started");
+ if (!_active) {
+ return;
}
- auto scheduleResult = _executor->scheduleDBWork(
- stdx::bind(&Applier::_callback, this, stdx::placeholders::_1));
- if (!scheduleResult.isOK()) {
- return scheduleResult.getStatus();
- }
+ dbWorkCallbackHandle = _dbWorkCallbackHandle;
+ }
- _active = true;
- _dbWorkCallbackHandle = scheduleResult.getValue();
+ if (dbWorkCallbackHandle.isValid()) {
+ _executor->cancel(dbWorkCallbackHandle);
+ }
+}
+
+void Applier::wait() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
- return Status::OK();
+ while (_active) {
+ _condition.wait(lk);
}
+}
- void Applier::cancel() {
- ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
+ if (!cbd.status.isOK()) {
+ _finishCallback(cbd.status, _operations);
+ return;
+ }
- if (!_active) {
- return;
- }
+ invariant(cbd.txn);
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
- }
+ // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp.
+ cbd.txn->setReplicatedWrites(false);
- if (dbWorkCallbackHandle.isValid()) {
- _executor->cancel(dbWorkCallbackHandle);
- }
- }
+ // allow us to get through the magic barrier
+ cbd.txn->lockState()->setIsBatchWriter(true);
- void Applier::wait() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ Status applyStatus(ErrorCodes::InternalError, "not mutated");
- while (_active) {
- _condition.wait(lk);
+ invariant(!_operations.empty());
+ for (auto i = _operations.cbegin(); i != _operations.cend(); ++i) {
+ try {
+ applyStatus = _applyOperation(cbd.txn, *i);
+ } catch (...) {
+ applyStatus = exceptionToStatus();
}
- }
-
- void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
- if (!cbd.status.isOK()) {
- _finishCallback(cbd.status, _operations);
+ if (!applyStatus.isOK()) {
+ // 'i' points to last operation that was not applied.
+ _finishCallback(applyStatus, Operations(i, _operations.cend()));
return;
}
-
- invariant(cbd.txn);
-
- // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp.
- cbd.txn->setReplicatedWrites(false);
-
- // allow us to get through the magic barrier
- cbd.txn->lockState()->setIsBatchWriter(true);
-
- Status applyStatus(ErrorCodes::InternalError, "not mutated");
-
- invariant(!_operations.empty());
- for (auto i = _operations.cbegin(); i != _operations.cend(); ++i) {
- try {
- applyStatus = _applyOperation(cbd.txn, *i);
- }
- catch (...) {
- applyStatus = exceptionToStatus();
- }
- if (!applyStatus.isOK()) {
- // 'i' points to last operation that was not applied.
- _finishCallback(applyStatus, Operations(i, _operations.cend()));
- return;
- }
- }
- _finishCallback(_operations.back().getField("ts").timestamp(), Operations());
}
+ _finishCallback(_operations.back().getField("ts").timestamp(), Operations());
+}
- void Applier::_finishCallback(const StatusWith<Timestamp>& result,
- const Operations& operations) {
- _onCompletion(result, operations);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
- _condition.notify_all();
- }
+void Applier::_finishCallback(const StatusWith<Timestamp>& result, const Operations& operations) {
+ _onCompletion(result, operations);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
+}
namespace {
- void pauseBeforeCompletion(
- const StatusWith<Timestamp>& result,
- const Applier::Operations& operationsOnCompletion,
- const PauseDataReplicatorFn& pauseDataReplicator,
- const Applier::CallbackFn& onCompletion) {
-
- if (result.isOK()) {
- pauseDataReplicator();
+void pauseBeforeCompletion(const StatusWith<Timestamp>& result,
+ const Applier::Operations& operationsOnCompletion,
+ const PauseDataReplicatorFn& pauseDataReplicator,
+ const Applier::CallbackFn& onCompletion) {
+ if (result.isOK()) {
+ pauseDataReplicator();
+ }
+ onCompletion(result, operationsOnCompletion);
+};
+
+} // namespace
+
+StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations>> applyUntilAndPause(
+ ReplicationExecutor* executor,
+ const Applier::Operations& operations,
+ const Applier::ApplyOperationFn& applyOperation,
+ const Timestamp& lastTimestampToApply,
+ const PauseDataReplicatorFn& pauseDataReplicator,
+ const Applier::CallbackFn& onCompletion) {
+ try {
+ auto comp = [](const BSONObj& left, const BSONObj& right) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Operation missing 'ts' field': " << left,
+ left.hasField("ts"));
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Operation missing 'ts' field': " << right,
+ right.hasField("ts"));
+ return left["ts"].timestamp() < right["ts"].timestamp();
+ };
+ auto wrapped = BSON("ts" << lastTimestampToApply);
+ auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
+ bool found = i != operations.cend() && !comp(wrapped, *i);
+ auto j = found ? i + 1 : i;
+ Applier::Operations operationsInRange(operations.cbegin(), j);
+ Applier::Operations operationsNotInRange(j, operations.cend());
+ if (!found) {
+ return std::make_pair(std::unique_ptr<Applier>(new Applier(
+ executor, operationsInRange, applyOperation, onCompletion)),
+ operationsNotInRange);
}
- onCompletion(result, operationsOnCompletion);
- };
-
-} // namespace
-
- StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause(
- ReplicationExecutor* executor,
- const Applier::Operations& operations,
- const Applier::ApplyOperationFn& applyOperation,
- const Timestamp& lastTimestampToApply,
- const PauseDataReplicatorFn& pauseDataReplicator,
- const Applier::CallbackFn& onCompletion) {
- try {
- auto comp = [](const BSONObj& left, const BSONObj& right) {
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << left,
- left.hasField("ts"));
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << right,
- right.hasField("ts"));
- return left["ts"].timestamp() < right["ts"].timestamp();
- };
- auto wrapped = BSON("ts" << lastTimestampToApply);
- auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
- bool found = i != operations.cend() && !comp(wrapped, *i);
- auto j = found ? i+1 : i;
- Applier::Operations operationsInRange(operations.cbegin(), j);
- Applier::Operations operationsNotInRange(j, operations.cend());
- if (!found) {
- return std::make_pair(
- std::unique_ptr<Applier>(
- new Applier(executor, operationsInRange, applyOperation, onCompletion)),
- operationsNotInRange);
- }
-
- return std::make_pair(
- std::unique_ptr<Applier>(new Applier(
- executor,
- operationsInRange,
- applyOperation,
- stdx::bind(pauseBeforeCompletion,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- pauseDataReplicator,
- onCompletion))),
- operationsNotInRange);
- }
- catch (...) {
- return exceptionToStatus();
- }
- MONGO_UNREACHABLE;
- return Status(ErrorCodes::InternalError, "unreachable");
+ return std::make_pair(
+ std::unique_ptr<Applier>(new Applier(executor,
+ operationsInRange,
+ applyOperation,
+ stdx::bind(pauseBeforeCompletion,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ pauseDataReplicator,
+ onCompletion))),
+ operationsNotInRange);
+ } catch (...) {
+ return exceptionToStatus();
}
+ MONGO_UNREACHABLE;
+ return Status(ErrorCodes::InternalError, "unreachable");
+}
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo