/* @file rs_rollback.cpp
*
* Copyright (C) 2008-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/rs_rollback.h"
#include
#include
#include "mongo/bson/bsonelement_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/roll_back_local_operations.h"
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
/* Scenarios
*
* We went offline with ops not replicated out.
*
* F = node that failed and coming back.
* P = node that took over, new primary
*
* #1:
* F : a b c d e f g
* P : a b c d q
*
* The design is "keep P". One could argue here that "keep F" has some merits, however, in most
* cases P will have significantly more data. Also note that P may have a proper subset of F's
* stream if there were no subsequent writes.
*
* For now the model is simply : get F back in sync with P. If P was really behind or something, we
* should have just chosen not to fail over anyway.
*
* #2:
* F : a b c d e f g -> a b c d
* P : a b c d
*
* #3:
* F : a b c d e f g -> a b c d q r s t u v w x z
* P : a b c d.q r s t u v w x z
*
* Steps
* find an event in common. 'd'.
* undo our events beyond that by:
* (1) taking copy from other server of those objects
* (2) do not consider copy valid until we pass reach an optime after when we fetched the new
* version of object
* -- i.e., reset minvalid.
* (3) we could skip operations on objects that are previous in time to our capture of the object
* as an optimization.
*
*/
namespace mongo {
using std::shared_ptr;
using std::unique_ptr;
using std::endl;
using std::list;
using std::map;
using std::multimap;
using std::set;
using std::string;
using std::pair;
namespace repl {
// Failpoint which causes rollback to hang before finishing.
MONGO_FP_DECLARE(rollbackHangBeforeFinish);
MONGO_FP_DECLARE(rollbackHangThenFailAfterWritingMinValid);
using namespace rollback_internal;
bool DocID::operator<(const DocID& other) const {
int comp = strcmp(ns, other.ns);
if (comp < 0)
return true;
if (comp > 0)
return false;
const StringData::ComparatorInterface* stringComparator = nullptr;
BSONElementComparator eltCmp(BSONElementComparator::FieldNamesMode::kIgnore, stringComparator);
return eltCmp.evaluate(_id < other._id);
}
bool DocID::operator==(const DocID& other) const {
// Since this is only used for tests, going with the simple impl that reuses operator< which is
// used in the real code.
return !(*this < other || other < *this);
}
void FixUpInfo::removeAllDocsToRefetchFor(const std::string& collection) {
docsToRefetch.erase(docsToRefetch.lower_bound(DocID::minFor(collection.c_str())),
docsToRefetch.upper_bound(DocID::maxFor(collection.c_str())));
}
void FixUpInfo::removeRedundantOperations() {
// These loops and their bodies can be done in any order. The final result of the FixUpInfo
// members will be the same either way.
for (const auto& collection : collectionsToDrop) {
removeAllDocsToRefetchFor(collection);
indexesToDrop.erase(collection);
collectionsToResyncMetadata.erase(collection);
}
for (const auto& collection : collectionsToResyncData) {
removeAllDocsToRefetchFor(collection);
indexesToDrop.erase(collection);
collectionsToResyncMetadata.erase(collection);
collectionsToDrop.erase(collection);
}
}
Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo,
const BSONObj& ourObj) {
const char* op = ourObj.getStringField("op");
if (*op == 'n')
return Status::OK();
if (ourObj.objsize() > 512 * 1024 * 1024)
throw RSFatalException("rollback too large");
DocID doc;
doc.ownedObj = ourObj.getOwned();
doc.ns = doc.ownedObj.getStringField("ns");
if (*doc.ns == '\0') {
throw RSFatalException(str::stream() << "local op on rollback has no ns: "
<< redact(doc.ownedObj));
}
BSONObj obj = doc.ownedObj.getObjectField(*op == 'u' ? "o2" : "o");
if (obj.isEmpty()) {
throw RSFatalException(str::stream() << "local op on rollback has no object field: "
<< redact(doc.ownedObj));
}
if (*op == 'c') {
BSONElement first = obj.firstElement();
NamespaceString nss(doc.ns); // foo.$cmd
string cmdname = first.fieldName();
Command* cmd = Command::findCommand(cmdname.c_str());
if (cmd == NULL) {
severe() << "rollback no such command " << first.fieldName();
return Status(ErrorCodes::UnrecoverableRollbackError,
str::stream() << "rollback no such command " << first.fieldName(),
18751);
}
if (cmdname == "create") {
// Create collection operation
// { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc
fixUpInfo.collectionsToDrop.insert(ns);
return Status::OK();
} else if (cmdname == "drop") {
string ns = nss.db().toString() + '.' + first.valuestr();
fixUpInfo.collectionsToResyncData.insert(ns);
return Status::OK();
} else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") {
// TODO: this is bad. we simply full resync the collection here,
// which could be very slow.
warning() << "rollback of dropIndexes is slow in this version of "
<< "mongod";
string ns = nss.db().toString() + '.' + first.valuestr();
fixUpInfo.collectionsToResyncData.insert(ns);
return Status::OK();
} else if (cmdname == "renameCollection") {
// TODO: slow.
warning() << "rollback of renameCollection is slow in this version of "
<< "mongod";
string from = first.valuestr();
string to = obj["to"].String();
fixUpInfo.collectionsToResyncData.insert(from);
fixUpInfo.collectionsToResyncData.insert(to);
return Status::OK();
} else if (cmdname == "dropDatabase") {
severe() << "rollback : can't rollback drop database full resync "
<< "will be required";
log() << obj.toString();
throw RSFatalException();
} else if (cmdname == "collMod") {
const auto ns = NamespaceString(cmd->parseNs(nss.db().toString(), obj));
for (auto field : obj) {
const auto modification = field.fieldNameStringData();
if (modification == cmdname) {
continue; // Skipping command name.
}
if (modification == "validator" || modification == "validationAction" ||
modification == "validationLevel" || modification == "usePowerOf2Sizes" ||
modification == "noPadding") {
fixUpInfo.collectionsToResyncMetadata.insert(ns.ns());
continue;
}
severe() << "cannot rollback a collMod command: " << redact(obj);
throw RSFatalException();
}
return Status::OK();
} else if (cmdname == "applyOps") {
if (first.type() != Array) {
std::string message = str::stream()
<< "Expected applyOps argument to be an array; found " << redact(first);
severe() << message;
return Status(ErrorCodes::UnrecoverableRollbackError, message);
}
for (const auto& subopElement : first.Array()) {
if (subopElement.type() != Object) {
std::string message = str::stream()
<< "Expected applyOps operations to be of Object type, but found "
<< redact(subopElement);
severe() << message;
return Status(ErrorCodes::UnrecoverableRollbackError, message);
}
auto subStatus = updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj());
if (!subStatus.isOK()) {
return subStatus;
}
}
return Status::OK();
} else {
severe() << "can't rollback this command yet: " << redact(obj);
log() << "cmdname=" << cmdname;
throw RSFatalException();
}
}
NamespaceString nss(doc.ns);
if (nss.isSystemDotIndexes()) {
if (*op != 'i') {
severe() << "Unexpected operation type '" << *op << "' on system.indexes operation, "
<< "document: " << redact(doc.ownedObj);
throw RSFatalException();
}
string objNs;
auto status = bsonExtractStringField(obj, "ns", &objNs);
if (!status.isOK()) {
severe() << "Missing collection namespace in system.indexes operation, document: "
<< redact(doc.ownedObj);
throw RSFatalException();
}
NamespaceString objNss(objNs);
if (!objNss.isValid()) {
severe() << "Invalid collection namespace in system.indexes operation, document: "
<< redact(doc.ownedObj);
throw RSFatalException();
}
string indexName;
status = bsonExtractStringField(obj, "name", &indexName);
if (!status.isOK()) {
severe() << "Missing index name in system.indexes operation, document: "
<< redact(doc.ownedObj);
throw RSFatalException();
}
using ValueType = multimap::value_type;
ValueType pairToInsert = std::make_pair(objNs, indexName);
auto lowerBound = fixUpInfo.indexesToDrop.lower_bound(objNs);
auto upperBound = fixUpInfo.indexesToDrop.upper_bound(objNs);
auto matcher = [pairToInsert](const ValueType& pair) { return pair == pairToInsert; };
if (!std::count_if(lowerBound, upperBound, matcher)) {
fixUpInfo.indexesToDrop.insert(pairToInsert);
}
return Status::OK();
}
doc._id = obj["_id"];
if (doc._id.eoo()) {
severe() << "cannot rollback op with no _id. ns: " << doc.ns
<< ", document: " << redact(doc.ownedObj);
throw RSFatalException();
}
fixUpInfo.docsToRefetch.insert(doc);
return Status::OK();
}
namespace {
/**
* This must be called before making any changes to our local data and after fetching any
* information from the upstream node. If any information is fetched from the upstream node after we
* have written locally, the function must be called again.
*/
void checkRbidAndUpdateMinValid(OperationContext* opCtx,
const int rbid,
const RollbackSource& rollbackSource,
StorageInterface* storageInterface) {
// It is important that the steps are performed in order to avoid racing with upstream rollbacks
//
// 1) Get the last doc in their oplog.
// 2) Get their RBID and fail if it has changed.
// 3) Set our minValid to the previously fetched OpTime of the top of their oplog.
const auto newMinValidDoc = rollbackSource.getLastOperation();
if (newMinValidDoc.isEmpty()) {
uasserted(40361, "rollback error newest oplog entry on source is missing or empty");
}
if (rbid != rollbackSource.getRollbackId()) {
// Our source rolled back itself so the data we received isn't necessarily consistent.
uasserted(40365, "rollback rbid on source changed during rollback, canceling this attempt");
}
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc));
log() << "Setting minvalid to " << minValid;
storageInterface->setAppliedThrough(opCtx, {}); // Use top of oplog.
storageInterface->setMinValid(opCtx, minValid);
if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) {
// This log output is used in js tests so please leave it.
log() << "rollback - rollbackHangThenFailAfterWritingMinValid fail point "
"enabled. Blocking until fail point is disabled.";
while (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) {
invariant(!globalInShutdownDeprecated()); // It is an error to shutdown while enabled.
mongo::sleepsecs(1);
}
uasserted(40378,
"failing rollback due to rollbackHangThenFailAfterWritingMinValid fail point");
}
}
void syncFixUp(OperationContext* opCtx,
const FixUpInfo& fixUpInfo,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface) {
// fetch all first so we needn't handle interruption in a fancy way
unsigned long long totalSize = 0;
// namespace -> doc id -> doc
map> goodVersions;
// fetch all the goodVersions of each document from current primary
unsigned long long numFetched = 0;
for (auto&& doc : fixUpInfo.docsToRefetch) {
invariant(!doc._id.eoo()); // This is checked when we insert to the set.
try {
// TODO : slow. lots of round trips.
numFetched++;
BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap());
totalSize += good.objsize();
if (totalSize >= 300 * 1024 * 1024) {
throw RSFatalException("replSet too much data to roll back");
}
// Note good might be empty, indicating we should delete it.
goodVersions[doc.ns][doc] = good;
} catch (const DBException& ex) {
// If the collection turned into a view, we might get an error trying to
// refetch documents, but these errors should be ignored, as we'll be creating
// the view during oplog replay.
if (ex.getCode() == ErrorCodes::CommandNotSupportedOnView)
continue;
log() << "rollback couldn't re-get from ns: " << doc.ns << " _id: " << redact(doc._id)
<< ' ' << numFetched << '/' << fixUpInfo.docsToRefetch.size() << ": "
<< redact(ex);
throw;
}
}
log() << "rollback 3.5";
checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, storageInterface);
// update them
log() << "rollback 4 n:" << goodVersions.size();
invariant(!fixUpInfo.commonPointOurDiskloc.isNull());
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResyncData.empty() ||
!fixUpInfo.collectionsToResyncMetadata.empty()) {
for (const string& ns : fixUpInfo.collectionsToResyncData) {
log() << "rollback 4.1.1 coll resync " << ns;
invariant(!fixUpInfo.indexesToDrop.count(ns));
invariant(!fixUpInfo.collectionsToResyncMetadata.count(ns));
const NamespaceString nss(ns);
{
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
Database* db = dbHolder().openDb(opCtx, nss.db().toString());
invariant(db);
WriteUnitOfWork wunit(opCtx);
fassertStatusOK(40359, db->dropCollectionEvenIfSystem(opCtx, nss));
wunit.commit();
}
rollbackSource.copyCollectionFromRemote(opCtx, nss);
}
for (const string& ns : fixUpInfo.collectionsToResyncMetadata) {
log() << "rollback 4.1.2 coll metadata resync " << ns;
const NamespaceString nss(ns);
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
auto db = dbHolder().openDb(opCtx, nss.db().toString());
invariant(db);
auto collection = db->getCollection(ns);
invariant(collection);
auto cce = collection->getCatalogEntry();
auto infoResult = rollbackSource.getCollectionInfo(nss);
if (!infoResult.isOK()) {
// Collection dropped by "them" so we can't correctly change it here. If we get to
// the roll-forward phase, we will drop it then. If the drop is rolled-back upstream
// and we restart, we will be expected to still have the collection.
log() << ns << " not found on remote host, so not rolling back collmod operation."
" We will drop the collection soon.";
continue;
}
auto info = infoResult.getValue();
CollectionOptions options;
if (auto optionsField = info["options"]) {
if (optionsField.type() != Object) {
throw RSFatalException(str::stream() << "Failed to parse options " << info
<< ": expected 'options' to be an "
<< "Object, got "
<< typeName(optionsField.type()));
}
auto status = options.parse(optionsField.Obj(), CollectionOptions::parseForCommand);
if (!status.isOK()) {
throw RSFatalException(str::stream() << "Failed to parse options " << info
<< ": "
<< status.toString());
}
// TODO(SERVER-27992): Set options.uuid.
} else {
// Use default options.
}
WriteUnitOfWork wuow(opCtx);
if (options.flagsSet || cce->getCollectionOptions(opCtx).flagsSet) {
cce->updateFlags(opCtx, options.flags);
}
auto status = collection->setValidator(opCtx, options.validator);
if (!status.isOK()) {
throw RSFatalException(str::stream() << "Failed to set validator: "
<< status.toString());
}
status = collection->setValidationAction(opCtx, options.validationAction);
if (!status.isOK()) {
throw RSFatalException(str::stream() << "Failed to set validationAction: "
<< status.toString());
}
status = collection->setValidationLevel(opCtx, options.validationLevel);
if (!status.isOK()) {
throw RSFatalException(str::stream() << "Failed to set validationLevel: "
<< status.toString());
}
wuow.commit();
}
// we did more reading from primary, so check it again for a rollback (which would mess
// us up), and make minValid newer.
log() << "rollback 4.2";
checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, storageInterface);
}
log() << "rollback 4.6";
// drop collections to drop before doing individual fixups
for (set::iterator it = fixUpInfo.collectionsToDrop.begin();
it != fixUpInfo.collectionsToDrop.end();
it++) {
log() << "rollback drop: " << *it;
invariant(!fixUpInfo.indexesToDrop.count(*it));
const NamespaceString nss(*it);
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
Database* db = dbHolder().get(opCtx, nsToDatabaseSubstring(*it));
if (db) {
Helpers::RemoveSaver removeSaver("rollback", "", *it);
// perform a collection scan and write all documents in the collection to disk
std::unique_ptr exec(InternalPlanner::collectionScan(
opCtx, *it, db->getCollection(*it), PlanExecutor::YIELD_AUTO));
BSONObj curObj;
PlanExecutor::ExecState execState;
while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) {
auto status = removeSaver.goingToDelete(curObj);
if (!status.isOK()) {
severe() << "rolling back createCollection on " << *it
<< " failed to write document to remove saver file: " << status;
throw RSFatalException();
}
}
if (execState != PlanExecutor::IS_EOF) {
if (execState == PlanExecutor::FAILURE &&
WorkingSetCommon::isValidStatusMemberObject(curObj)) {
Status errorStatus = WorkingSetCommon::getMemberObjectStatus(curObj);
severe() << "rolling back createCollection on " << *it << " failed with "
<< errorStatus << ". A full resync is necessary.";
} else {
severe() << "rolling back createCollection on " << *it
<< " failed. A full resync is necessary.";
}
throw RSFatalException();
}
WriteUnitOfWork wunit(opCtx);
fassertStatusOK(40360, db->dropCollectionEvenIfSystem(opCtx, nss));
wunit.commit();
}
}
// Drop indexes.
for (auto it = fixUpInfo.indexesToDrop.begin(); it != fixUpInfo.indexesToDrop.end(); it++) {
const NamespaceString nss(it->first);
const string& indexName = it->second;
log() << "rollback drop index: collection: " << nss.toString() << ". index: " << indexName;
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
auto db = dbHolder().get(opCtx, nss.db());
if (!db) {
continue;
}
auto collection = db->getCollection(nss.toString());
if (!collection) {
continue;
}
auto indexCatalog = collection->getIndexCatalog();
if (!indexCatalog) {
continue;
}
bool includeUnfinishedIndexes = false;
auto indexDescriptor =
indexCatalog->findIndexByName(opCtx, indexName, includeUnfinishedIndexes);
if (!indexDescriptor) {
warning() << "rollback failed to drop index " << indexName << " in " << nss.toString()
<< ": index not found";
continue;
}
WriteUnitOfWork wunit(opCtx);
auto status = indexCatalog->dropIndex(opCtx, indexDescriptor);
if (!status.isOK()) {
severe() << "rollback failed to drop index " << indexName << " in " << nss.toString()
<< ": " << status;
throw RSFatalException();
}
wunit.commit();
}
log() << "rollback 4.7";
unsigned deletes = 0, updates = 0;
time_t lastProgressUpdate = time(0);
time_t progressUpdateGap = 10;
for (const auto& nsAndGoodVersionsByDocID : goodVersions) {
// Keep an archive of items rolled back if the collection has not been dropped
// while rolling back createCollection operations.
const auto& ns = nsAndGoodVersionsByDocID.first;
unique_ptr removeSaver;
invariant(!fixUpInfo.collectionsToDrop.count(ns));
removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns));
const auto& goodVersionsByDocID = nsAndGoodVersionsByDocID.second;
for (const auto& idAndDoc : goodVersionsByDocID) {
time_t now = time(0);
if (now - lastProgressUpdate > progressUpdateGap) {
log() << deletes << " delete and " << updates
<< " update operations processed out of " << goodVersions.size()
<< " total operations";
lastProgressUpdate = now;
}
const DocID& doc = idAndDoc.first;
BSONObj pattern = doc._id.wrap(); // { _id : ... }
try {
verify(doc.ns && *doc.ns);
invariant(!fixUpInfo.collectionsToResyncData.count(doc.ns));
// TODO: Lots of overhead in context. This can be faster.
const NamespaceString docNss(doc.ns);
Lock::DBLock docDbLock(opCtx, docNss.db(), MODE_X);
OldClientContext ctx(opCtx, doc.ns);
Collection* collection = ctx.db()->getCollection(doc.ns);
// Add the doc to our rollback file if the collection was not dropped while
// rolling back createCollection operations.
// Do not log an error when undoing an insert on a no longer existent
// collection.
// It is likely that the collection was dropped as part of rolling back a
// createCollection command and regardless, the document no longer exists.
if (collection && removeSaver) {
BSONObj obj;
bool found = Helpers::findOne(opCtx, collection, pattern, obj, false);
if (found) {
auto status = removeSaver->goingToDelete(obj);
if (!status.isOK()) {
severe() << "rollback cannot write document in namespace " << doc.ns
<< " to archive file: " << redact(status);
throw RSFatalException();
}
} else {
error() << "rollback cannot find object: " << pattern << " in namespace "
<< doc.ns;
}
}
if (idAndDoc.second.isEmpty()) {
// wasn't on the primary; delete.
// TODO 1.6 : can't delete from a capped collection. need to handle that
// here.
deletes++;
if (collection) {
if (collection->isCapped()) {
// can't delete from a capped collection - so we truncate instead.
// if
// this item must go, so must all successors!!!
try {
// TODO: IIRC cappedTruncateAfter does not handle completely
// empty.
// this will crazy slow if no _id index.
const auto clock = opCtx->getServiceContext()->getFastClockSource();
const auto findOneStart = clock->now();
RecordId loc = Helpers::findOne(opCtx, collection, pattern, false);
if (clock->now() - findOneStart > Milliseconds(200))
warning() << "roll back slow no _id index for " << doc.ns
<< " perhaps?";
// would be faster but requires index:
// RecordId loc = Helpers::findById(nsd, pattern);
if (!loc.isNull()) {
try {
collection->cappedTruncateAfter(opCtx, loc, true);
} catch (const DBException& e) {
if (e.getCode() == 13415) {
// hack: need to just make cappedTruncate do this...
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(opCtx);
uassertStatusOK(collection->truncate(opCtx));
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
opCtx, "truncate", collection->ns().ns());
} else {
throw e;
}
}
}
} catch (const DBException& e) {
// Replicated capped collections have many ways to become
// inconsistent. We rely on age-out to make these problems go away
// eventually.
warning() << "ignoring failure to roll back change to capped "
<< "collection " << doc.ns << " with _id "
<< redact(idAndDoc.first._id.toString(
/*includeFieldName*/ false))
<< ": " << redact(e);
}
} else {
deleteObjects(opCtx,
collection,
doc.ns,
pattern,
PlanExecutor::YIELD_MANUAL,
true, // justone
true); // god
}
}
} else {
// TODO faster...
updates++;
const NamespaceString requestNs(doc.ns);
UpdateRequest request(requestNs);
request.setQuery(pattern);
request.setUpdates(idAndDoc.second);
request.setGod();
request.setUpsert();
UpdateLifecycleImpl updateLifecycle(requestNs);
request.setLifecycle(&updateLifecycle);
update(opCtx, ctx.db(), request);
}
} catch (const DBException& e) {
log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' '
<< redact(e) << " ndeletes:" << deletes;
throw;
}
}
}
log() << "rollback 5 d:" << deletes << " u:" << updates;
log() << "rollback 6";
// clean up oplog
LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toString();
{
const NamespaceString oplogNss(rsOplogName);
Lock::DBLock oplogDbLock(opCtx, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X);
OldClientContext ctx(opCtx, rsOplogName);
Collection* oplogCollection = ctx.db()->getCollection(rsOplogName);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(13423,
Status(ErrorCodes::UnrecoverableRollbackError,
str::stream() << "Can't find " << rsOplogName));
}
// TODO: fatal error if this throws?
oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
}
Status status = getGlobalAuthorizationManager()->initialize(opCtx);
if (!status.isOK()) {
severe() << "Failed to reinitialize auth data after rollback: " << status;
fassertFailedNoTrace(40366);
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
// lastAppliedHash value in bgsync to reflect our new last op.
replCoord->resetLastOpTimesFromOplog(opCtx);
log() << "rollback done";
}
Status _syncRollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface) {
invariant(!opCtx->lockState()->isLocked());
FixUpInfo how;
log() << "rollback 1";
how.rbid = rollbackSource.getRollbackId();
uassert(
40362, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID);
log() << "rollback 2 FindCommonPoint";
try {
auto processOperationForFixUp = [&how](const BSONObj& operation) {
return updateFixUpInfoFromLocalOplogEntry(how, operation);
};
auto res = syncRollBackLocalOperations(
localOplog, rollbackSource.getOplog(), processOperationForFixUp);
if (!res.isOK()) {
const auto status = res.getStatus();
switch (status.code()) {
case ErrorCodes::OplogStartMissing:
case ErrorCodes::UnrecoverableRollbackError:
return status;
default:
throw RSFatalException(status.toString());
}
}
how.commonPoint = res.getValue().first;
how.commonPointOurDiskloc = res.getValue().second;
how.removeRedundantOperations();
} catch (const RSFatalException& e) {
return Status(ErrorCodes::UnrecoverableRollbackError,
str::stream()
<< "need to rollback, but unable to determine common point between"
" local and remote oplog: "
<< e.what(),
18752);
}
log() << "rollback common point is " << how.commonPoint;
log() << "rollback 3 fixup";
try {
ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); });
syncFixUp(opCtx, how, rollbackSource, replCoord, storageInterface);
} catch (const RSFatalException& e) {
return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753);
}
if (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) {
// This log output is used in js tests so please leave it.
log() << "rollback - rollbackHangBeforeFinish fail point "
"enabled. Blocking until fail point is disabled.";
while (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) {
invariant(!globalInShutdownDeprecated()); // It is an error to shutdown while enabled.
mongo::sleepsecs(1);
}
}
return Status::OK();
}
} // namespace
Status syncRollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface) {
invariant(opCtx);
invariant(replCoord);
log() << "beginning rollback" << rsLog;
DisableDocumentValidation validationDisabler(opCtx);
UnreplicatedWritesBlock replicationDisabler(opCtx);
Status status =
_syncRollback(opCtx, localOplog, rollbackSource, requiredRBID, replCoord, storageInterface);
log() << "rollback finished" << rsLog;
return status;
}
void rollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface,
stdx::function sleepSecsFn) {
// Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
// the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
// rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or
// after transitioning to RECOVERING. We always transition to RECOVERING regardless of success
// or (recoverable) failure since we may be in an inconsistent state. If rollback failed before
// writing anything, SyncTail will quickly take us to SECONDARY since are are still at our
// original MinValid, which is fine because we may choose a sync source that doesn't require
// rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will
// cause us to roll back to the same common point, which is fine. If we succeeded, we will be
// consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY
// then.
{
log() << "rollback 0";
Lock::GlobalWrite globalWrite(opCtx);
if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
log() << "Cannot transition from " << replCoord->getMemberState().toString() << " to "
<< MemberState(MemberState::RS_ROLLBACK).toString();
return;
}
}
try {
auto status = syncRollback(
opCtx, localOplog, rollbackSource, requiredRBID, replCoord, storageInterface);
// Abort only when syncRollback detects we are in a unrecoverable state.
// WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK
// so we need to check here first.
if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
severe() << "Unable to complete rollback. A full resync may be needed: "
<< redact(status);
fassertFailedNoTrace(28723);
}
// In other cases, we log the message contained in the error status and retry later.
uassertStatusOK(status);
} catch (const DBException& ex) {
// UnrecoverableRollbackError should only come from a returned status which is handled
// above.
invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError);
warning() << "rollback cannot complete at this time (retrying later): " << redact(ex)
<< " appliedThrough=" << replCoord->getMyLastAppliedOpTime()
<< " minvalid=" << storageInterface->getMinValid(opCtx);
// Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If
// we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this
// will also prevent us from hot-looping and putting too much load on upstream nodes.
sleepSecsFn(5); // 5 seconds was chosen as a completely arbitrary amount of time.
} catch (...) {
std::terminate();
}
// At this point we are about to leave rollback. Before we do, wait for any writes done
// as part of rollback to be durable, and then do any necessary checks that we didn't
// wind up rolling back something illegal. We must wait for the rollback to be durable
// so that if we wind up shutting down uncleanly in response to something we rolled back
// we know that we won't wind up right back in the same situation when we start back up
// because the rollback wasn't durable.
opCtx->recoveryUnit()->waitUntilDurable();
// If we detected that we rolled back the shardIdentity document as part of this rollback
// then we must shut down to clear the in-memory ShardingState associated with the
// shardIdentity document.
if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) {
severe() << "shardIdentity document rollback detected. Shutting down to clear "
"in-memory sharding state. Restarting this process should safely return it "
"to a healthy state";
fassertFailedNoTrace(40276);
}
if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
<< "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
<< " but found self in " << replCoord->getMemberState();
fassertFailedNoTrace(40364);
}
}
} // namespace repl
} // namespace mongo