/**
*    @file rs_rollback.cpp
*
*    Copyright (C) 2008-2014 MongoDB Inc.
*
*    This program is free software: you can redistribute it and/or modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
*    As a special exception, the copyright holders give permission to link the
*    code of portions of this program with the OpenSSL library under certain
*    conditions as described in each individual source file and distribute
*    linked combinations including the program with the OpenSSL library. You
*    must comply with the GNU Affero General Public License in all respects for
*    all of the code used other than as permitted herein. If you modify file(s)
*    with this exception, you may extend this exception to your version of the
*    file(s), but you are not obligated to do so. If you do not wish to do so,
*    delete this exception statement from your version. If you delete this
*    exception statement from all source files in the program, then also delete
*    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/rs_rollback.h" #include #include #include "mongo/bson/bsonelement_comparator.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/roll_back_local_operations.h" #include "mongo/db/repl/rollback_source.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { using std::shared_ptr; using std::unique_ptr; using std::endl; using std::list; using std::map; using std::multimap; using std::set; using std::string; using std::pair; namespace repl { using namespace rollback_internal; bool DocID::operator<(const DocID& other) const { int comp = strcmp(ns, other.ns); if (comp < 0) return true; if (comp > 0) return false; const StringData::ComparatorInterface* 
stringComparator = nullptr; BSONElementComparator eltCmp(BSONElementComparator::FieldNamesMode::kIgnore, stringComparator); return eltCmp.evaluate(_id < other._id); } bool DocID::operator==(const DocID& other) const { // Since this is only used for tests, going with the simple impl that reuses operator< which is // used in the real code. return !(*this < other || other < *this); } void FixUpInfo::removeAllDocsToRefetchFor(const std::string& collection) { docsToRefetch.erase(docsToRefetch.lower_bound(DocID::minFor(collection.c_str())), docsToRefetch.upper_bound(DocID::maxFor(collection.c_str()))); } void FixUpInfo::removeRedundantOperations() { // These loops and their bodies can be done in any order. The final result of the FixUpInfo // members will be the same. for (const auto& collection : collectionsToDrop) { removeAllDocsToRefetchFor(collection); indexesToDrop.erase(collection); collectionsToResyncMetadata.erase(collection); } for (const auto& collection : collectionsToResyncData) { removeAllDocsToRefetchFor(collection); indexesToDrop.erase(collection); collectionsToResyncMetadata.erase(collection); collectionsToDrop.erase(collection); } } Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { // Checks that the oplog entry is smaller than 512 MB. We do not roll back if the // oplog entry is larger than 512 MB. 
if (ourObj.objsize() > 512 * 1024 * 1024) throw RSFatalException(str::stream() << "Rollback too large, oplog size: " << ourObj.objsize()); auto oplogEntry = OplogEntry(ourObj); NamespaceString nss = oplogEntry.getNamespace(); auto uuid = oplogEntry.getUuid(); if (oplogEntry.getOpType() == OpTypeEnum::kNoop) return Status::OK(); DocID doc; doc.ownedObj = oplogEntry.raw; doc.ns = oplogEntry.raw.getStringField("ns"); if (oplogEntry.getNamespace().isEmpty()) { throw RSFatalException(str::stream() << "Local op on rollback has no ns: " << redact(oplogEntry.toBSON())); } BSONObj obj = oplogEntry.raw.getObjectField(oplogEntry.getOpType() == OpTypeEnum::kUpdate ? "o2" : "o"); if (obj.isEmpty()) { throw RSFatalException(str::stream() << "Local op on rollback has no object field: " << redact(oplogEntry.toBSON())); } if (oplogEntry.getOpType() == OpTypeEnum::kCommand) { // The first element of the object is the name of the command // and the collection it is acting on, e.x. {renameCollection: "test.x"}. BSONElement first = obj.firstElement(); switch (oplogEntry.getCommandType()) { case OplogEntry::CommandType::kCreate: { // Create collection operation // { // ts: ..., // h: ..., // op: "c", // ns: "foo.$cmd", // o: { // create: "abc", ... // } // ... // } string ns = nss.db().toString() + '.' + first.valuestr(); // -> foo.abc fixUpInfo.collectionsToDrop.insert(ns); return Status::OK(); } case OplogEntry::CommandType::kDrop: { // Drop collection operation // { // ts: ..., // h: ..., // op: "c", // ns: "foo.$cmd", // o: { // drop: "abc" // } // ... // } // TODO: Delete this when UUIDs are enabled. (SERVER-29815) if (!uuid) { string ns = nss.db().toString() + '.' 
+ first.valuestr(); fixUpInfo.collectionsToResyncData.insert(ns); return Status::OK(); } string collName = first.valuestr(); fixUpInfo.collectionsToRollBackPendingDrop.emplace( *uuid, std::make_pair(oplogEntry.getOpTime(), collName)); return Status::OK(); } case OplogEntry::CommandType::kDropIndexes: { // TODO: This is bad. We simply full resync the collection here, // which could be very slow. warning() << "Rollback of dropIndexes is slow in this version of " << "mongod."; string ns = nss.db().toString() + '.' + first.valuestr(); fixUpInfo.collectionsToResyncData.insert(ns); return Status::OK(); } case OplogEntry::CommandType::kRenameCollection: { // TODO: Slow. warning() << "Rollback of renameCollection is slow in this version of " << "mongod."; string from = first.valuestr(); string to = obj["to"].String(); fixUpInfo.collectionsToResyncData.insert(from); fixUpInfo.collectionsToResyncData.insert(to); return Status::OK(); } case OplogEntry::CommandType::kDropDatabase: { // Since we wait for all internal collection drops to be committed before recording // a 'dropDatabase' oplog entry, this will always create an empty database. // Creating an empty database doesn't mean anything, so we do nothing. return Status::OK(); } case OplogEntry::CommandType::kCollMod: { const auto ns = nss.db().toString() + '.' + first.valuestr(); // -> foo.abc for (auto field : obj) { // Example collMod obj // o:{ // collMod : "x", // validationLevel : "off", // index: { // name: "indexName_1", // expireAfterSeconds: 600 // } // } const auto modification = field.fieldNameStringData(); if (modification == "collMod") { continue; // Skips the command name. The first field in the obj will be the // command name. 
} if (modification == "validator" || modification == "validationAction" || modification == "validationLevel" || modification == "usePowerOf2Sizes" || modification == "noPadding") { fixUpInfo.collectionsToResyncMetadata.insert(ns); continue; } // Some collMod fields cannot be rolled back, such as the index field. string message = "Cannot roll back a collMod command: "; severe() << message << redact(obj); throw RSFatalException(message); } return Status::OK(); } case OplogEntry::CommandType::kApplyOps: { if (first.type() != Array) { std::string message = str::stream() << "Expected applyOps argument to be an array; found " << redact(first); severe() << message; return Status(ErrorCodes::UnrecoverableRollbackError, message); } for (const auto& subopElement : first.Array()) { if (subopElement.type() != Object) { std::string message = str::stream() << "Expected applyOps operations to be of Object type, but found " << redact(subopElement); severe() << message; return Status(ErrorCodes::UnrecoverableRollbackError, message); } // In applyOps, the object contains an array of different oplog entries, we call // updateFixUpInfoFromLocalOplogEntry here in order to record the information // needed for rollback that is contained within the applyOps, creating a nested // call. 
auto subStatus = updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj()); if (!subStatus.isOK()) { return subStatus; } } return Status::OK(); } default: { std::string message = str::stream() << "Can't roll back this command yet: " << " cmdname = " << first.fieldName(); severe() << message << " document: " << redact(obj); throw RSFatalException(message); } } } if (nss.isSystemDotIndexes()) { if (oplogEntry.getOpType() != OpTypeEnum::kInsert) { std::string message = str::stream() << "Unexpected operation type '" << oplogEntry.raw.getStringField("op") << "' on system.indexes operation, " << "document: "; severe() << message << redact(doc.ownedObj); throw RSFatalException(message); } string objNs; auto status = bsonExtractStringField(obj, "ns", &objNs); if (!status.isOK()) { severe() << "Missing collection namespace in system.indexes operation, document: " << redact(doc.ownedObj); throw RSFatalException("Missing collection namespace in system.indexes operation."); } NamespaceString objNss(objNs); if (!objNss.isValid()) { severe() << "Invalid collection namespace in system.indexes operation, document: " << redact(doc.ownedObj); throw RSFatalException( str::stream() << "Invalid collection namespace in system.indexes operation, namespace: " << doc.ns); } string indexName; status = bsonExtractStringField(obj, "name", &indexName); if (!status.isOK()) { severe() << "Missing index name in system.indexes operation, document: " << redact(doc.ownedObj); throw RSFatalException("Missing index name in system.indexes operation."); } using ValueType = multimap::value_type; ValueType pairToInsert = std::make_pair(objNs, indexName); auto lowerBound = fixUpInfo.indexesToDrop.lower_bound(objNs); auto upperBound = fixUpInfo.indexesToDrop.upper_bound(objNs); auto matcher = [pairToInsert](const ValueType& pair) { return pair == pairToInsert; }; if (!std::count_if(lowerBound, upperBound, matcher)) { fixUpInfo.indexesToDrop.insert(pairToInsert); } return Status::OK(); } doc._id = 
oplogEntry.getIdElement(); if (doc._id.eoo()) { std::string message = str::stream() << "Cannot roll back op with no _id. ns: " << doc.ns; severe() << message << ", document: " << redact(oplogEntry.toBSON()); throw RSFatalException(message); } fixUpInfo.docsToRefetch.insert(doc); return Status::OK(); } namespace { /** * This must be called before making any changes to our local data and after fetching any * information from the upstream node. If any information is fetched from the upstream node after we * have written locally, the function must be called again. */ void checkRbidAndUpdateMinValid(OperationContext* opCtx, const int rbid, const RollbackSource& rollbackSource, ReplicationProcess* replicationProcess) { // It is important that the steps are performed in order to avoid racing with upstream // rollbacks. // 1. Gets the last doc in their oplog. // 2. Gets their RBID and fail if it has changed. // 3. Sets our minValid to the previously fetched OpTime of the top of their oplog. const auto newMinValidDoc = rollbackSource.getLastOperation(); if (newMinValidDoc.isEmpty()) { uasserted(40500, "rollback error newest oplog entry on source is missing or empty"); } if (rbid != rollbackSource.getRollbackId()) { // Our source rolled back so the data we received is not necessarily consistent. uasserted(40508, "rollback rbid on source changed during rollback, canceling this attempt"); } // We have items we are writing that aren't from a point-in-time. Thus, it is best not to come // online until we get to that point in freshness. In other words, we do not transition from // RECOVERING state to SECONDARY state until we have reached the minValid oplog entry. OpTime minValid = fassertStatusOK(40492, OpTime::parseFromOplogEntry(newMinValidDoc)); log() << "Setting minvalid to " << minValid; replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {}); // Use top of oplog. 
replicationProcess->getConsistencyMarkers()->setMinValid(opCtx, minValid); if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) { // This log output is used in jstests so please leave it. log() << "rollback - rollbackHangThenFailAfterWritingMinValid fail point " "enabled. Blocking until fail point is disabled."; while (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) { invariant(!globalInShutdownDeprecated()); // It is an error to shutdown while enabled. mongo::sleepsecs(1); } uasserted(40502, "failing rollback due to rollbackHangThenFailAfterWritingMinValid fail point"); } } void syncFixUp(OperationContext* opCtx, const FixUpInfo& fixUpInfo, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord, ReplicationProcess* replicationProcess) { unsigned long long totalSize = 0; // namespace -> doc id -> doc map> goodVersions; // Fetches all the goodVersions of each document from the current sync source. unsigned long long numFetched = 0; log() << "Starting refetching documents"; for (auto&& doc : fixUpInfo.docsToRefetch) { invariant(!doc._id.eoo()); // This is checked when we insert to the set. try { LOG(2) << "Refetching document, namespace: " << doc.ns << ", _id: " << redact(doc._id); // TODO : Slow. Lots of round trips. numFetched++; BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); totalSize += good.objsize(); // Checks that the total amount of data that needs to be refetched is at most // 300 MB. We do not roll back more than 300 MB of documents in order to // prevent out of memory errors from too much data being stored. See SERVER-23392. if (totalSize >= 300 * 1024 * 1024) { throw RSFatalException("replSet too much data to roll back."); } // Note good might be empty, indicating we should delete it. 
goodVersions[doc.ns][doc] = good; } catch (const DBException& ex) { // If the collection turned into a view, we might get an error trying to // refetch documents, but these errors should be ignored, as we'll be creating // the view during oplog replay. if (ex.getCode() == ErrorCodes::CommandNotSupportedOnView) continue; log() << "Rollback couldn't re-get from ns: " << doc.ns << " _id: " << redact(doc._id) << ' ' << numFetched << '/' << fixUpInfo.docsToRefetch.size() << ": " << redact(ex); throw; } } log() << "Finished refetching documents. Total size of documents refetched: " << goodVersions.size(); log() << "Checking the RollbackID and updating the MinValid if necessary"; checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess); invariant(!fixUpInfo.commonPointOurDiskloc.isNull()); log() << "Rolling back any collections pending being dropped"; // Roll back any drop-pending collections. This must be done first so that the collection // exists when we attempt to resync its metadata or insert documents into it. for (const auto& collPair : fixUpInfo.collectionsToRollBackPendingDrop) { const auto& optime = collPair.second.first; const auto& collName = collPair.second.second; DropPendingCollectionReaper::get(opCtx)->rollBackDropPendingCollection( opCtx, optime, collName); } // Full collection data and metadata resync. if (!fixUpInfo.collectionsToResyncData.empty() || !fixUpInfo.collectionsToResyncMetadata.empty()) { // Reloads the collection data from the sync source in order // to roll back a drop/dropIndexes/renameCollection operation. 
for (const string& ns : fixUpInfo.collectionsToResyncData) { log() << "Resyncing collection, namespace: " << ns; invariant(!fixUpInfo.indexesToDrop.count(ns)); invariant(!fixUpInfo.collectionsToResyncMetadata.count(ns)); const NamespaceString nss(ns); { Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); Database* db = dbHolder().openDb(opCtx, nss.db().toString()); invariant(db); WriteUnitOfWork wunit(opCtx); fassertStatusOK(40505, db->dropCollectionEvenIfSystem(opCtx, nss)); wunit.commit(); } rollbackSource.copyCollectionFromRemote(opCtx, nss); } // Retrieves collections from the sync source in order to obtain // the collection flags needed to roll back collMod operations. for (const string& ns : fixUpInfo.collectionsToResyncMetadata) { log() << "Resyncing collection metadata, namespace: " << ns; const NamespaceString nss(ns); Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); auto db = dbHolder().openDb(opCtx, nss.db().toString()); invariant(db); auto collection = db->getCollection(opCtx, nss); invariant(collection); auto cce = collection->getCatalogEntry(); auto infoResult = rollbackSource.getCollectionInfo(nss); if (!infoResult.isOK()) { // The collection was dropped by the sync source so we can't correctly change it // here. If we get to the roll-forward phase, we will drop it then. If the drop // is rolled back upstream and we restart, we expect to still have the // collection. log() << ns << " not found on remote host, so we do not roll back collmod " "operation. Instead, we will drop the collection soon."; continue; } auto info = infoResult.getValue(); CollectionOptions options; // Updates the collection flags. 
if (auto optionsField = info["options"]) { if (optionsField.type() != Object) { throw RSFatalException(str::stream() << "Failed to parse options " << info << ": expected 'options' to be an " << "Object, got " << typeName(optionsField.type())); } auto status = options.parse(optionsField.Obj(), CollectionOptions::parseForCommand); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to parse options " << info << ": " << status.toString()); } // TODO(SERVER-27992): Set options.uuid. } else { // Use default options. } WriteUnitOfWork wuow(opCtx); // Resets collection user flags such as noPadding and usePowerOf2Sizes. if (options.flagsSet || cce->getCollectionOptions(opCtx).flagsSet) { cce->updateFlags(opCtx, options.flags); } auto status = collection->setValidator(opCtx, options.validator); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validator: " << status.toString()); } status = collection->setValidationAction(opCtx, options.validationAction); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validationAction: " << status.toString()); } status = collection->setValidationLevel(opCtx, options.validationLevel); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to set validationLevel: " << status.toString()); } wuow.commit(); } // Since we read from the sync source to retrieve the metadata of the // collection, we must check if the sync source rolled back as well as update // minValid if necessary. log() << "Rechecking the Rollback ID and minValid"; checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess); } log() << "Dropping collections to roll back create operations"; // Drops collections before updating individual documents. 
for (set::iterator it = fixUpInfo.collectionsToDrop.begin(); it != fixUpInfo.collectionsToDrop.end(); it++) { log() << "Dropping collection: " << *it; invariant(!fixUpInfo.indexesToDrop.count(*it)); const NamespaceString nss(*it); Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); Database* db = dbHolder().get(opCtx, nsToDatabaseSubstring(*it)); if (db) { Helpers::RemoveSaver removeSaver("rollback", "", *it); // Performs a collection scan and writes all documents in the collection to disk // in order to keep an archive of items that were rolled back. auto exec = InternalPlanner::collectionScan( opCtx, *it, db->getCollection(opCtx, *it), PlanExecutor::YIELD_AUTO); BSONObj curObj; PlanExecutor::ExecState execState; while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { auto status = removeSaver.goingToDelete(curObj); if (!status.isOK()) { severe() << "Rolling back createCollection on " << *it << " failed to write document to remove saver file: " << redact(status); throw RSFatalException( "Rolling back createCollection. Failed to write document to remove saver " "file."); } } // If we exited the above for loop with any other execState than IS_EOF, this means that // a FAILURE or DEAD state was returned. If a DEAD state occurred, the collection or // database that we are attempting to save may no longer be valid. If a FAILURE state // was returned, either an unrecoverable error was thrown by exec, or we attempted to // retrieve data that could not be provided by the PlanExecutor. In both of these cases // it is necessary for a full resync of the server. if (execState != PlanExecutor::IS_EOF) { if (execState == PlanExecutor::FAILURE && WorkingSetCommon::isValidStatusMemberObject(curObj)) { Status errorStatus = WorkingSetCommon::getMemberObjectStatus(curObj); severe() << "Rolling back createCollection on " << *it << " failed with " << redact(errorStatus) << ". A full resync is necessary."; throw RSFatalException( "Rolling back createCollection failed. 
A full resync is necessary."); } else { severe() << "Rolling back createCollection on " << *it << " failed. A full resync is necessary."; throw RSFatalException( "Rolling back createCollection failed. A full resync is necessary."); } } WriteUnitOfWork wunit(opCtx); fassertStatusOK(40504, db->dropCollectionEvenIfSystem(opCtx, nss)); wunit.commit(); } } // Drops indexes. for (auto it = fixUpInfo.indexesToDrop.begin(); it != fixUpInfo.indexesToDrop.end(); it++) { const NamespaceString nss(it->first); const string& indexName = it->second; log() << "Dropping index: collection = " << nss.toString() << ". index = " << indexName; Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); auto db = dbHolder().get(opCtx, nss.db()); // If the db is null, we skip over dropping the index. if (!db) { continue; } auto collection = db->getCollection(opCtx, nss); // If the collection is null, we skip over dropping the index. if (!collection) { continue; } auto indexCatalog = collection->getIndexCatalog(); if (!indexCatalog) { continue; } bool includeUnfinishedIndexes = false; auto indexDescriptor = indexCatalog->findIndexByName(opCtx, indexName, includeUnfinishedIndexes); if (!indexDescriptor) { warning() << "Rollback failed to drop index " << indexName << " in " << nss.toString() << ": index not found."; continue; } WriteUnitOfWork wunit(opCtx); auto status = indexCatalog->dropIndex(opCtx, indexDescriptor); if (!status.isOK()) { severe() << "Rollback failed to drop index " << indexName << " in " << nss.toString() << ": " << redact(status); throw RSFatalException(str::stream() << "Rollback failed to drop index " << indexName << " in " << nss.toString()); } wunit.commit(); } log() << "Deleting and updating documents to roll back insert, update and remove " "operations"; unsigned deletes = 0, updates = 0; time_t lastProgressUpdate = time(0); time_t progressUpdateGap = 10; for (const auto& nsAndGoodVersionsByDocID : goodVersions) { // Keeps an archive of items rolled back if the collection has not 
been dropped // while rolling back createCollection operations. const auto& ns = nsAndGoodVersionsByDocID.first; unique_ptr removeSaver; invariant(!fixUpInfo.collectionsToDrop.count(ns)); removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns)); const auto& goodVersionsByDocID = nsAndGoodVersionsByDocID.second; for (const auto& idAndDoc : goodVersionsByDocID) { time_t now = time(0); if (now - lastProgressUpdate > progressUpdateGap) { log() << deletes << " delete and " << updates << " update operations processed out of " << goodVersions.size() << " total operations."; lastProgressUpdate = now; } const DocID& doc = idAndDoc.first; BSONObj pattern = doc._id.wrap(); // { _id : ... } try { verify(doc.ns && *doc.ns); invariant(!fixUpInfo.collectionsToResyncData.count(doc.ns)); // TODO: Lots of overhead in context. This can be faster. const NamespaceString docNss(doc.ns); Lock::DBLock docDbLock(opCtx, docNss.db(), MODE_X); OldClientContext ctx(opCtx, doc.ns); Collection* collection = ctx.db()->getCollection(opCtx, docNss); // Adds the doc to our rollback file if the collection was not dropped while // rolling back createCollection operations. Does not log an error when // undoing an insert on a no longer existing collection. It is likely that // the collection was dropped as part of rolling back a createCollection // command and the document no longer exists. 
if (collection && removeSaver) { BSONObj obj; bool found = Helpers::findOne(opCtx, collection, pattern, obj, false); if (found) { auto status = removeSaver->goingToDelete(obj); if (!status.isOK()) { severe() << "Rollback cannot write document in namespace " << doc.ns << " to archive file: " << redact(status); throw RSFatalException(str::stream() << "Rollback cannot write document in namespace " << doc.ns << " to archive file."); } } else { error() << "Rollback cannot find object: " << pattern << " in namespace " << doc.ns; } } if (idAndDoc.second.isEmpty()) { // If the document could not be found on the primary, deletes the document. // TODO 1.6 : can't delete from a capped collection. Need to handle that // here. deletes++; if (collection) { if (collection->isCapped()) { // Can't delete from a capped collection - so we truncate instead. // if this item must go, so must all successors. try { // TODO: IIRC cappedTruncateAfter does not handle completely // empty. This will be slow if there is no _id index in // the collection. const auto clock = opCtx->getServiceContext()->getFastClockSource(); const auto findOneStart = clock->now(); RecordId loc = Helpers::findOne(opCtx, collection, pattern, false); if (clock->now() - findOneStart > Milliseconds(200)) warning() << "Roll back slow no _id index for " << doc.ns << " perhaps?"; // Would be faster but requires index: // RecordId loc = Helpers::findById(nsd, pattern); if (!loc.isNull()) { try { collection->cappedTruncateAfter(opCtx, loc, true); } catch (const DBException& e) { if (e.getCode() == 13415) { // hack: need to just make cappedTruncate do this... writeConflictRetry( opCtx, "truncate", collection->ns().ns(), [&] { WriteUnitOfWork wunit(opCtx); uassertStatusOK(collection->truncate(opCtx)); wunit.commit(); }); } else { throw e; } } } } catch (const DBException& e) { // Replicated capped collections have many ways to become // inconsistent. We rely on age-out to make these problems go away // eventually. 
warning() << "Ignoring failure to roll back change to capped " << "collection " << doc.ns << " with _id " << redact(idAndDoc.first._id.toString( /*includeFieldName*/ false)) << ": " << redact(e); } } else { deleteObjects(opCtx, collection, docNss, pattern, true, // justOne true); // god } } } else { // TODO faster... updates++; UpdateRequest request(docNss); request.setQuery(pattern); request.setUpdates(idAndDoc.second); request.setGod(); request.setUpsert(); UpdateLifecycleImpl updateLifecycle(docNss); request.setLifecycle(&updateLifecycle); update(opCtx, ctx.db(), request); } } catch (const DBException& e) { log() << "Exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' ' << redact(e) << " ndeletes:" << deletes; throw; } } } log() << "Rollback deleted " << deletes << " documents and updated " << updates << " documents."; log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString(); // Cleans up the oplog. { const NamespaceString oplogNss(rsOplogName); Lock::DBLock oplogDbLock(opCtx, oplogNss.db(), MODE_IX); Lock::CollectionLock oplogCollectionLoc(opCtx->lockState(), oplogNss.ns(), MODE_X); OldClientContext ctx(opCtx, rsOplogName); Collection* oplogCollection = ctx.db()->getCollection(opCtx, oplogNss); if (!oplogCollection) { fassertFailedWithStatusNoTrace(40495, Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << "Can't find " << rsOplogName)); } // TODO: fatal error if this throws? oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false); } Status status = getGlobalAuthorizationManager()->initialize(opCtx); if (!status.isOK()) { severe() << "Failed to reinitialize auth data after rollback: " << redact(status); fassertFailedNoTrace(40496); } // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the // lastAppliedHash value in bgsync to reflect our new last op. 
replCoord->resetLastOpTimesFromOplog(opCtx); } Status _syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, int requiredRBID, ReplicationCoordinator* replCoord, ReplicationProcess* replicationProcess) { invariant(!opCtx->lockState()->isLocked()); FixUpInfo how; log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog; how.rbid = rollbackSource.getRollbackId(); uassert( 40506, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID); log() << "Finding the Common Point"; try { auto processOperationForFixUp = [&how](const BSONObj& operation) { return updateFixUpInfoFromLocalOplogEntry(how, operation); }; // Calls syncRollBackLocalOperations to run updateFixUpInfoFromLocalOplogEntry // on each oplog entry up until the common point. auto res = syncRollBackLocalOperations( localOplog, rollbackSource.getOplog(), processOperationForFixUp); if (!res.isOK()) { const auto status = res.getStatus(); switch (status.code()) { case ErrorCodes::OplogStartMissing: case ErrorCodes::UnrecoverableRollbackError: return status; default: throw RSFatalException(status.toString()); } } how.commonPoint = res.getValue().first; // OpTime how.commonPointOurDiskloc = res.getValue().second; // RecordID how.removeRedundantOperations(); } catch (const RSFatalException& e) { return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << "need to rollback, but unable to determine common point between" " local and remote oplog: " << e.what(), 18752); } log() << "Rollback common point is " << how.commonPoint; try { ON_BLOCK_EXIT([&] { auto status = replicationProcess->incrementRollbackID(opCtx); fassertStatusOK(40497, status); }); syncFixUp(opCtx, how, rollbackSource, replCoord, replicationProcess); } catch (const RSFatalException& e) { return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753); } if (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) { // This log output is used 
in js tests so please leave it. log() << "rollback - rollbackHangBeforeFinish fail point " "enabled. Blocking until fail point is disabled."; while (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) { invariant(!globalInShutdownDeprecated()); // It is an error to shutdown while enabled. mongo::sleepsecs(1); } } return Status::OK(); } } // namespace Status syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, int requiredRBID, ReplicationCoordinator* replCoord, ReplicationProcess* replicationProcess) { invariant(opCtx); invariant(replCoord); DisableDocumentValidation validationDisabler(opCtx); UnreplicatedWritesBlock replicationDisabler(opCtx); Status status = _syncRollback( opCtx, localOplog, rollbackSource, requiredRBID, replCoord, replicationProcess); log() << "Rollback finished. The final minValid is: " << replicationProcess->getConsistencyMarkers()->getMinValid(opCtx) << rsLog; return status; } void rollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, int requiredRBID, ReplicationCoordinator* replCoord, ReplicationProcess* replicationProcess, stdx::function sleepSecsFn) { // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before // writing anything, SyncTail will quickly take us to SECONDARY since are are still at our // original MinValid, which is fine because we may choose a sync source that doesn't require // rollback. 
If it failed after we wrote to MinValid, then we will pick a sync source that will // cause us to roll back to the same common point, which is fine. If we succeeded, we will be // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY // then. { Lock::GlobalWrite globalWrite(opCtx); auto status = replCoord->setFollowerMode(MemberState::RS_ROLLBACK); if (!status.isOK()) { log() << "Cannot transition from " << replCoord->getMemberState().toString() << " to " << MemberState(MemberState::RS_ROLLBACK).toString() << causedBy(status); return; } } try { auto status = syncRollback( opCtx, localOplog, rollbackSource, requiredRBID, replCoord, replicationProcess); // Aborts only when syncRollback detects we are in a unrecoverable state. // WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK // so we need to check here first. if (ErrorCodes::UnrecoverableRollbackError == status.code()) { severe() << "Unable to complete rollback. A full resync may be needed: " << redact(status); fassertFailedNoTrace(40507); } // In other cases, we log the message contained in the error status and retry later. uassertStatusOK(status); } catch (const DBException& ex) { // UnrecoverableRollbackError should only come from a returned status which is handled // above. invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError); warning() << "Rollback cannot complete at this time (retrying later): " << redact(ex) << " appliedThrough= " << replCoord->getMyLastAppliedOpTime() << " minvalid= " << replicationProcess->getConsistencyMarkers()->getMinValid(opCtx); // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this // will also prevent us from hot-looping and putting too much load on upstream nodes. sleepSecsFn(5); // 5 seconds was chosen as a completely arbitrary amount of time. } catch (...) 
{ std::terminate(); } // At this point we are about to leave rollback. Before we do, wait for any writes done // as part of rollback to be durable, and then do any necessary checks that we didn't // wind up rolling back something illegal. We must wait for the rollback to be durable // so that if we wind up shutting down uncleanly in response to something we rolled back // we know that we won't wind up right back in the same situation when we start back up // because the rollback wasn't durable. opCtx->recoveryUnit()->waitUntilDurable(); // If we detected that we rolled back the shardIdentity document as part of this rollback // then we must shut down to clear the in-memory ShardingState associated with the // shardIdentity document. if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) { severe() << "shardIdentity document rollback detected. Shutting down to clear " "in-memory sharding state. Restarting this process should safely return it " "to a healthy state"; fassertFailedNoTrace(40498); } auto status = replCoord->setFollowerMode(MemberState::RS_RECOVERING); if (!status.isOK()) { severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) << "; found self in " << replCoord->getMemberState() << causedBy(status); fassertFailedNoTrace(40499); } } } // namespace repl } // namespace mongo