/* @file rs_rollback.cpp
*
* Copyright (C) 2008-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/rs_rollback.h"
#include <algorithm>
#include <memory>
#include "mongo/bson/bsonelement_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/roll_back_local_operations.h"
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/util/log.h"
/* Scenarios
*
* We went offline with ops not replicated out.
*
* F = node that failed and coming back.
* P = node that took over, new primary
*
* #1:
* F : a b c d e f g
* P : a b c d q
*
* The design is "keep P". One could argue here that "keep F" has some merits, however, in most
* cases P will have significantly more data. Also note that P may have a proper subset of F's
* stream if there were no subsequent writes.
*
* For now the model is simply : get F back in sync with P. If P was really behind or something, we
* should have just chosen not to fail over anyway.
*
* #2:
* F : a b c d e f g -> a b c d
* P : a b c d
*
* #3:
* F : a b c d e f g -> a b c d q r s t u v w x z
* P : a b c d.q r s t u v w x z
*
* Steps
* find an event in common. 'd'.
* undo our events beyond that by:
* (1) taking copy from other server of those objects
* (2) do not consider the copy valid until we reach an optime after when we fetched the new
* version of the object
* -- i.e., reset minvalid.
* (3) we could skip operations on objects that are previous in time to our capture of the object
* as an optimization.
*
*/
namespace mongo {
using std::shared_ptr;
using std::unique_ptr;
using std::endl;
using std::list;
using std::map;
using std::multimap;
using std::set;
using std::string;
using std::pair;
namespace repl {
namespace {
// Exception thrown when rollback hits a condition it cannot recover from
// (e.g. rolling back a dropDatabase). Callers convert it into an
// UnrecoverableRollbackError status.
class RSFatalException : public std::exception {
public:
    // Takes the message by value and moves it, avoiding an extra copy.
    explicit RSFatalException(std::string m = "replica set fatal exception")
        : msg(std::move(m)) {}

    // noexcept matches std::exception::what() in C++11; override makes the
    // relationship explicit.
    const char* what() const noexcept override {
        return msg.c_str();
    }

private:
    std::string msg;
};
// Identifies one document to refetch during rollback: the namespace it lives
// in plus its _id. Both `ns` and `_id` point into `ownedObj`'s buffer, so the
// owned object must outlive them.
struct DocID {
    BSONObj ownedObj;
    const char* ns;
    BSONElement _id;

    // Strict weak ordering for use in std::set: order by namespace first,
    // then by _id value when the namespaces tie.
    bool operator<(const DocID& other) const {
        const int nsOrder = strcmp(ns, other.ns);
        if (nsOrder != 0) {
            return nsOrder < 0;
        }
        // Same namespace: compare the _id elements themselves. Field names
        // are ignored and the default (binary) string comparison is used.
        const StringData::ComparatorInterface* stringComparator = nullptr;
        BSONElementComparator eltCmp(BSONElementComparator::FieldNamesMode::kIgnore,
                                     stringComparator);
        return eltCmp.evaluate(_id < other._id);
    }
};
// Accumulates everything refetch() decides must be undone: documents to
// refetch from the sync source, collections to drop or fully resync, indexes
// to drop, and the common point with the sync source's oplog. syncFixUp()
// consumes this to perform the actual rollback work.
// NOTE: template arguments restored here; the bare `set`/`multimap`
// declarations did not name their element types and could not compile.
struct FixUpInfo {
    // note this is a set -- if there are many $inc's on a single document we need to rollback,
    // we only need to refetch it once.
    set<DocID> toRefetch;

    // Full namespaces ("db.collection") of collections to drop.
    set<string> toDrop;

    // Indexes to drop.
    // Key is collection namespace. Value is name of index to drop.
    multimap<string, string> indexesToDrop;

    // Namespaces whose data (respectively, metadata such as collection
    // options/validators) must be re-copied from the sync source.
    set<string> collectionsToResyncData;
    set<string> collectionsToResyncMetadata;

    // Last oplog entry shared with the sync source, and its location in our
    // local oplog.
    OpTime commonPoint;
    RecordId commonPointOurDiskloc;

    int rbid;  // remote server's current rollback sequence #
};
// Examines one local oplog entry ("ourObj") that must be rolled back and records
// in "fixUpInfo" the work needed to undo it:
//  - inserts/updates/deletes: remember the (ns, _id) pair to refetch from the
//    sync source;
//  - create: drop the collection; drop/dropIndexes/renameCollection: resync the
//    affected collection(s); collMod: resync collection metadata;
//  - applyOps: recurse into each sub-operation.
// Returns UnrecoverableRollbackError for commands that cannot be analyzed, and
// throws RSFatalException for situations rollback cannot handle at all
// (dropDatabase, oversized entries, missing _id, malformed system.indexes ops).
Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
    const char* op = ourObj.getStringField("op");
    // No-ops need no undo work.
    if (*op == 'n')
        return Status::OK();

    if (ourObj.objsize() > 512 * 1024 * 1024)
        throw RSFatalException("rollback too large");

    DocID doc;
    doc.ownedObj = ourObj.getOwned();
    doc.ns = doc.ownedObj.getStringField("ns");
    if (*doc.ns == '\0') {
        warning() << "ignoring op on rollback no ns TODO : " << redact(doc.ownedObj);
        return Status::OK();
    }

    // For updates the query ("o2") identifies the document; otherwise "o" does.
    BSONObj obj = doc.ownedObj.getObjectField(*op == 'u' ? "o2" : "o");
    if (obj.isEmpty()) {
        warning() << "ignoring op on rollback : " << redact(doc.ownedObj);
        return Status::OK();
    }

    if (*op == 'c') {
        BSONElement first = obj.firstElement();
        NamespaceString nss(doc.ns);  // foo.$cmd
        string cmdname = first.fieldName();
        Command* cmd = Command::findCommand(cmdname.c_str());
        if (cmd == NULL) {
            severe() << "rollback no such command " << first.fieldName();
            return Status(ErrorCodes::UnrecoverableRollbackError,
                          str::stream() << "rollback no such command " << first.fieldName(),
                          18751);
        }
        if (cmdname == "create") {
            // Create collection operation
            // { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
            string ns = nss.db().toString() + '.' + obj["create"].String();  // -> foo.abc
            fixUpInfo.toDrop.insert(ns);
            return Status::OK();
        } else if (cmdname == "drop") {
            string ns = nss.db().toString() + '.' + first.valuestr();
            fixUpInfo.collectionsToResyncData.insert(ns);
            return Status::OK();
        } else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") {
            // TODO: this is bad.  we simply full resync the collection here,
            // which could be very slow.
            warning() << "rollback of dropIndexes is slow in this version of "
                      << "mongod";
            string ns = nss.db().toString() + '.' + first.valuestr();
            fixUpInfo.collectionsToResyncData.insert(ns);
            return Status::OK();
        } else if (cmdname == "renameCollection") {
            // TODO: slow.
            warning() << "rollback of renameCollection is slow in this version of "
                      << "mongod";
            // Both the source and destination must be resynced.
            string from = first.valuestr();
            string to = obj["to"].String();
            fixUpInfo.collectionsToResyncData.insert(from);
            fixUpInfo.collectionsToResyncData.insert(to);
            return Status::OK();
        } else if (cmdname == "dropDatabase") {
            severe() << "rollback : can't rollback drop database full resync "
                     << "will be required";
            log() << obj.toString();
            throw RSFatalException();
        } else if (cmdname == "collMod") {
            const auto ns = NamespaceString(cmd->parseNs(nss.db().toString(), obj));
            for (auto field : obj) {
                const auto modification = field.fieldNameStringData();
                if (modification == cmdname) {
                    continue;  // Skipping command name.
                }

                // Only these collMod modifications can be undone via a
                // metadata resync; anything else is fatal.
                if (modification == "validator" || modification == "validationAction" ||
                    modification == "validationLevel" || modification == "usePowerOf2Sizes" ||
                    modification == "noPadding") {
                    fixUpInfo.collectionsToResyncMetadata.insert(ns.ns());
                    continue;
                }
                severe() << "cannot rollback a collMod command: " << redact(obj);
                throw RSFatalException();
            }
            return Status::OK();
        } else if (cmdname == "applyOps") {
            if (first.type() != Array) {
                std::string message = str::stream()
                    << "Expected applyOps argument to be an array; found " << redact(first);
                severe() << message;
                return Status(ErrorCodes::UnrecoverableRollbackError, message);
            }
            // Recurse into every sub-operation; the first failure wins.
            for (const auto& subopElement : first.Array()) {
                if (subopElement.type() != Object) {
                    std::string message = str::stream()
                        << "Expected applyOps operations to be of Object type, but found "
                        << redact(subopElement);
                    severe() << message;
                    return Status(ErrorCodes::UnrecoverableRollbackError, message);
                }
                auto subStatus = refetch(fixUpInfo, subopElement.Obj());
                if (!subStatus.isOK()) {
                    return subStatus;
                }
            }
            return Status::OK();
        } else {
            severe() << "can't rollback this command yet: " << redact(obj);
            log() << "cmdname=" << cmdname;
            throw RSFatalException();
        }
    }

    NamespaceString nss(doc.ns);
    if (nss.isSystemDotIndexes()) {
        // An insert into system.indexes is an index build; undo it by dropping
        // the named index on the target collection.
        if (*op != 'i') {
            severe() << "Unexpected operation type '" << *op << "' on system.indexes operation, "
                     << "document: " << redact(doc.ownedObj);
            throw RSFatalException();
        }
        string objNs;
        auto status = bsonExtractStringField(obj, "ns", &objNs);
        if (!status.isOK()) {
            severe() << "Missing collection namespace in system.indexes operation, document: "
                     << redact(doc.ownedObj);
            throw RSFatalException();
        }
        NamespaceString objNss(objNs);
        if (!objNss.isValid()) {
            severe() << "Invalid collection namespace in system.indexes operation, document: "
                     << redact(doc.ownedObj);
            throw RSFatalException();
        }
        string indexName;
        status = bsonExtractStringField(obj, "name", &indexName);
        if (!status.isOK()) {
            severe() << "Missing index name in system.indexes operation, document: "
                     << redact(doc.ownedObj);
            throw RSFatalException();
        }
        // Restored template arguments (were stripped in the broken source):
        // indexesToDrop maps collection namespace -> index name.
        using ValueType = multimap<string, string>::value_type;
        ValueType pairToInsert = std::make_pair(objNs, indexName);
        auto lowerBound = fixUpInfo.indexesToDrop.lower_bound(objNs);
        auto upperBound = fixUpInfo.indexesToDrop.upper_bound(objNs);
        auto matcher = [pairToInsert](const ValueType& pair) { return pair == pairToInsert; };
        // Deduplicate: only record each (ns, index) pair once.
        if (!std::count_if(lowerBound, upperBound, matcher)) {
            fixUpInfo.indexesToDrop.insert(pairToInsert);
        }
        return Status::OK();
    }

    doc._id = obj["_id"];
    if (doc._id.eoo()) {
        severe() << "cannot rollback op with no _id. ns: " << doc.ns
                 << ", document: " << redact(doc.ownedObj);
        throw RSFatalException();
    }
    fixUpInfo.toRefetch.insert(doc);
    return Status::OK();
}
// Applies the rollback work recorded in "fixUpInfo":
//  1. Refetch the "good" version of every affected document from the sync source.
//  2. Set minValid so the node is not readable until it catches back up.
//  3. Resync collections whose data or metadata cannot be undone incrementally.
//  4. Drop collections created after the common point (archiving their contents
//     via RemoveSaver), then drop indexes built after the common point.
//  5. Delete/update individual documents to match the refetched versions.
//  6. Truncate the local oplog back to the common point, reinitialize auth
//     data, and reset the replication coordinator's last-op times.
// Throws RSFatalException (or DBException from refetching) when rollback
// cannot complete; the caller converts that into an unrecoverable error.
// NOTE: several template-argument lists were restored here (goodVersions,
// the set iterators, unique_ptr element types); the bare declarations in the
// broken source could not compile.
void syncFixUp(OperationContext* txn,
               FixUpInfo& fixUpInfo,
               const RollbackSource& rollbackSource,
               ReplicationCoordinator* replCoord) {
    // fetch all first so we needn't handle interruption in a fancy way
    unsigned long long totalSize = 0;

    // namespace -> doc id -> doc
    map<string, map<DocID, BSONObj>> goodVersions;

    BSONObj newMinValid;

    // fetch all the goodVersions of each document from current primary
    DocID doc;
    unsigned long long numFetched = 0;
    try {
        for (set<DocID>::iterator it = fixUpInfo.toRefetch.begin();
             it != fixUpInfo.toRefetch.end();
             it++) {
            doc = *it;
            verify(!doc._id.eoo());
            {
                // TODO : slow. lots of round trips.
                numFetched++;
                BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap());
                totalSize += good.objsize();
                // Refuse to roll back more than 300MB of refetched data.
                uassert(13410, "replSet too much data to roll back", totalSize < 300 * 1024 * 1024);

                // note good might be eoo, indicating we should delete it
                goodVersions[doc.ns][doc] = good;
            }
        }
        newMinValid = rollbackSource.getLastOperation();
        if (newMinValid.isEmpty()) {
            error() << "rollback error newMinValid empty?";
            return;
        }
    } catch (const DBException& e) {
        LOG(1) << "rollback re-get objects: " << redact(e);
        error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << redact(doc._id) << ' '
                << numFetched << '/' << fixUpInfo.toRefetch.size();
        throw e;
    }

    log() << "rollback 3.5";
    if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
        // Our source rolled back itself so the data we received isn't necessarily consistent.
        warning() << "rollback rbid on source changed during rollback, "
                  << "cancelling this attempt";
        return;
    }

    // update them
    log() << "rollback 4 n:" << goodVersions.size();

    bool warn = false;

    invariant(!fixUpInfo.commonPointOurDiskloc.isNull());

    // we have items we are writing that aren't from a point-in-time. thus best not to come
    // online until we get to that point in freshness.
    // TODO this is still wrong because we don't record that we are in rollback, and we can't really
    // recover.
    OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid));
    log() << "minvalid=" << minValid;
    StorageInterface::get(txn)->setAppliedThrough(txn, {});  // Use top of oplog.
    StorageInterface::get(txn)->setMinValid(txn, minValid);

    // any full collection resyncs required?
    if (!fixUpInfo.collectionsToResyncData.empty() ||
        !fixUpInfo.collectionsToResyncMetadata.empty()) {
        for (const string& ns : fixUpInfo.collectionsToResyncData) {
            log() << "rollback 4.1.1 coll resync " << ns;

            // Data resync makes any pending index drops / metadata resyncs
            // for this namespace redundant.
            fixUpInfo.indexesToDrop.erase(ns);
            fixUpInfo.collectionsToResyncMetadata.erase(ns);

            const NamespaceString nss(ns);

            {
                ScopedTransaction transaction(txn, MODE_IX);
                Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X);
                Database* db = dbHolder().openDb(txn, nss.db().toString());
                invariant(db);
                WriteUnitOfWork wunit(txn);
                db->dropCollection(txn, ns);
                wunit.commit();
            }

            rollbackSource.copyCollectionFromRemote(txn, nss);
        }

        for (const string& ns : fixUpInfo.collectionsToResyncMetadata) {
            log() << "rollback 4.1.2 coll metadata resync " << ns;

            const NamespaceString nss(ns);
            ScopedTransaction transaction(txn, MODE_IX);
            Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X);
            auto db = dbHolder().openDb(txn, nss.db().toString());
            invariant(db);
            auto collection = db->getCollection(ns);
            invariant(collection);
            auto cce = collection->getCatalogEntry();

            auto infoResult = rollbackSource.getCollectionInfo(nss);
            if (!infoResult.isOK()) {
                // Collection dropped by "them" so we should drop it too.
                log() << ns << " not found on remote host, dropping";
                fixUpInfo.toDrop.insert(ns);
                continue;
            }

            auto info = infoResult.getValue();
            CollectionOptions options;
            if (auto optionsField = info["options"]) {
                if (optionsField.type() != Object) {
                    throw RSFatalException(str::stream() << "Failed to parse options " << info
                                                         << ": expected 'options' to be an "
                                                         << "Object, got "
                                                         << typeName(optionsField.type()));
                }

                auto status = options.parse(optionsField.Obj());
                if (!status.isOK()) {
                    throw RSFatalException(str::stream() << "Failed to parse options " << info
                                                         << ": "
                                                         << status.toString());
                }
            } else {
                // Use default options.
            }

            // Overwrite our collection's flags and validation settings with
            // the sync source's versions.
            WriteUnitOfWork wuow(txn);
            if (options.flagsSet || cce->getCollectionOptions(txn).flagsSet) {
                cce->updateFlags(txn, options.flags);
            }

            auto status = collection->setValidator(txn, options.validator);
            if (!status.isOK()) {
                throw RSFatalException(str::stream() << "Failed to set validator: "
                                                     << status.toString());
            }
            status = collection->setValidationAction(txn, options.validationAction);
            if (!status.isOK()) {
                throw RSFatalException(str::stream() << "Failed to set validationAction: "
                                                     << status.toString());
            }

            status = collection->setValidationLevel(txn, options.validationLevel);
            if (!status.isOK()) {
                throw RSFatalException(str::stream() << "Failed to set validationLevel: "
                                                     << status.toString());
            }

            wuow.commit();
        }

        // we did more reading from primary, so check it again for a rollback (which would mess
        // us up), and make minValid newer.
        log() << "rollback 4.2";

        string err;
        try {
            newMinValid = rollbackSource.getLastOperation();
            if (newMinValid.isEmpty()) {
                err = "can't get minvalid from sync source";
            } else {
                OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid));
                log() << "minvalid=" << minValid;
                StorageInterface::get(txn)->setMinValid(txn, minValid);
                StorageInterface::get(txn)->setAppliedThrough(txn, fixUpInfo.commonPoint);
            }
        } catch (const DBException& e) {
            err = "can't get/set minvalid: ";
            err += e.what();
        }
        if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
            // our source rolled back itself. so the data we received isn't necessarily
            // consistent. however, we've now done writes. thus we have a problem.
            err += "rbid at primary changed during resync/rollback";
        }
        if (!err.empty()) {
            severe() << "rolling back : " << redact(err) << ". A full resync will be necessary.";
            // TODO: reset minvalid so that we are permanently in fatal state
            // TODO: don't be fatal, but rather, get all the data first.
            throw RSFatalException();
        }
        log() << "rollback 4.3";
    }

    log() << "rollback 4.6";
    // drop collections to drop before doing individual fixups - that might make things faster
    // below actually if there were subsequent inserts to rollback
    for (set<string>::iterator it = fixUpInfo.toDrop.begin(); it != fixUpInfo.toDrop.end(); it++) {
        log() << "rollback drop: " << *it;

        fixUpInfo.indexesToDrop.erase(*it);

        ScopedTransaction transaction(txn, MODE_IX);
        const NamespaceString nss(*it);
        Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X);
        Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it));
        if (db) {
            WriteUnitOfWork wunit(txn);

            Helpers::RemoveSaver removeSaver("rollback", "", *it);

            // perform a collection scan and write all documents in the collection to disk
            std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
                txn, *it, db->getCollection(*it), PlanExecutor::YIELD_MANUAL));
            BSONObj curObj;
            PlanExecutor::ExecState execState;
            while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) {
                auto status = removeSaver.goingToDelete(curObj);
                if (!status.isOK()) {
                    severe() << "rolling back createCollection on " << *it
                             << " failed to write document to remove saver file: " << status;
                    throw RSFatalException();
                }
            }
            if (execState != PlanExecutor::IS_EOF) {
                if (execState == PlanExecutor::FAILURE &&
                    WorkingSetCommon::isValidStatusMemberObject(curObj)) {
                    Status errorStatus = WorkingSetCommon::getMemberObjectStatus(curObj);
                    severe() << "rolling back createCollection on " << *it << " failed with "
                             << errorStatus << ". A full resync is necessary.";
                } else {
                    severe() << "rolling back createCollection on " << *it
                             << " failed. A full resync is necessary.";
                }

                throw RSFatalException();
            }

            db->dropCollection(txn, *it);
            wunit.commit();
        }
    }

    // Drop indexes.
    for (auto it = fixUpInfo.indexesToDrop.begin(); it != fixUpInfo.indexesToDrop.end(); it++) {
        const NamespaceString nss(it->first);
        const string& indexName = it->second;
        log() << "rollback drop index: collection: " << nss.toString() << ". index: " << indexName;

        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X);
        auto db = dbHolder().get(txn, nss.db());
        // A missing db/collection/catalog/index just means there is nothing
        // to drop; skip rather than fail.
        if (!db) {
            continue;
        }
        auto collection = db->getCollection(nss.toString());
        if (!collection) {
            continue;
        }
        auto indexCatalog = collection->getIndexCatalog();
        if (!indexCatalog) {
            continue;
        }
        bool includeUnfinishedIndexes = false;
        auto indexDescriptor =
            indexCatalog->findIndexByName(txn, indexName, includeUnfinishedIndexes);
        if (!indexDescriptor) {
            warning() << "rollback failed to drop index " << indexName << " in " << nss.toString()
                      << ": index not found";
            continue;
        }
        WriteUnitOfWork wunit(txn);
        auto status = indexCatalog->dropIndex(txn, indexDescriptor);
        if (!status.isOK()) {
            severe() << "rollback failed to drop index " << indexName << " in " << nss.toString()
                     << ": " << status;
            throw RSFatalException();
        }
        wunit.commit();
    }

    log() << "rollback 4.7";
    unsigned deletes = 0, updates = 0;
    time_t lastProgressUpdate = time(0);
    time_t progressUpdateGap = 10;  // seconds between progress log lines
    for (const auto& nsAndGoodVersionsByDocID : goodVersions) {
        // Keep an archive of items rolled back if the collection has not been dropped
        // while rolling back createCollection operations.
        const auto& ns = nsAndGoodVersionsByDocID.first;
        unique_ptr<Helpers::RemoveSaver> removeSaver;
        if (!fixUpInfo.toDrop.count(ns)) {
            removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns));
        }

        const auto& goodVersionsByDocID = nsAndGoodVersionsByDocID.second;
        for (const auto& idAndDoc : goodVersionsByDocID) {
            time_t now = time(0);
            if (now - lastProgressUpdate > progressUpdateGap) {
                log() << deletes << " delete and " << updates
                      << " update operations processed out of " << goodVersions.size()
                      << " total operations";
                lastProgressUpdate = now;
            }
            const DocID& doc = idAndDoc.first;
            BSONObj pattern = doc._id.wrap();  // { _id : ... }
            try {
                verify(doc.ns && *doc.ns);
                if (fixUpInfo.collectionsToResyncData.count(doc.ns)) {
                    // We just synced this entire collection.
                    continue;
                }

                // TODO: Lots of overhead in context. This can be faster.
                const NamespaceString docNss(doc.ns);
                ScopedTransaction transaction(txn, MODE_IX);
                Lock::DBLock docDbLock(txn->lockState(), docNss.db(), MODE_X);
                OldClientContext ctx(txn, doc.ns);

                Collection* collection = ctx.db()->getCollection(doc.ns);

                // Add the doc to our rollback file if the collection was not dropped while
                // rolling back createCollection operations.
                // Do not log an error when undoing an insert on a no longer existent
                // collection.
                // It is likely that the collection was dropped as part of rolling back a
                // createCollection command and regardless, the document no longer exists.
                if (collection && removeSaver) {
                    BSONObj obj;
                    bool found = Helpers::findOne(txn, collection, pattern, obj, false);
                    if (found) {
                        auto status = removeSaver->goingToDelete(obj);
                        if (!status.isOK()) {
                            severe() << "rollback cannot write document in namespace " << doc.ns
                                     << " to archive file: " << redact(status);
                            throw RSFatalException();
                        }
                    } else {
                        error() << "rollback cannot find object: " << pattern << " in namespace "
                                << doc.ns;
                    }
                }

                if (idAndDoc.second.isEmpty()) {
                    // wasn't on the primary; delete.
                    // TODO 1.6 : can't delete from a capped collection. need to handle that
                    // here.
                    deletes++;

                    if (collection) {
                        if (collection->isCapped()) {
                            // can't delete from a capped collection - so we truncate instead.
                            // if
                            // this item must go, so must all successors!!!
                            try {
                                // TODO: IIRC cappedTruncateAfter does not handle completely
                                // empty.
                                // this will crazy slow if no _id index.
                                const auto clock = txn->getServiceContext()->getFastClockSource();
                                const auto findOneStart = clock->now();
                                RecordId loc = Helpers::findOne(txn, collection, pattern, false);
                                if (clock->now() - findOneStart > Milliseconds(200))
                                    warning() << "roll back slow no _id index for " << doc.ns
                                              << " perhaps?";
                                // would be faster but requires index:
                                // RecordId loc = Helpers::findById(nsd, pattern);
                                if (!loc.isNull()) {
                                    try {
                                        collection->temp_cappedTruncateAfter(txn, loc, true);
                                    } catch (const DBException& e) {
                                        if (e.getCode() == 13415) {
                                            // hack: need to just make cappedTruncate do this...
                                            MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
                                                WriteUnitOfWork wunit(txn);
                                                uassertStatusOK(collection->truncate(txn));
                                                wunit.commit();
                                            }
                                            MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
                                                txn, "truncate", collection->ns().ns());
                                        } else {
                                            throw e;
                                        }
                                    }
                                }
                            } catch (const DBException& e) {
                                error() << "rolling back capped collection rec " << doc.ns << ' '
                                        << redact(e);
                            }
                        } else {
                            deleteObjects(txn,
                                          collection,
                                          doc.ns,
                                          pattern,
                                          PlanExecutor::YIELD_MANUAL,
                                          true,   // justone
                                          true);  // god
                        }
                        // did we just empty the collection? if so let's check if it even
                        // exists on the source.
                        if (collection->numRecords(txn) == 0) {
                            try {
                                NamespaceString nss(doc.ns);
                                auto infoResult = rollbackSource.getCollectionInfo(nss);
                                if (!infoResult.isOK()) {
                                    // we should drop
                                    WriteUnitOfWork wunit(txn);
                                    ctx.db()->dropCollection(txn, doc.ns);
                                    wunit.commit();
                                }
                            } catch (const DBException& ex) {
                                // Failed to run listCollections command on sync source.
                                // This isn't *that* big a deal, but is bad.
                                warning() << "rollback error querying for existence of " << doc.ns
                                          << " at the primary, ignoring: " << ex;
                            }
                        }
                    }
                } else {
                    // TODO faster...
                    updates++;

                    // Upsert the refetched "good" version over whatever we have.
                    const NamespaceString requestNs(doc.ns);
                    UpdateRequest request(requestNs);

                    request.setQuery(pattern);
                    request.setUpdates(idAndDoc.second);
                    request.setGod();
                    request.setUpsert();
                    UpdateLifecycleImpl updateLifecycle(requestNs);
                    request.setLifecycle(&updateLifecycle);

                    update(txn, ctx.db(), request);
                }
            } catch (const DBException& e) {
                log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' '
                      << redact(e) << " ndeletes:" << deletes;
                warn = true;
            }
        }
    }

    log() << "rollback 5 d:" << deletes << " u:" << updates;
    log() << "rollback 6";

    // clean up oplog
    LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toString();
    {
        const NamespaceString oplogNss(rsOplogName);
        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock oplogDbLock(txn->lockState(), oplogNss.db(), MODE_IX);
        Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X);
        OldClientContext ctx(txn, rsOplogName);
        Collection* oplogCollection = ctx.db()->getCollection(rsOplogName);
        if (!oplogCollection) {
            fassertFailedWithStatusNoTrace(13423,
                                           Status(ErrorCodes::UnrecoverableRollbackError,
                                                  str::stream() << "Can't find " << rsOplogName));
        }
        // TODO: fatal error if this throws?
        oplogCollection->temp_cappedTruncateAfter(txn, fixUpInfo.commonPointOurDiskloc, false);
    }

    Status status = getGlobalAuthorizationManager()->initialize(txn);
    if (!status.isOK()) {
        warning() << "Failed to reinitialize auth data after rollback: " << status;
        warn = true;
    }

    // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
    // lastAppliedHash value in bgsync to reflect our new last op.
    replCoord->resetLastOpTimesFromOplog(txn);

    // done
    if (warn)
        warning() << "issues during syncRollback, see log";
    else
        log() << "rollback done";
}
// Drives the rollback protocol:
//  1. Transition this node to ROLLBACK state (fails with OperationFailed if the
//     transition is not allowed from the current member state).
//  2. Find the common point between the local oplog and the sync source's oplog
//     via syncRollBackLocalOperations(), recording the operations to undo in
//     "how" via refetch().
//  3. Increment the rollback ID, run syncFixUp() to perform the undo work, and
//     increment the rollback ID again afterwards (also on unexpected exceptions).
// Returns UnrecoverableRollbackError when rollback cannot proceed; other
// DBExceptions from the common-point search are rethrown after sleeping via
// sleepSecondsFn so the caller can retry.
Status _syncRollback(OperationContext* txn,
                     const OplogInterface& localOplog,
                     const RollbackSource& rollbackSource,
                     ReplicationCoordinator* replCoord,
                     const SleepSecondsFn& sleepSecondsFn) {
    // Rollback must start unlocked; locks are taken as needed below.
    invariant(!txn->lockState()->isLocked());

    log() << "rollback 0";

    /** by doing this, we will not service reads (return an error as we aren't in secondary
     *  state. that perhaps is moot because of the write lock above, but that write lock
     *  probably gets deferred or removed or yielded later anyway.
     *
     *  also, this is better for status reporting - we know what is happening.
     */
    {
        Lock::GlobalWrite globalWrite(txn->lockState());
        if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
            return Status(ErrorCodes::OperationFailed,
                          str::stream() << "Cannot transition from "
                                        << replCoord->getMemberState().toString()
                                        << " to "
                                        << MemberState(MemberState::RS_ROLLBACK).toString());
        }
    }

    FixUpInfo how;
    log() << "rollback 1";
    // Remember the source's rollback ID so syncFixUp can detect if the source
    // itself rolls back while we are copying data from it.
    how.rbid = rollbackSource.getRollbackId();
    {
        log() << "rollback 2 FindCommonPoint";
        try {
            // Each local operation past the common point is fed to refetch(),
            // which records the undo work in "how".
            auto processOperationForFixUp = [&how](const BSONObj& operation) {
                return refetch(how, operation);
            };
            auto res = syncRollBackLocalOperations(
                localOplog, rollbackSource.getOplog(), processOperationForFixUp);
            if (!res.isOK()) {
                const auto status = res.getStatus();
                switch (status.code()) {
                    case ErrorCodes::OplogStartMissing:
                    case ErrorCodes::UnrecoverableRollbackError:
                        sleepSecondsFn(Seconds(1));
                        return status;
                    default:
                        // Anything else is treated as fatal for this attempt.
                        throw RSFatalException(status.toString());
                }
            } else {
                how.commonPoint = res.getValue().first;
                how.commonPointOurDiskloc = res.getValue().second;
            }
        } catch (const RSFatalException& e) {
            error() << redact(e.what());
            return Status(ErrorCodes::UnrecoverableRollbackError,
                          str::stream()
                              << "need to rollback, but unable to determine common point between"
                                 " local and remote oplog: "
                              << e.what(),
                          18752);
        } catch (const DBException& e) {
            // Transient failure talking to the source: sleep then let the
            // caller retry.
            warning() << "rollback 2 exception " << redact(e) << "; sleeping 1 min";

            sleepSecondsFn(Seconds(60));
            throw;
        }
    }

    log() << "rollback 3 fixup";

    replCoord->incrementRollbackID();
    try {
        syncFixUp(txn, how, rollbackSource, replCoord);
    } catch (const RSFatalException& e) {
        error() << "exception during rollback: " << redact(e.what());
        return Status(ErrorCodes::UnrecoverableRollbackError,
                      str::stream() << "exception during rollback: " << e.what(),
                      18753);
    } catch (...) {
        replCoord->incrementRollbackID();

        throw;
    }
    replCoord->incrementRollbackID();

    // Success; leave "ROLLBACK" state intact until applier thread has reloaded the new minValid.
    // Otherwise, the applier could transition the node to SECONDARY with an out-of-date minValid.
    return Status::OK();
}
} // namespace
// Public entry point for rollback. Disables document validation and replicated
// writes on the operation context for the duration, then delegates the actual
// protocol to _syncRollback(), logging at entry and exit.
Status syncRollback(OperationContext* txn,
                    const OplogInterface& localOplog,
                    const RollbackSource& rollbackSource,
                    ReplicationCoordinator* replCoord,
                    const SleepSecondsFn& sleepSecondsFn) {
    invariant(txn);
    invariant(replCoord);

    log() << "beginning rollback" << rsLog;

    // RAII: validation stays disabled until this function returns.
    DisableDocumentValidation validationDisabler(txn);
    txn->setReplicatedWrites(false);
    const Status rollbackStatus =
        _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn);

    log() << "rollback finished" << rsLog;
    return rollbackStatus;
}
// Convenience overload that supplies the default sleep function (a real
// blocking sleep). Restored the stripped template argument on durationCount:
// `durationCount(seconds)` cannot compile without `<Seconds>`.
Status syncRollback(OperationContext* txn,
                    const OplogInterface& localOplog,
                    const RollbackSource& rollbackSource,
                    ReplicationCoordinator* replCoord) {
    return syncRollback(txn, localOplog, rollbackSource, replCoord, [](Seconds seconds) {
        sleepsecs(durationCount<Seconds>(seconds));
    });
}
} // namespace repl
} // namespace mongo