diff options
Diffstat (limited to 'src/mongo/db/repl/applier.cpp')
-rw-r--r-- | src/mongo/db/repl/applier.cpp | 318 |
1 files changed, 153 insertions, 165 deletions
diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp index f1d6942e87a..9000e0ebf65 100644 --- a/src/mongo/db/repl/applier.cpp +++ b/src/mongo/db/repl/applier.cpp @@ -40,199 +40,187 @@ namespace mongo { namespace repl { - Applier::Applier(ReplicationExecutor* executor, - const Operations& operations, - const ApplyOperationFn& applyOperation, - const CallbackFn& onCompletion) - : _executor(executor), - _operations(operations), - _applyOperation(applyOperation), - _onCompletion(onCompletion), - _active(false) { - - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); - uassert(ErrorCodes::FailedToParse, - str::stream() << "last operation missing 'ts' field: " << operations.back(), - operations.back().hasField("ts")); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "'ts' in last operation not a timestamp: " << operations.back(), - BSONType::bsonTimestamp == operations.back().getField("ts").type()); - uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); +Applier::Applier(ReplicationExecutor* executor, + const Operations& operations, + const ApplyOperationFn& applyOperation, + const CallbackFn& onCompletion) + : _executor(executor), + _operations(operations), + _applyOperation(applyOperation), + _onCompletion(onCompletion), + _active(false) { + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); + uassert(ErrorCodes::FailedToParse, + str::stream() << "last operation missing 'ts' field: " << operations.back(), + operations.back().hasField("ts")); + uassert(ErrorCodes::TypeMismatch, + str::stream() << "'ts' in last operation not a timestamp: " << operations.back(), + BSONType::bsonTimestamp == operations.back().getField("ts").type()); + uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); +} + +Applier::~Applier() { + DESTRUCTOR_GUARD(cancel(); wait();); +} + +std::string Applier::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "Applier"; + output << " executor: " << _executor->getDiagnosticString(); + output << " active: " << _active; + return output; +} + +bool Applier::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} + +Status Applier::start() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_active) { + return Status(ErrorCodes::IllegalOperation, "applier already started"); } - Applier::~Applier() { - DESTRUCTOR_GUARD( - cancel(); - wait(); - ); + auto scheduleResult = + _executor->scheduleDBWork(stdx::bind(&Applier::_callback, this, stdx::placeholders::_1)); + if (!scheduleResult.isOK()) { + return scheduleResult.getStatus(); } - std::string Applier::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "Applier"; - output << " executor: " << _executor->getDiagnosticString(); - output << " active: " << _active; - return output; - } + _active = true; + _dbWorkCallbackHandle = scheduleResult.getValue(); - bool Applier::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } + return Status::OK(); +} - Status Applier::start() { +void Applier::cancel() { + ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; + { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { - return Status(ErrorCodes::IllegalOperation, "applier already started"); + if (!_active) { + return; } - auto scheduleResult = _executor->scheduleDBWork( - stdx::bind(&Applier::_callback, this, stdx::placeholders::_1)); - if (!scheduleResult.isOK()) { - return scheduleResult.getStatus(); - } + dbWorkCallbackHandle = _dbWorkCallbackHandle; + } - _active = true; - _dbWorkCallbackHandle = scheduleResult.getValue(); + if (dbWorkCallbackHandle.isValid()) { + _executor->cancel(dbWorkCallbackHandle); + } +} + +void Applier::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); - return Status::OK(); + while (_active) { + _condition.wait(lk); } +} - void Applier::cancel() { - ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) { + if (!cbd.status.isOK()) { + _finishCallback(cbd.status, _operations); + return; + } - if (!_active) { - return; - } + invariant(cbd.txn); - dbWorkCallbackHandle = _dbWorkCallbackHandle; - } + // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp. + cbd.txn->setReplicatedWrites(false); - if (dbWorkCallbackHandle.isValid()) { - _executor->cancel(dbWorkCallbackHandle); - } - } + // allow us to get through the magic barrier + cbd.txn->lockState()->setIsBatchWriter(true); - void Applier::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + Status applyStatus(ErrorCodes::InternalError, "not mutated"); - while (_active) { - _condition.wait(lk); + invariant(!_operations.empty()); + for (auto i = _operations.cbegin(); i != _operations.cend(); ++i) { + try { + applyStatus = _applyOperation(cbd.txn, *i); + } catch (...) { + applyStatus = exceptionToStatus(); } - } - - void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) { - if (!cbd.status.isOK()) { - _finishCallback(cbd.status, _operations); + if (!applyStatus.isOK()) { + // 'i' points to last operation that was not applied. + _finishCallback(applyStatus, Operations(i, _operations.cend())); return; } - - invariant(cbd.txn); - - // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp. - cbd.txn->setReplicatedWrites(false); - - // allow us to get through the magic barrier - cbd.txn->lockState()->setIsBatchWriter(true); - - Status applyStatus(ErrorCodes::InternalError, "not mutated"); - - invariant(!_operations.empty()); - for (auto i = _operations.cbegin(); i != _operations.cend(); ++i) { - try { - applyStatus = _applyOperation(cbd.txn, *i); - } - catch (...) { - applyStatus = exceptionToStatus(); - } - if (!applyStatus.isOK()) { - // 'i' points to last operation that was not applied. - _finishCallback(applyStatus, Operations(i, _operations.cend())); - return; - } - } - _finishCallback(_operations.back().getField("ts").timestamp(), Operations()); } + _finishCallback(_operations.back().getField("ts").timestamp(), Operations()); +} - void Applier::_finishCallback(const StatusWith<Timestamp>& result, - const Operations& operations) { - _onCompletion(result, operations); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); - } +void Applier::_finishCallback(const StatusWith<Timestamp>& result, const Operations& operations) { + _onCompletion(result, operations); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); +} namespace { - void pauseBeforeCompletion( - const StatusWith<Timestamp>& result, - const Applier::Operations& operationsOnCompletion, - const PauseDataReplicatorFn& pauseDataReplicator, - const Applier::CallbackFn& onCompletion) { - - if (result.isOK()) { - pauseDataReplicator(); +void pauseBeforeCompletion(const StatusWith<Timestamp>& result, + const Applier::Operations& operationsOnCompletion, + const PauseDataReplicatorFn& pauseDataReplicator, + const Applier::CallbackFn& onCompletion) { + if (result.isOK()) { + pauseDataReplicator(); + } + onCompletion(result, operationsOnCompletion); +}; + +} // namespace + +StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations>> applyUntilAndPause( + ReplicationExecutor* executor, + const Applier::Operations& operations, + const Applier::ApplyOperationFn& applyOperation, + const Timestamp& lastTimestampToApply, + const PauseDataReplicatorFn& pauseDataReplicator, + const Applier::CallbackFn& onCompletion) { + try { + auto comp = [](const BSONObj& left, const BSONObj& right) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << left, + left.hasField("ts")); + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << right, + right.hasField("ts")); + return left["ts"].timestamp() < right["ts"].timestamp(); + }; + auto wrapped = BSON("ts" << lastTimestampToApply); + auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); + bool found = i != operations.cend() && !comp(wrapped, *i); + auto j = found ? i + 1 : i; + Applier::Operations operationsInRange(operations.cbegin(), j); + Applier::Operations operationsNotInRange(j, operations.cend()); + if (!found) { + return std::make_pair(std::unique_ptr<Applier>(new Applier( + executor, operationsInRange, applyOperation, onCompletion)), + operationsNotInRange); } - onCompletion(result, operationsOnCompletion); - }; - -} // namespace - - StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations> > applyUntilAndPause( - ReplicationExecutor* executor, - const Applier::Operations& operations, - const Applier::ApplyOperationFn& applyOperation, - const Timestamp& lastTimestampToApply, - const PauseDataReplicatorFn& pauseDataReplicator, - const Applier::CallbackFn& onCompletion) { - try { - auto comp = [](const BSONObj& left, const BSONObj& right) { - uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << left, - left.hasField("ts")); - uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << right, - right.hasField("ts")); - return left["ts"].timestamp() < right["ts"].timestamp(); - }; - auto wrapped = BSON("ts" << lastTimestampToApply); - auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); - bool found = i != operations.cend() && !comp(wrapped, *i); - auto j = found ? i+1 : i; - Applier::Operations operationsInRange(operations.cbegin(), j); - Applier::Operations operationsNotInRange(j, operations.cend()); - if (!found) { - return std::make_pair( - std::unique_ptr<Applier>( - new Applier(executor, operationsInRange, applyOperation, onCompletion)), - operationsNotInRange); - } - - return std::make_pair( - std::unique_ptr<Applier>(new Applier( - executor, - operationsInRange, - applyOperation, - stdx::bind(pauseBeforeCompletion, - stdx::placeholders::_1, - stdx::placeholders::_2, - pauseDataReplicator, - onCompletion))), - operationsNotInRange); - } - catch (...) { - return exceptionToStatus(); - } - MONGO_UNREACHABLE; - return Status(ErrorCodes::InternalError, "unreachable"); + return std::make_pair( + std::unique_ptr<Applier>(new Applier(executor, + operationsInRange, + applyOperation, + stdx::bind(pauseBeforeCompletion, + stdx::placeholders::_1, + stdx::placeholders::_2, + pauseDataReplicator, + onCompletion))), + operationsNotInRange); + } catch (...) { + return exceptionToStatus(); } + MONGO_UNREACHABLE; + return Status(ErrorCodes::InternalError, "unreachable"); +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |