/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/repl/multiapplier.h" #include #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" #include "mongo/util/destructor_guard.h" namespace mongo { namespace repl { MultiApplier::MultiApplier(executor::TaskExecutor* executor, const Operations& operations, const ApplyOperationFn& applyOperation, const MultiApplyFn& multiApply, const CallbackFn& onCompletion) : _executor(executor), _operations(operations), _applyOperation(applyOperation), _multiApply(multiApply), _onCompletion(onCompletion) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); uassert(ErrorCodes::FailedToParse, str::stream() << "last operation missing 'ts' field: " << operations.back().raw, operations.back().raw.hasField("ts")); uassert(ErrorCodes::TypeMismatch, str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw, BSONType::bsonTimestamp == operations.back().raw.getField("ts").type()); uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply); uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); } MultiApplier::~MultiApplier() { DESTRUCTOR_GUARD(shutdown(); join();); } bool MultiApplier::isActive() const { stdx::lock_guard lk(_mutex); return _isActive_inlock(); } bool MultiApplier::_isActive_inlock() const { return State::kRunning == _state || State::kShuttingDown == _state; } Status MultiApplier::startup() noexcept { stdx::lock_guard lk(_mutex); switch (_state) { case State::kPreStart: _state = State::kRunning; break; case State::kRunning: return Status(ErrorCodes::InternalError, "multi applier already started"); case State::kShuttingDown: return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down"); case State::kComplete: return Status(ErrorCodes::ShutdownInProgress, "multi applier completed"); } auto scheduleResult = _executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { _state = State::kComplete; return scheduleResult.getStatus(); } _dbWorkCallbackHandle = scheduleResult.getValue(); return Status::OK(); } void MultiApplier::shutdown() { stdx::lock_guard lk(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. _state = State::kComplete; return; case State::kRunning: _state = State::kShuttingDown; break; case State::kShuttingDown: case State::kComplete: // Nothing to do if we are already in ShuttingDown or Complete state. return; } if (_dbWorkCallbackHandle.isValid()) { _executor->cancel(_dbWorkCallbackHandle); } } void MultiApplier::join() { stdx::unique_lock lk(_mutex); _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } MultiApplier::State MultiApplier::getState_forTest() const { stdx::lock_guard lk(_mutex); return _state; } void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) { if (!cbd.status.isOK()) { _finishCallback(cbd.status); return; } invariant(!_operations.empty()); StatusWith applyStatus(ErrorCodes::InternalError, "not mutated"); try { auto opCtx = cc().makeOperationContext(); applyStatus = _multiApply(opCtx.get(), _operations, _applyOperation); } catch (...) { applyStatus = exceptionToStatus(); } _finishCallback(applyStatus.getStatus()); } void MultiApplier::_finishCallback(const Status& result) { // After running callback function, clear '_onCompletion' to release any resources that might be // held by this function object. // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case // there is any logic that's invoked at the function object's destruction that might call into // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is // destroyed outside the lock. decltype(_onCompletion) onCompletion; { stdx::lock_guard lock(_mutex); invariant(_onCompletion); std::swap(_onCompletion, onCompletion); } onCompletion(result); stdx::lock_guard lk(_mutex); invariant(State::kComplete != _state); _state = State::kComplete; _condition.notify_all(); } std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) { switch (state) { case MultiApplier::State::kPreStart: return os << "PreStart"; case MultiApplier::State::kRunning: return os << "Running"; case MultiApplier::State::kShuttingDown: return os << "ShuttingDown"; case MultiApplier::State::kComplete: return os << "Complete"; } MONGO_UNREACHABLE; } } // namespace repl } // namespace mongo