Diffstat (limited to 'src/mongo/db/repl/rs_rollback.cpp')
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 1251
1 file changed, 600 insertions, 651 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index e5f01a6c8c7..d7a4c151910 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -102,762 +102,711 @@ namespace mongo { - using std::shared_ptr; - using std::unique_ptr; - using std::endl; - using std::list; - using std::map; - using std::set; - using std::string; - using std::pair; +using std::shared_ptr; +using std::unique_ptr; +using std::endl; +using std::list; +using std::map; +using std::set; +using std::string; +using std::pair; namespace repl { namespace { - class RSFatalException : public std::exception { - public: - RSFatalException(std::string m = "replica set fatal exception") - : msg(m) {} - virtual ~RSFatalException() throw() {}; - virtual const char* what() const throw() { - return msg.c_str(); - } - private: - std::string msg; - }; - - struct DocID { - // ns and _id both point into ownedObj's buffer - BSONObj ownedObj; - const char* ns; - BSONElement _id; - bool operator<(const DocID& other) const { - int comp = strcmp(ns, other.ns); - if (comp < 0) - return true; - if (comp > 0) - return false; - return _id < other._id; - } - }; +class RSFatalException : public std::exception { +public: + RSFatalException(std::string m = "replica set fatal exception") : msg(m) {} + virtual ~RSFatalException() throw(){}; + virtual const char* what() const throw() { + return msg.c_str(); + } - struct FixUpInfo { - // note this is a set -- if there are many $inc's on a single document we need to rollback, - // we only need to refetch it once. - set<DocID> toRefetch; +private: + std::string msg; +}; + +struct DocID { + // ns and _id both point into ownedObj's buffer + BSONObj ownedObj; + const char* ns; + BSONElement _id; + bool operator<(const DocID& other) const { + int comp = strcmp(ns, other.ns); + if (comp < 0) + return true; + if (comp > 0) + return false; + return _id < other._id; + } +}; - // collections to drop - set<string> toDrop; +struct FixUpInfo { + // note this is a set -- if there are many $inc's on a single document we need to rollback, + // we only need to refetch it once. + set<DocID> toRefetch; - set<string> collectionsToResyncData; - set<string> collectionsToResyncMetadata; + // collections to drop + set<string> toDrop; - Timestamp commonPoint; - RecordId commonPointOurDiskloc; + set<string> collectionsToResyncData; + set<string> collectionsToResyncMetadata; - int rbid; // remote server's current rollback sequence # - }; + Timestamp commonPoint; + RecordId commonPointOurDiskloc; + int rbid; // remote server's current rollback sequence # +}; - Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { - const char* op = ourObj.getStringField("op"); - if (*op == 'n') - return Status::OK(); - if (ourObj.objsize() > 512 * 1024 * 1024) - throw RSFatalException("rollback too large"); +Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { + const char* op = ourObj.getStringField("op"); + if (*op == 'n') + return Status::OK(); - DocID doc; - doc.ownedObj = ourObj.getOwned(); - doc.ns = doc.ownedObj.getStringField("ns"); - if (*doc.ns == '\0') { - warning() << "ignoring op on rollback no ns TODO : " - << doc.ownedObj.toString(); - return Status::OK(); - } + if (ourObj.objsize() > 512 * 1024 * 1024) + throw RSFatalException("rollback too large"); - BSONObj obj = doc.ownedObj.getObjectField(*op=='u' ? 
"o2" : "o"); - if (obj.isEmpty()) { - warning() << "ignoring op on rollback : " << doc.ownedObj.toString(); - return Status::OK(); - } + DocID doc; + doc.ownedObj = ourObj.getOwned(); + doc.ns = doc.ownedObj.getStringField("ns"); + if (*doc.ns == '\0') { + warning() << "ignoring op on rollback no ns TODO : " << doc.ownedObj.toString(); + return Status::OK(); + } - if (*op == 'c') { - BSONElement first = obj.firstElement(); - NamespaceString nss(doc.ns); // foo.$cmd - string cmdname = first.fieldName(); - Command* cmd = Command::findCommand(cmdname.c_str()); - if (cmd == NULL) { - severe() << "rollback no such command " << first.fieldName(); - return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << - "rollback no such command " << first.fieldName(), - 18751); - } - if (cmdname == "create") { - // Create collection operation - // { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } - string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc - fixUpInfo.toDrop.insert(ns); - return Status::OK(); - } - else if (cmdname == "drop") { - string ns = nss.db().toString() + '.' + first.valuestr(); - fixUpInfo.collectionsToResyncData.insert(ns); - return Status::OK(); - } - else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") { - // TODO: this is bad. we simply full resync the collection here, - // which could be very slow. - warning() << "rollback of dropIndexes is slow in this version of " - << "mongod"; - string ns = nss.db().toString() + '.' + first.valuestr(); - fixUpInfo.collectionsToResyncData.insert(ns); - return Status::OK(); - } - else if (cmdname == "renameCollection") { - // TODO: slow. - warning() << "rollback of renameCollection is slow in this version of " - << "mongod"; - string from = first.valuestr(); - string to = obj["to"].String(); - fixUpInfo.collectionsToResyncData.insert(from); - fixUpInfo.collectionsToResyncData.insert(to); - return Status::OK(); - } - else if (cmdname == "dropDatabase") { - severe() << "rollback : can't rollback drop database full resync " - << "will be required"; - log() << obj.toString(); - throw RSFatalException(); - } - else if (cmdname == "collMod") { - const auto ns = NamespaceString(cmd->parseNs(nss.db().toString(), obj)); - for (auto field : obj) { - const auto modification = field.fieldNameStringData(); - if (modification == cmdname) { - continue; // Skipping command name. - } + BSONObj obj = doc.ownedObj.getObjectField(*op == 'u' ? "o2" : "o"); + if (obj.isEmpty()) { + warning() << "ignoring op on rollback : " << doc.ownedObj.toString(); + return Status::OK(); + } - if (modification == "validator" - || modification == "usePowerOf2Sizes" - || modification == "noPadding") { - fixUpInfo.collectionsToResyncMetadata.insert(ns); - continue; - } + if (*op == 'c') { + BSONElement first = obj.firstElement(); + NamespaceString nss(doc.ns); // foo.$cmd + string cmdname = first.fieldName(); + Command* cmd = Command::findCommand(cmdname.c_str()); + if (cmd == NULL) { + severe() << "rollback no such command " << first.fieldName(); + return Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "rollback no such command " << first.fieldName(), + 18751); + } + if (cmdname == "create") { + // Create collection operation + // { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } + string ns = nss.db().toString() + '.' 
+ obj["create"].String(); // -> foo.abc + fixUpInfo.toDrop.insert(ns); + return Status::OK(); + } else if (cmdname == "drop") { + string ns = nss.db().toString() + '.' + first.valuestr(); + fixUpInfo.collectionsToResyncData.insert(ns); + return Status::OK(); + } else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") { + // TODO: this is bad. we simply full resync the collection here, + // which could be very slow. + warning() << "rollback of dropIndexes is slow in this version of " + << "mongod"; + string ns = nss.db().toString() + '.' + first.valuestr(); + fixUpInfo.collectionsToResyncData.insert(ns); + return Status::OK(); + } else if (cmdname == "renameCollection") { + // TODO: slow. + warning() << "rollback of renameCollection is slow in this version of " + << "mongod"; + string from = first.valuestr(); + string to = obj["to"].String(); + fixUpInfo.collectionsToResyncData.insert(from); + fixUpInfo.collectionsToResyncData.insert(to); + return Status::OK(); + } else if (cmdname == "dropDatabase") { + severe() << "rollback : can't rollback drop database full resync " + << "will be required"; + log() << obj.toString(); + throw RSFatalException(); + } else if (cmdname == "collMod") { + const auto ns = NamespaceString(cmd->parseNs(nss.db().toString(), obj)); + for (auto field : obj) { + const auto modification = field.fieldNameStringData(); + if (modification == cmdname) { + continue; // Skipping command name. + } - severe() << "cannot rollback a collMod command: " << obj; - throw RSFatalException(); + if (modification == "validator" || modification == "usePowerOf2Sizes" || + modification == "noPadding") { + fixUpInfo.collectionsToResyncMetadata.insert(ns); + continue; } - } - else { - severe() << "can't rollback this command yet: " - << obj.toString(); - log() << "cmdname=" << cmdname; + + severe() << "cannot rollback a collMod command: " << obj; throw RSFatalException(); } + } else { + severe() << "can't rollback this command yet: " << obj.toString(); + log() << "cmdname=" << cmdname; + throw RSFatalException(); } + } - doc._id = obj["_id"]; - if (doc._id.eoo()) { - warning() << "ignoring op on rollback no _id TODO : " << doc.ns << ' ' - << doc.ownedObj.toString(); - return Status::OK(); - } - - fixUpInfo.toRefetch.insert(doc); + doc._id = obj["_id"]; + if (doc._id.eoo()) { + warning() << "ignoring op on rollback no _id TODO : " << doc.ns << ' ' + << doc.ownedObj.toString(); return Status::OK(); } + fixUpInfo.toRefetch.insert(doc); + return Status::OK(); +} - void syncFixUp(OperationContext* txn, - FixUpInfo& fixUpInfo, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord) { - // fetch all first so we needn't handle interruption in a fancy way - unsigned long long totalSize = 0; +void syncFixUp(OperationContext* txn, + FixUpInfo& fixUpInfo, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord) { + // fetch all first so we needn't handle interruption in a fancy way - list< pair<DocID, BSONObj> > goodVersions; + unsigned long long totalSize = 0; - BSONObj newMinValid; + list<pair<DocID, BSONObj>> goodVersions; - // fetch all the goodVersions of each document from current primary - DocID doc; - unsigned long long numFetched = 0; - try { - for (set<DocID>::iterator it = fixUpInfo.toRefetch.begin(); - it != fixUpInfo.toRefetch.end(); - it++) { - doc = *it; - - verify(!doc._id.eoo()); - - { - // TODO : slow. lots of round trips. 
- numFetched++; - BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); - totalSize += good.objsize(); - uassert(13410, "replSet too much data to roll back", - totalSize < 300 * 1024 * 1024); - - // note good might be eoo, indicating we should delete it - goodVersions.push_back(pair<DocID, BSONObj>(doc,good)); - } - } - newMinValid = rollbackSource.getLastOperation(); - if (newMinValid.isEmpty()) { - error() << "rollback error newMinValid empty?"; - return; - } - } - catch (const DBException& e) { - LOG(1) << "rollback re-get objects: " << e.toString(); - error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' ' - << numFetched << '/' << fixUpInfo.toRefetch.size(); - throw e; - } + BSONObj newMinValid; - log() << "rollback 3.5"; - if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { - // Our source rolled back itself so the data we received isn't necessarily consistent. - warning() << "rollback rbid on source changed during rollback, " - << "cancelling this attempt"; - return; - } + // fetch all the goodVersions of each document from current primary + DocID doc; + unsigned long long numFetched = 0; + try { + for (set<DocID>::iterator it = fixUpInfo.toRefetch.begin(); it != fixUpInfo.toRefetch.end(); + it++) { + doc = *it; - // update them - log() << "rollback 4 n:" << goodVersions.size(); + verify(!doc._id.eoo()); - bool warn = false; + { + // TODO : slow. lots of round trips. + numFetched++; + BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); + totalSize += good.objsize(); + uassert(13410, "replSet too much data to roll back", totalSize < 300 * 1024 * 1024); - invariant(!fixUpInfo.commonPointOurDiskloc.isNull()); + // note good might be eoo, indicating we should delete it + goodVersions.push_back(pair<DocID, BSONObj>(doc, good)); + } + } + newMinValid = rollbackSource.getLastOperation(); + if (newMinValid.isEmpty()) { + error() << "rollback error newMinValid empty?"; + return; + } + } catch (const DBException& e) { + LOG(1) << "rollback re-get objects: " << e.toString(); + error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' ' + << numFetched << '/' << fixUpInfo.toRefetch.size(); + throw e; + } - // we have items we are writing that aren't from a point-in-time. thus best not to come - // online until we get to that point in freshness. - OpTime minValid = extractOpTime(newMinValid); - log() << "minvalid=" << minValid; - setMinValid(txn, minValid); + log() << "rollback 3.5"; + if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { + // Our source rolled back itself so the data we received isn't necessarily consistent. + warning() << "rollback rbid on source changed during rollback, " + << "cancelling this attempt"; + return; + } - // any full collection resyncs required? - if (!fixUpInfo.collectionsToResyncData.empty() - || !fixUpInfo.collectionsToResyncMetadata.empty()) { + // update them + log() << "rollback 4 n:" << goodVersions.size(); - for (const string& ns : fixUpInfo.collectionsToResyncData) { - log() << "rollback 4.1.1 coll resync " << ns; + bool warn = false; - fixUpInfo.collectionsToResyncMetadata.erase(ns); + invariant(!fixUpInfo.commonPointOurDiskloc.isNull()); - const NamespaceString nss(ns); + // we have items we are writing that aren't from a point-in-time. thus best not to come + // online until we get to that point in freshness. 
+ OpTime minValid = extractOpTime(newMinValid); + log() << "minvalid=" << minValid; + setMinValid(txn, minValid); + // any full collection resyncs required? + if (!fixUpInfo.collectionsToResyncData.empty() || + !fixUpInfo.collectionsToResyncMetadata.empty()) { + for (const string& ns : fixUpInfo.collectionsToResyncData) { + log() << "rollback 4.1.1 coll resync " << ns; - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - Database* db = dbHolder().openDb(txn, nss.db().toString()); - invariant(db); - WriteUnitOfWork wunit(txn); - db->dropCollection(txn, ns); - wunit.commit(); - } + fixUpInfo.collectionsToResyncMetadata.erase(ns); - rollbackSource.copyCollectionFromRemote(txn, nss); - } + const NamespaceString nss(ns); - for (const string& ns : fixUpInfo.collectionsToResyncMetadata) { - log() << "rollback 4.1.2 coll metadata resync " << ns; - const NamespaceString nss(ns); + { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - auto db = dbHolder().openDb(txn, nss.db().toString()); + Database* db = dbHolder().openDb(txn, nss.db().toString()); invariant(db); - auto collection = db->getCollection(ns); - invariant(collection); - auto cce = collection->getCatalogEntry(); + WriteUnitOfWork wunit(txn); + db->dropCollection(txn, ns); + wunit.commit(); + } - auto infoResult = rollbackSource.getCollectionInfo(nss); + rollbackSource.copyCollectionFromRemote(txn, nss); + } - if (!infoResult.isOK()) { - // Collection dropped by "them" so we should drop it too. - log() << ns << " not found on remote host, dropping"; - fixUpInfo.toDrop.insert(ns); - continue; - } + for (const string& ns : fixUpInfo.collectionsToResyncMetadata) { + log() << "rollback 4.1.2 coll metadata resync " << ns; - auto info = infoResult.getValue(); - CollectionOptions options; - if (auto optionsField = info["options"]) { - if (optionsField.type() != Object) { - throw RSFatalException(str::stream() << "Failed to parse options " - << info << ": expected 'options' to be an " - << "Object, got " << typeName(optionsField.type())); - } + const NamespaceString nss(ns); + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); + auto db = dbHolder().openDb(txn, nss.db().toString()); + invariant(db); + auto collection = db->getCollection(ns); + invariant(collection); + auto cce = collection->getCatalogEntry(); - auto status = options.parse(optionsField.Obj()); - if (!status.isOK()) { - throw RSFatalException(str::stream() << "Failed to parse options " - << info << ": " - << status.toString()); - } - } - else { - // Use default options. - } + auto infoResult = rollbackSource.getCollectionInfo(nss); - WriteUnitOfWork wuow(txn); - if (options.flagsSet || cce->getCollectionOptions(txn).flagsSet) { - cce->updateFlags(txn, options.flags); + if (!infoResult.isOK()) { + // Collection dropped by "them" so we should drop it too. 
+ log() << ns << " not found on remote host, dropping"; + fixUpInfo.toDrop.insert(ns); + continue; + } + + auto info = infoResult.getValue(); + CollectionOptions options; + if (auto optionsField = info["options"]) { + if (optionsField.type() != Object) { + throw RSFatalException(str::stream() << "Failed to parse options " << info + << ": expected 'options' to be an " + << "Object, got " + << typeName(optionsField.type())); } - auto status = collection->setValidator(txn, options.validator); + auto status = options.parse(optionsField.Obj()); if (!status.isOK()) { - throw RSFatalException(str::stream() << "Failed to set validator: " - << status.toString()); + throw RSFatalException(str::stream() << "Failed to parse options " << info + << ": " << status.toString()); } - wuow.commit(); + } else { + // Use default options. } - // we did more reading from primary, so check it again for a rollback (which would mess - // us up), and make minValid newer. - log() << "rollback 4.2"; - - string err; - try { - newMinValid = rollbackSource.getLastOperation(); - if (newMinValid.isEmpty()) { - err = "can't get minvalid from sync source"; - } - else { - OpTime minValid = extractOpTime(newMinValid); - log() << "minvalid=" << minValid; - setMinValid(txn, minValid); - } - } - catch (const DBException& e) { - err = "can't get/set minvalid: "; - err += e.what(); - } - if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { - // our source rolled back itself. so the data we received isn't necessarily - // consistent. however, we've now done writes. thus we have a problem. - err += "rbid at primary changed during resync/rollback"; + WriteUnitOfWork wuow(txn); + if (options.flagsSet || cce->getCollectionOptions(txn).flagsSet) { + cce->updateFlags(txn, options.flags); } - if (!err.empty()) { - severe() << "rolling back : " << err - << ". A full resync will be necessary."; - // TODO: reset minvalid so that we are permanently in fatal state - // TODO: don't be fatal, but rather, get all the data first. - throw RSFatalException(); + + auto status = collection->setValidator(txn, options.validator); + if (!status.isOK()) { + throw RSFatalException(str::stream() + << "Failed to set validator: " << status.toString()); } - log() << "rollback 4.3"; + wuow.commit(); } - map<string,shared_ptr<Helpers::RemoveSaver> > removeSavers; + // we did more reading from primary, so check it again for a rollback (which would mess + // us up), and make minValid newer. + log() << "rollback 4.2"; - log() << "rollback 4.6"; - // drop collections to drop before doing individual fixups - that might make things faster - // below actually if there were subsequent inserts to rollback - for (set<string>::iterator it = fixUpInfo.toDrop.begin(); - it != fixUpInfo.toDrop.end(); - it++) { - log() << "rollback drop: " << *it; - - ScopedTransaction transaction(txn, MODE_IX); - const NamespaceString nss(*it); - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); - Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it)); - if (db) { - WriteUnitOfWork wunit(txn); + string err; + try { + newMinValid = rollbackSource.getLastOperation(); + if (newMinValid.isEmpty()) { + err = "can't get minvalid from sync source"; + } else { + OpTime minValid = extractOpTime(newMinValid); + log() << "minvalid=" << minValid; + setMinValid(txn, minValid); + } + } catch (const DBException& e) { + err = "can't get/set minvalid: "; + err += e.what(); + } + if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { + // our source rolled back itself. 
so the data we received isn't necessarily + // consistent. however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if (!err.empty()) { + severe() << "rolling back : " << err << ". A full resync will be necessary."; + // TODO: reset minvalid so that we are permanently in fatal state + // TODO: don't be fatal, but rather, get all the data first. + throw RSFatalException(); + } + log() << "rollback 4.3"; + } - shared_ptr<Helpers::RemoveSaver>& removeSaver = removeSavers[*it]; - if (!removeSaver) - removeSaver.reset(new Helpers::RemoveSaver("rollback", "", *it)); - - // perform a collection scan and write all documents in the collection to disk - std::unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(txn, - *it, - db->getCollection(*it))); - BSONObj curObj; - PlanExecutor::ExecState execState; - while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { - removeSaver->goingToDelete(curObj); - } - if (execState != PlanExecutor::IS_EOF) { - if (execState == PlanExecutor::FAILURE && - WorkingSetCommon::isValidStatusMemberObject(curObj)) { - Status errorStatus = WorkingSetCommon::getMemberObjectStatus(curObj); - severe() << "rolling back createCollection on " << *it - << " failed with " << errorStatus - << ". A full resync is necessary."; - } - else { - severe() << "rolling back createCollection on " << *it - << " failed. A full resync is necessary."; - } - - throw RSFatalException(); + map<string, shared_ptr<Helpers::RemoveSaver>> removeSavers; + + log() << "rollback 4.6"; + // drop collections to drop before doing individual fixups - that might make things faster + // below actually if there were subsequent inserts to rollback + for (set<string>::iterator it = fixUpInfo.toDrop.begin(); it != fixUpInfo.toDrop.end(); it++) { + log() << "rollback drop: " << *it; + + ScopedTransaction transaction(txn, MODE_IX); + const NamespaceString nss(*it); + Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); + Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it)); + if (db) { + WriteUnitOfWork wunit(txn); + + shared_ptr<Helpers::RemoveSaver>& removeSaver = removeSavers[*it]; + if (!removeSaver) + removeSaver.reset(new Helpers::RemoveSaver("rollback", "", *it)); + + // perform a collection scan and write all documents in the collection to disk + std::unique_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(txn, *it, db->getCollection(*it))); + BSONObj curObj; + PlanExecutor::ExecState execState; + while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { + removeSaver->goingToDelete(curObj); + } + if (execState != PlanExecutor::IS_EOF) { + if (execState == PlanExecutor::FAILURE && + WorkingSetCommon::isValidStatusMemberObject(curObj)) { + Status errorStatus = WorkingSetCommon::getMemberObjectStatus(curObj); + severe() << "rolling back createCollection on " << *it << " failed with " + << errorStatus << ". A full resync is necessary."; + } else { + severe() << "rolling back createCollection on " << *it + << " failed. 
A full resync is necessary."; } - db->dropCollection(txn, *it); - wunit.commit(); + throw RSFatalException(); } + + db->dropCollection(txn, *it); + wunit.commit(); } + } - log() << "rollback 4.7"; - unsigned deletes = 0, updates = 0; - time_t lastProgressUpdate = time(0); - time_t progressUpdateGap = 10; - for (list<pair<DocID, BSONObj> >::iterator it = goodVersions.begin(); - it != goodVersions.end(); - it++) { - time_t now = time(0); - if (now - lastProgressUpdate > progressUpdateGap) { - log() << deletes << " delete and " - << updates << " update operations processed out of " - << goodVersions.size() << " total operations"; - lastProgressUpdate = now; + log() << "rollback 4.7"; + unsigned deletes = 0, updates = 0; + time_t lastProgressUpdate = time(0); + time_t progressUpdateGap = 10; + for (list<pair<DocID, BSONObj>>::iterator it = goodVersions.begin(); it != goodVersions.end(); + it++) { + time_t now = time(0); + if (now - lastProgressUpdate > progressUpdateGap) { + log() << deletes << " delete and " << updates << " update operations processed out of " + << goodVersions.size() << " total operations"; + lastProgressUpdate = now; + } + const DocID& doc = it->first; + BSONObj pattern = doc._id.wrap(); // { _id : ... } + try { + verify(doc.ns && *doc.ns); + if (fixUpInfo.collectionsToResyncData.count(doc.ns)) { + // we just synced this entire collection + continue; } - const DocID& doc = it->first; - BSONObj pattern = doc._id.wrap(); // { _id : ... } - try { - verify(doc.ns && *doc.ns); - if (fixUpInfo.collectionsToResyncData.count(doc.ns)) { - // we just synced this entire collection - continue; - } - // keep an archive of items rolled back - shared_ptr<Helpers::RemoveSaver>& removeSaver = removeSavers[doc.ns]; - if (!removeSaver) - removeSaver.reset(new Helpers::RemoveSaver("rollback", "", doc.ns)); + // keep an archive of items rolled back + shared_ptr<Helpers::RemoveSaver>& removeSaver = removeSavers[doc.ns]; + if (!removeSaver) + removeSaver.reset(new Helpers::RemoveSaver("rollback", "", doc.ns)); - // todo: lots of overhead in context, this can be faster - const NamespaceString docNss(doc.ns); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock docDbLock(txn->lockState(), docNss.db(), MODE_X); - OldClientContext ctx(txn, doc.ns); + // todo: lots of overhead in context, this can be faster + const NamespaceString docNss(doc.ns); + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock docDbLock(txn->lockState(), docNss.db(), MODE_X); + OldClientContext ctx(txn, doc.ns); + + // Add the doc to our rollback file + BSONObj obj; + Collection* collection = ctx.db()->getCollection(doc.ns); + + // Do not log an error when undoing an insert on a no longer existent collection. + // It is likely that the collection was dropped as part of rolling back a + // createCollection command and regardless, the document no longer exists. + if (collection) { + bool found = Helpers::findOne(txn, collection, pattern, obj, false); + if (found) { + removeSaver->goingToDelete(obj); + } else { + error() << "rollback cannot find object: " << pattern << " in namespace " + << doc.ns; + } + } - // Add the doc to our rollback file - BSONObj obj; - Collection* collection = ctx.db()->getCollection(doc.ns); + if (it->second.isEmpty()) { + // wasn't on the primary; delete. + // TODO 1.6 : can't delete from a capped collection. need to handle that here. + deletes++; - // Do not log an error when undoing an insert on a no longer existent collection. 
- // It is likely that the collection was dropped as part of rolling back a - // createCollection command and regardless, the document no longer exists. if (collection) { - bool found = Helpers::findOne(txn, collection, pattern, obj, false); - if (found) { - removeSaver->goingToDelete(obj); - } - else { - error() << "rollback cannot find object: " << pattern - << " in namespace " << doc.ns; - } - } - - if (it->second.isEmpty()) { - // wasn't on the primary; delete. - // TODO 1.6 : can't delete from a capped collection. need to handle that here. - deletes++; - - if (collection) { - if (collection->isCapped()) { - // can't delete from a capped collection - so we truncate instead. if - // this item must go, so must all successors!!! - try { - // TODO: IIRC cappedTruncateAfter does not handle completely empty. - // this will crazy slow if no _id index. - long long start = Listener::getElapsedTimeMillis(); - RecordId loc = Helpers::findOne(txn, collection, pattern, false); - if (Listener::getElapsedTimeMillis() - start > 200) - warning() << "roll back slow no _id index for " - << doc.ns << " perhaps?"; - // would be faster but requires index: - // RecordId loc = Helpers::findById(nsd, pattern); - if (!loc.isNull()) { - try { - collection->temp_cappedTruncateAfter(txn, loc, true); - } - catch (const DBException& e) { - if (e.getCode() == 13415) { - // hack: need to just make cappedTruncate do this... - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(txn); - uassertStatusOK(collection->truncate(txn)); - wunit.commit(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, - "truncate", - collection->ns().ns()); - } - else { - throw e; + if (collection->isCapped()) { + // can't delete from a capped collection - so we truncate instead. if + // this item must go, so must all successors!!! + try { + // TODO: IIRC cappedTruncateAfter does not handle completely empty. + // this will crazy slow if no _id index. + long long start = Listener::getElapsedTimeMillis(); + RecordId loc = Helpers::findOne(txn, collection, pattern, false); + if (Listener::getElapsedTimeMillis() - start > 200) + warning() << "roll back slow no _id index for " << doc.ns + << " perhaps?"; + // would be faster but requires index: + // RecordId loc = Helpers::findById(nsd, pattern); + if (!loc.isNull()) { + try { + collection->temp_cappedTruncateAfter(txn, loc, true); + } catch (const DBException& e) { + if (e.getCode() == 13415) { + // hack: need to just make cappedTruncate do this... + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + uassertStatusOK(collection->truncate(txn)); + wunit.commit(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "truncate", collection->ns().ns()); + } else { + throw e; } } } - catch (const DBException& e) { - error() << "rolling back capped collection rec " - << doc.ns << ' ' << e.toString(); - } - } - else { - deleteObjects(txn, - ctx.db(), - doc.ns, - pattern, - PlanExecutor::YIELD_MANUAL, - true, // justone - true); // god + } catch (const DBException& e) { + error() << "rolling back capped collection rec " << doc.ns << ' ' + << e.toString(); } - // did we just empty the collection? if so let's check if it even - // exists on the source. 
- if (collection->numRecords(txn) == 0) { - try { - NamespaceString nss(doc.ns); - auto infoResult = rollbackSource.getCollectionInfo(nss); - if (!infoResult.isOK()) { - // we should drop - WriteUnitOfWork wunit(txn); - ctx.db()->dropCollection(txn, doc.ns); - wunit.commit(); - } - } - catch (const DBException&) { - // this isn't *that* big a deal, but is bad. - warning() << "rollback error querying for existence of " - << doc.ns << " at the primary, ignoring"; + } else { + deleteObjects(txn, + ctx.db(), + doc.ns, + pattern, + PlanExecutor::YIELD_MANUAL, + true, // justone + true); // god + } + // did we just empty the collection? if so let's check if it even + // exists on the source. + if (collection->numRecords(txn) == 0) { + try { + NamespaceString nss(doc.ns); + auto infoResult = rollbackSource.getCollectionInfo(nss); + if (!infoResult.isOK()) { + // we should drop + WriteUnitOfWork wunit(txn); + ctx.db()->dropCollection(txn, doc.ns); + wunit.commit(); } + } catch (const DBException&) { + // this isn't *that* big a deal, but is bad. + warning() << "rollback error querying for existence of " << doc.ns + << " at the primary, ignoring"; } } } - else { - // TODO faster... - OpDebug debug; - updates++; - - const NamespaceString requestNs(doc.ns); - UpdateRequest request(requestNs); - - request.setQuery(pattern); - request.setUpdates(it->second); - request.setGod(); - request.setUpsert(); - UpdateLifecycleImpl updateLifecycle(true, requestNs); - request.setLifecycle(&updateLifecycle); - - update(txn, ctx.db(), request, &debug); - - } - } - catch (const DBException& e) { - log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() - << ' ' << e.toString() << " ndeletes:" << deletes; - warn = true; - } - } - - removeSavers.clear(); // this effectively closes all of them - log() << "rollback 5 d:" << deletes << " u:" << updates; - log() << "rollback 6"; - - // clean up oplog - LOG(2) << "rollback truncate oplog after " << - fixUpInfo.commonPoint.toStringPretty(); - { - const NamespaceString oplogNss(rsOplogName); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock oplogDbLock(txn->lockState(), oplogNss.db(), MODE_IX); - Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X); - OldClientContext ctx(txn, rsOplogName); - Collection* oplogCollection = ctx.db()->getCollection(rsOplogName); - if (!oplogCollection) { - fassertFailedWithStatusNoTrace( - 13423, - Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << - "Can't find " << rsOplogName)); + } else { + // TODO faster... + OpDebug debug; + updates++; + + const NamespaceString requestNs(doc.ns); + UpdateRequest request(requestNs); + + request.setQuery(pattern); + request.setUpdates(it->second); + request.setGod(); + request.setUpsert(); + UpdateLifecycleImpl updateLifecycle(true, requestNs); + request.setLifecycle(&updateLifecycle); + + update(txn, ctx.db(), request, &debug); } - // TODO: fatal error if this throws? 
- oplogCollection->temp_cappedTruncateAfter(txn, fixUpInfo.commonPointOurDiskloc, false); - } - - Status status = getGlobalAuthorizationManager()->initialize(txn); - if (!status.isOK()) { - warning() << "Failed to reinitialize auth data after rollback: " << status; + } catch (const DBException& e) { + log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' ' + << e.toString() << " ndeletes:" << deletes; warn = true; } - - // Reload the lastOpTimeApplied value in the replcoord and the lastAppliedHash value in - // bgsync to reflect our new last op. - replCoord->resetLastOpTimeFromOplog(txn); - BackgroundSync::get()->loadLastAppliedHash(txn); - - // done - if (warn) - warning() << "issues during syncRollback, see log"; - else - log() << "rollback done"; } - Status _syncRollback(OperationContext* txn, - const OplogInterface& localOplog, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord, - const SleepSecondsFn& sleepSecondsFn) { - invariant(!txn->lockState()->isLocked()); - - log() << "rollback 0"; - - /** by doing this, we will not service reads (return an error as we aren't in secondary - * state. that perhaps is moot because of the write lock above, but that write lock - * probably gets deferred or removed or yielded later anyway. - * - * also, this is better for status reporting - we know what is happening. - */ - { - Lock::GlobalWrite globalWrite(txn->lockState()); - if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { - return Status( - ErrorCodes::OperationFailed, str::stream() << - "Cannot transition from " << replCoord->getMemberState().toString() << - " to " << MemberState(MemberState::RS_ROLLBACK).toString()); - } + removeSavers.clear(); // this effectively closes all of them + log() << "rollback 5 d:" << deletes << " u:" << updates; + log() << "rollback 6"; + + // clean up oplog + LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toStringPretty(); + { + const NamespaceString oplogNss(rsOplogName); + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock oplogDbLock(txn->lockState(), oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X); + OldClientContext ctx(txn, rsOplogName); + Collection* oplogCollection = ctx.db()->getCollection(rsOplogName); + if (!oplogCollection) { + fassertFailedWithStatusNoTrace(13423, + Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "Can't find " << rsOplogName)); } + // TODO: fatal error if this throws? 
+ oplogCollection->temp_cappedTruncateAfter(txn, fixUpInfo.commonPointOurDiskloc, false); + } - FixUpInfo how; - log() << "rollback 1"; - how.rbid = rollbackSource.getRollbackId(); - { - log() << "rollback 2 FindCommonPoint"; - try { - auto processOperationForFixUp = [&how](const BSONObj& operation) { - return refetch(how, operation); - }; - auto res = syncRollBackLocalOperations( - localOplog, - rollbackSource.getOplog(), - processOperationForFixUp); - if (!res.isOK()) { - switch (res.getStatus().code()) { - case ErrorCodes::OplogStartMissing: - case ErrorCodes::UnrecoverableRollbackError: - sleepSecondsFn(Seconds(1)); - return res.getStatus(); - default: - throw RSFatalException(res.getStatus().toString()); - } - } - else { - how.commonPoint = res.getValue().first; - how.commonPointOurDiskloc = res.getValue().second; - } - } - catch (const RSFatalException& e) { - error() << string(e.what()); - return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << - "need to rollback, but unable to determine common point between" - "local and remote oplog: " << e.what(), - 18752); - } - catch (const DBException& e) { - warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min"; + Status status = getGlobalAuthorizationManager()->initialize(txn); + if (!status.isOK()) { + warning() << "Failed to reinitialize auth data after rollback: " << status; + warn = true; + } - sleepSecondsFn(Seconds(60)); - throw; - } + // Reload the lastOpTimeApplied value in the replcoord and the lastAppliedHash value in + // bgsync to reflect our new last op. + replCoord->resetLastOpTimeFromOplog(txn); + BackgroundSync::get()->loadLastAppliedHash(txn); + + // done + if (warn) + warning() << "issues during syncRollback, see log"; + else + log() << "rollback done"; +} + +Status _syncRollback(OperationContext* txn, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord, + const SleepSecondsFn& sleepSecondsFn) { + invariant(!txn->lockState()->isLocked()); + + log() << "rollback 0"; + + /** by doing this, we will not service reads (return an error as we aren't in secondary + * state. that perhaps is moot because of the write lock above, but that write lock + * probably gets deferred or removed or yielded later anyway. + * + * also, this is better for status reporting - we know what is happening. + */ + { + Lock::GlobalWrite globalWrite(txn->lockState()); + if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "Cannot transition from " + << replCoord->getMemberState().toString() << " to " + << MemberState(MemberState::RS_ROLLBACK).toString()); } + } - log() << "rollback 3 fixup"; - - replCoord->incrementRollbackID(); + FixUpInfo how; + log() << "rollback 1"; + how.rbid = rollbackSource.getRollbackId(); + { + log() << "rollback 2 FindCommonPoint"; try { - syncFixUp(txn, how, rollbackSource, replCoord); - } - catch (const RSFatalException& e) { - error() << "exception during rollback: " << e.what(); - return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << - "exception during rollback: " << e.what(), - 18753); - } - catch (...) 
{ - replCoord->incrementRollbackID(); - - if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { - warning() << "Failed to transition into " << - MemberState(MemberState::RS_RECOVERING) << "; expected to be in state " << - MemberState(MemberState::RS_ROLLBACK) << "but found self in " << - replCoord->getMemberState(); + auto processOperationForFixUp = + [&how](const BSONObj& operation) { return refetch(how, operation); }; + auto res = syncRollBackLocalOperations( + localOplog, rollbackSource.getOplog(), processOperationForFixUp); + if (!res.isOK()) { + switch (res.getStatus().code()) { + case ErrorCodes::OplogStartMissing: + case ErrorCodes::UnrecoverableRollbackError: + sleepSecondsFn(Seconds(1)); + return res.getStatus(); + default: + throw RSFatalException(res.getStatus().toString()); + } + } else { + how.commonPoint = res.getValue().first; + how.commonPointOurDiskloc = res.getValue().second; } - + } catch (const RSFatalException& e) { + error() << string(e.what()); + return Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() + << "need to rollback, but unable to determine common point between" + "local and remote oplog: " << e.what(), + 18752); + } catch (const DBException& e) { + warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min"; + + sleepSecondsFn(Seconds(60)); throw; } + } + + log() << "rollback 3 fixup"; + + replCoord->incrementRollbackID(); + try { + syncFixUp(txn, how, rollbackSource, replCoord); + } catch (const RSFatalException& e) { + error() << "exception during rollback: " << e.what(); + return Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "exception during rollback: " << e.what(), + 18753); + } catch (...) { replCoord->incrementRollbackID(); - // success - leave "ROLLBACK" state - // can go to SECONDARY once minvalid is achieved if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { - warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) << - "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) << - "but found self in " << replCoord->getMemberState(); + warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) + << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) + << "but found self in " << replCoord->getMemberState(); } - return Status::OK(); + throw; } - -} // namespace - - Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeApplied, - const OplogInterface& localOplog, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord, - const SleepSecondsFn& sleepSecondsFn) { - - invariant(txn); - invariant(replCoord); - - // check that we are at minvalid, otherwise we cannot rollback as we may be in an - // inconsistent state - { - OpTime minvalid = getMinValid(txn); - if( minvalid > lastOpTimeApplied ) { - severe() << "need to rollback, but in inconsistent state" << endl; - return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << - "need to rollback, but in inconsistent state. 
" << - "minvalid: " << minvalid.toString() << " our last optime: " << - lastOpTimeApplied.toString(), - 18750); - } - } - - log() << "beginning rollback" << rsLog; - - DisableDocumentValidation validationDisabler(txn); - txn->setReplicatedWrites(false); - Status status = _syncRollback(txn, - localOplog, - rollbackSource, - replCoord, - sleepSecondsFn); - - log() << "rollback finished" << rsLog; - return status; + replCoord->incrementRollbackID(); + + // success - leave "ROLLBACK" state + // can go to SECONDARY once minvalid is achieved + if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { + warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) + << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) + << "but found self in " << replCoord->getMemberState(); } - Status syncRollback(OperationContext* txn, - const OpTime& lastOpTimeWritten, - const OplogInterface& localOplog, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord) { - - return syncRollback(txn, - lastOpTimeWritten, - localOplog, - rollbackSource, - replCoord, - [](Seconds seconds) { sleepsecs(seconds.count()); }); + return Status::OK(); +} + +} // namespace + +Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeApplied, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord, + const SleepSecondsFn& sleepSecondsFn) { + invariant(txn); + invariant(replCoord); + + // check that we are at minvalid, otherwise we cannot rollback as we may be in an + // inconsistent state + { + OpTime minvalid = getMinValid(txn); + if (minvalid > lastOpTimeApplied) { + severe() << "need to rollback, but in inconsistent state" << endl; + return Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "need to rollback, but in inconsistent state. " + << "minvalid: " << minvalid.toString() + << " our last optime: " << lastOpTimeApplied.toString(), + 18750); + } } -} // namespace repl -} // namespace mongo + log() << "beginning rollback" << rsLog; + + DisableDocumentValidation validationDisabler(txn); + txn->setReplicatedWrites(false); + Status status = _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn); + + log() << "rollback finished" << rsLog; + return status; +} + +Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeWritten, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord) { + return syncRollback(txn, + lastOpTimeWritten, + localOplog, + rollbackSource, + replCoord, + [](Seconds seconds) { sleepsecs(seconds.count()); }); +} + +} // namespace repl +} // namespace mongo |