/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/multiapplier.h"
#include
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/util/destructor_guard.h"
namespace mongo {
namespace repl {
MultiApplier::MultiApplier(ReplicationExecutor* executor,
const Operations& operations,
const ApplyOperationFn& applyOperation,
const MultiApplyFn& multiApply,
const CallbackFn& onCompletion)
: _executor(executor),
_operations(operations),
_applyOperation(applyOperation),
_multiApply(multiApply),
_onCompletion(onCompletion),
_active(false) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::FailedToParse,
str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
operations.back().raw.hasField("ts"));
uassert(ErrorCodes::TypeMismatch,
str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
}
MultiApplier::~MultiApplier() {
DESTRUCTOR_GUARD(cancel(); wait(););
}
std::string MultiApplier::getDiagnosticString() const {
stdx::lock_guard lk(_mutex);
str::stream output;
output << "MultiApplier";
output << " executor: " << _executor->getDiagnosticString();
output << " active: " << _active;
return output;
}
bool MultiApplier::isActive() const {
stdx::lock_guard lk(_mutex);
return _active;
}
Status MultiApplier::start() {
stdx::lock_guard lk(_mutex);
if (_active) {
return Status(ErrorCodes::IllegalOperation, "applier already started");
}
auto scheduleResult = _executor->scheduleDBWork(
stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
return scheduleResult.getStatus();
}
_active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
return Status::OK();
}
void MultiApplier::cancel() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
stdx::lock_guard lk(_mutex);
if (!_active) {
return;
}
dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
if (dbWorkCallbackHandle.isValid()) {
_executor->cancel(dbWorkCallbackHandle);
}
}
void MultiApplier::wait() {
stdx::unique_lock lk(_mutex);
while (_active) {
_condition.wait(lk);
}
}
// TODO change the passed in function to be multiapply instead of apply inlock
void MultiApplier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
if (!cbd.status.isOK()) {
_finishCallback(cbd.status, _operations);
return;
}
invariant(cbd.txn);
// Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp.
cbd.txn->setReplicatedWrites(false);
// allow us to get through the magic barrier
cbd.txn->lockState()->setIsBatchWriter(true);
StatusWith applyStatus(ErrorCodes::InternalError, "not mutated");
invariant(!_operations.empty());
try {
applyStatus = _multiApply(cbd.txn, _operations, _applyOperation);
} catch (...) {
applyStatus = exceptionToStatus();
}
if (!applyStatus.isOK()) {
_finishCallback(applyStatus.getStatus(), _operations);
return;
}
_finishCallback(applyStatus.getValue().getTimestamp(), Operations());
}
void MultiApplier::_finishCallback(const StatusWith& result,
const Operations& operations) {
_onCompletion(result, operations);
stdx::lock_guard lk(_mutex);
_active = false;
_condition.notify_all();
}
namespace {
void pauseBeforeCompletion(const StatusWith& result,
const MultiApplier::Operations& operationsOnCompletion,
const PauseDataReplicatorFn& pauseDataReplicator,
const MultiApplier::CallbackFn& onCompletion) {
if (result.isOK()) {
pauseDataReplicator();
}
onCompletion(result, operationsOnCompletion);
};
} // namespace
StatusWith, MultiApplier::Operations>> applyUntilAndPause(
ReplicationExecutor* executor,
const MultiApplier::Operations& operations,
const MultiApplier::ApplyOperationFn& applyOperation,
const MultiApplier::MultiApplyFn& multiApply,
const Timestamp& lastTimestampToApply,
const PauseDataReplicatorFn& pauseDataReplicator,
const MultiApplier::CallbackFn& onCompletion) {
try {
auto comp = [](const OplogEntry& left, const OplogEntry& right) {
uassert(ErrorCodes::FailedToParse,
str::stream() << "Operation missing 'ts' field': " << left.raw,
left.raw.hasField("ts"));
uassert(ErrorCodes::FailedToParse,
str::stream() << "Operation missing 'ts' field': " << right.raw,
right.raw.hasField("ts"));
return left.raw["ts"].timestamp() < right.raw["ts"].timestamp();
};
auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply));
auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
bool found = i != operations.cend() && !comp(wrapped, *i);
auto j = found ? i + 1 : i;
MultiApplier::Operations operationsInRange(operations.cbegin(), j);
MultiApplier::Operations operationsNotInRange(j, operations.cend());
if (!found) {
return std::make_pair(
std::unique_ptr(new MultiApplier(
executor, operationsInRange, applyOperation, multiApply, onCompletion)),
operationsNotInRange);
}
return std::make_pair(
std::unique_ptr(new MultiApplier(executor,
operationsInRange,
applyOperation,
multiApply,
stdx::bind(pauseBeforeCompletion,
stdx::placeholders::_1,
stdx::placeholders::_2,
pauseDataReplicator,
onCompletion))),
operationsNotInRange);
} catch (...) {
return exceptionToStatus();
}
MONGO_UNREACHABLE;
return Status(ErrorCodes::InternalError, "unreachable");
}
} // namespace repl
} // namespace mongo