/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/rollback_checker.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" namespace mongo { namespace repl { using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using UniqueLock = stdx::unique_lock; RollbackChecker::RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource) : _executor(executor), _syncSource(syncSource), _baseRBID(-1), _lastRBID(-1) { uassert(ErrorCodes::BadValue, "null task executor", executor); } RollbackChecker::~RollbackChecker() {} RollbackChecker::CallbackHandle RollbackChecker::checkForRollback(const CallbackFn& nextAction) { return _scheduleGetRollbackId( [this, nextAction](const RemoteCommandCallbackArgs& args) { if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { return; } if (!args.response.isOK()) { nextAction(args.response.getStatus()); return; } if (auto rbidElement = args.response.getValue().data["rbid"]) { int remoteRBID = rbidElement.numberInt(); UniqueLock lk(_mutex); bool hadRollback = _checkForRollback_inlock(remoteRBID); lk.unlock(); if (hadRollback) { nextAction(Status(ErrorCodes::UnrecoverableRollbackError, "RollbackChecker detected rollback occurred")); } else { nextAction(Status::OK()); } } else { nextAction(Status(ErrorCodes::CommandFailed, "replSetGetRBID command failed when checking for rollback")); } }, nextAction); } bool RollbackChecker::hasHadRollback() { // Default to true in case the callback is not called in an error case. bool hasHadRollback = true; auto cbh = checkForRollback( [&hasHadRollback](const Status& status) { hasHadRollback = !status.isOK(); }); _executor->wait(cbh); return hasHadRollback; } RollbackChecker::CallbackHandle RollbackChecker::reset(const CallbackFn& nextAction) { return _scheduleGetRollbackId( [this, nextAction](const RemoteCommandCallbackArgs& args) { if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { return; } if (!args.response.isOK()) { nextAction(args.response.getStatus()); return; } if (auto rbidElement = args.response.getValue().data["rbid"]) { int newRBID = rbidElement.numberInt(); UniqueLock lk(_mutex); _setRBID_inlock(newRBID); lk.unlock(); nextAction(Status::OK()); } else { nextAction(Status(ErrorCodes::CommandFailed, "replSetGetRBID command failed when checking for rollback")); } }, nextAction); } Status RollbackChecker::reset_sync() { // Default to an error in case the callback is not called in an error case. Status resetStatus = Status(ErrorCodes::CommandFailed, "RollbackChecker reset failed"); auto cbh = reset([&resetStatus](const Status& status) { resetStatus = status; }); _executor->wait(cbh); return resetStatus; } int RollbackChecker::getBaseRBID_forTest() { stdx::lock_guard lk(_mutex); return _baseRBID; } int RollbackChecker::getLastRBID_forTest() { stdx::lock_guard lk(_mutex); return _lastRBID; } bool RollbackChecker::_checkForRollback_inlock(int remoteRBID) { _lastRBID = remoteRBID; return remoteRBID != _baseRBID; } RollbackChecker::CallbackHandle RollbackChecker::_scheduleGetRollbackId( const RemoteCommandCallbackFn& nextAction, const CallbackFn& errorFn) { executor::RemoteCommandRequest getRollbackIDReq( _syncSource, "admin", BSON("replSetGetRBID" << 1)); auto cbh = _executor->scheduleRemoteCommand(getRollbackIDReq, nextAction); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return RollbackChecker::CallbackHandle(); } if (!cbh.isOK()) { errorFn(cbh.getStatus()); return RollbackChecker::CallbackHandle(); } return cbh.getValue(); } void RollbackChecker::_setRBID_inlock(int rbid) { _baseRBID = rbid; _lastRBID = rbid; } } // namespace repl } // namespace mongo