From fb5a39c59d661021c99ba3548e4e5be2e2fb50f5 Mon Sep 17 00:00:00 2001 From: Mathias Stearn Date: Wed, 18 Jan 2017 18:25:03 -0500 Subject: SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid (cherry picked from commit 0b76764eac7651ddba4c82c504aa7e8d785087c2) SERVER-25860 Allow replication rollback to drop system collections (cherry picked from commit 99e19b1ded425a1d859a9bc52fd5c2712e71f83a) SERVER-25860 Remove redundant operations during rollback (cherry picked from commit 2dec7e9a15af8e0fc4d8e68ed40e3abe90b3a3b3) SERVER-25862 Add a test of replaying a batch at startup with update and delete of same object This is a special case of SERVER-7200 that interacts with plans for SERVER-25862. (cherry picked from commit 8e7231a38341d68fb2cdc60509687397e9a17741) SERVER-27282 clean up RS rollback error handling (cherry picked from commit ef1f1739d6cbff9fb4ddbcc77d467f183c0ab9f2) (all cherry picked from v3.4 commit f4cab348460c90fcd506b2d46bf8c830b7b87379) --- .../oplog_replay_on_startup_update_and_delete.js | 48 +++ jstests/replsets/rollback_empty_ns.js | 93 ----- jstests/replsets/rollback_empty_o.js | 93 ----- jstests/replsets/rollback_empty_o2.js | 94 ----- src/mongo/db/catalog/database.cpp | 35 +- src/mongo/db/catalog/database.h | 5 + src/mongo/db/repl/bgsync.cpp | 162 +++++--- src/mongo/db/repl/bgsync.h | 4 +- src/mongo/db/repl/oplogreader.cpp | 15 +- src/mongo/db/repl/oplogreader.h | 7 +- src/mongo/db/repl/rs_rollback.cpp | 421 ++++++++------------- src/mongo/db/repl/rs_rollback.h | 85 ++++- src/mongo/db/repl/rs_rollback_test.cpp | 243 +++++++++--- 13 files changed, 630 insertions(+), 675 deletions(-) create mode 100644 jstests/replsets/oplog_replay_on_startup_update_and_delete.js delete mode 100644 jstests/replsets/rollback_empty_ns.js delete mode 100644 jstests/replsets/rollback_empty_o.js delete mode 100644 jstests/replsets/rollback_empty_o2.js diff --git a/jstests/replsets/oplog_replay_on_startup_update_and_delete.js b/jstests/replsets/oplog_replay_on_startup_update_and_delete.js new file mode 100644 index 00000000000..14d99b330a4 --- /dev/null +++ b/jstests/replsets/oplog_replay_on_startup_update_and_delete.js @@ -0,0 +1,48 @@ +// SERVER-7200 On startup, replica set nodes delete oplog state past the oplog delete point and +// apply any remaining unapplied ops before coming up as a secondary. This test specifically tests +// having an update and a delete of the same document in the same batch. This is a bit of an edge +// case because if the delete has been applied already, the update won't find any documents. +// +// @tags: [requires_persistence] +(function() { + "use strict"; + + var ns = "test.coll"; + var id = ObjectId(); + + var rst = new ReplSetTest({ + nodes: 1, + }); + + rst.startSet(); + rst.initiate(); + + var conn = rst.getPrimary(); // Waits for PRIMARY state. + + // Do the insert update and delete operations. + var coll = conn.getCollection(ns); + assert.writeOK(coll.insert({_id: id})); + assert.writeOK(coll.update({_id: id}, {$inc: {a: 1}})); + assert.writeOK(coll.remove({_id: id})); + assert.eq(coll.findOne({_id: id}), null); + + // Set the appliedThrough point back to the insert so the update and delete are replayed. + conn = rst.restart(0, {noReplSet: true}); // Restart as a standalone node. + assert.neq(null, conn, "failed to restart"); + var oplog = conn.getCollection('local.oplog.rs'); + oplog.find().forEach(printjsononeline); + assert.eq(oplog.count({ns: ns, op: 'i'}), 1); + var insertOp = oplog.findOne({ns: ns, op: 'i'}); + var term = 't' in insertOp ? insertOp.t : -1; + var minValidColl = conn.getCollection('local.replset.minvalid'); + assert.writeOK(minValidColl.update({}, {$set: {begin: {ts: insertOp.ts, t: term}}})); + printjson({minValidDoc: minValidColl.findOne()}); + + // Make sure it starts up fine again and doesn't have the document. + conn = rst.restart(0); // Restart in replSet mode again. + conn = rst.getPrimary(); // Waits for PRIMARY state. + coll = conn.getCollection(ns); + assert.eq(coll.findOne({_id: id}), null); + + rst.stopSet(); +})(); diff --git a/jstests/replsets/rollback_empty_ns.js b/jstests/replsets/rollback_empty_ns.js deleted file mode 100644 index f6a07319eb4..00000000000 --- a/jstests/replsets/rollback_empty_ns.js +++ /dev/null @@ -1,93 +0,0 @@ -// test that a rollback of an op with empty ns causes a message to be logged -// -// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will -// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a -// scenario, none of the members will have any data, and upon restart will each look for a member to -// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be -// run on ephemeral storage engines. -// @tags: [requires_persistence] - -// function to check the logs for an entry -doesEntryMatch = function(array, regex) { - var found = false; - for (i = 0; i < array.length; i++) { - if (regex.test(array[i])) { - found = true; - } - } - return found; -}; - -// set up a set and grab things for later -var name = "rollback_empty_ns"; -var replTest = new ReplSetTest({name: name, nodes: 3}); -var nodes = replTest.nodeList(); -var conns = replTest.startSet(); -replTest.initiate({ - "_id": name, - "members": [ - {"_id": 0, "host": nodes[0], priority: 3}, - {"_id": 1, "host": nodes[1]}, - {"_id": 2, "host": nodes[2], arbiterOnly: true} - ] -}); -var a_conn = conns[0]; -var b_conn = conns[1]; -var AID = replTest.getNodeId(a_conn); -var BID = replTest.getNodeId(b_conn); - -replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000); - -// get master and do an initial write -var master = replTest.getPrimary(); -assert(master === conns[0], "conns[0] assumed to be master"); -assert(a_conn.host === master.host, "a_conn assumed to be master"); -var options = { - writeConcern: {w: 2, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options)); - -// shut down master -replTest.stop(AID); - -// insert a fake oplog entry with an empty ns -master = replTest.getPrimary(); -assert(b_conn.host === master.host, "b_conn assumed to be master"); -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -// another insert to set minvalid ahead -assert.writeOK(b_conn.getDB(name).foo.insert({x: 123})); -var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0]; -oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1); -oplog_entry["ns"] = ""; -assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry)); - -// shut down B and bring back the original master -replTest.stop(BID); -replTest.restart(AID); -master = replTest.getPrimary(); -assert(a_conn.host === master.host, "a_conn assumed to be master"); - -// do a write so that B will have to roll back -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options)); - -// restart B, which should rollback and log a message about not rolling back empty ns'd oplog entry -replTest.restart(BID); -var msg = RegExp("ignoring op on rollback no ns TODO : "); -assert.soon(function() { - try { - var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log; - return doesEntryMatch(log, msg); - } catch (e) { - return false; - } -}, "Did not see a log entry about skipping the empty ns'd oplog entry during rollback"); - -replTest.stopSet(); diff --git a/jstests/replsets/rollback_empty_o.js b/jstests/replsets/rollback_empty_o.js deleted file mode 100644 index f3468fcde5e..00000000000 --- a/jstests/replsets/rollback_empty_o.js +++ /dev/null @@ -1,93 +0,0 @@ -// test that a rollback of an op with empty o causes a message to be logged -// -// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will -// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a -// scenario, none of the members will have any data, and upon restart will each look for a member to -// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be -// run on ephemeral storage engines. -// @tags: [requires_persistence] - -// function to check the logs for an entry -doesEntryMatch = function(array, regex) { - var found = false; - for (i = 0; i < array.length; i++) { - if (regex.test(array[i])) { - found = true; - } - } - return found; -}; - -// set up a set and grab things for later -var name = "rollback_empty_o"; -var replTest = new ReplSetTest({name: name, nodes: 3}); -var nodes = replTest.nodeList(); -var conns = replTest.startSet(); -replTest.initiate({ - "_id": name, - "members": [ - {"_id": 0, "host": nodes[0], priority: 3}, - {"_id": 1, "host": nodes[1]}, - {"_id": 2, "host": nodes[2], arbiterOnly: true} - ] -}); -var a_conn = conns[0]; -var b_conn = conns[1]; -var AID = replTest.getNodeId(a_conn); -var BID = replTest.getNodeId(b_conn); - -replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000); - -// get master and do an initial write -var master = replTest.getPrimary(); -assert(master === conns[0], "conns[0] assumed to be master"); -assert(a_conn.host === master.host, "a_conn assumed to be master"); -var options = { - writeConcern: {w: 2, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options)); - -// shut down master -replTest.stop(AID); - -// insert a fake oplog entry with an empty o -master = replTest.getPrimary(); -assert(b_conn.host === master.host, "b_conn assumed to be master"); -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -// another insert to set minvalid ahead -assert.writeOK(b_conn.getDB(name).foo.insert({x: 123})); -var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0]; -oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1); -oplog_entry["o"] = {}; -assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry)); - -// shut down B and bring back the original master -replTest.stop(BID); -replTest.restart(AID); -master = replTest.getPrimary(); -assert(a_conn.host === master.host, "a_conn assumed to be master"); - -// do a write so that B will have to roll back -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options)); - -// restart B, which should rollback and log a message about not rolling back empty o'd oplog entry -replTest.restart(BID); -var msg = RegExp("ignoring op on rollback : "); -assert.soon(function() { - try { - var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log; - return doesEntryMatch(log, msg); - } catch (e) { - return false; - } -}, "Did not see a log entry about skipping the empty o'd oplog entry during rollback"); - -replTest.stopSet(); diff --git a/jstests/replsets/rollback_empty_o2.js b/jstests/replsets/rollback_empty_o2.js deleted file mode 100644 index 56eb8512575..00000000000 --- a/jstests/replsets/rollback_empty_o2.js +++ /dev/null @@ -1,94 +0,0 @@ -// test that a rollback of an update with empty o2 causes a message to be logged -// -// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will -// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a -// scenario, none of the members will have any data, and upon restart will each look for a member to -// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be -// run on ephemeral storage engines. -// @tags: [requires_persistence] - -// function to check the logs for an entry -doesEntryMatch = function(array, regex) { - var found = false; - for (i = 0; i < array.length; i++) { - if (regex.test(array[i])) { - found = true; - } - } - return found; -}; - -// set up a set and grab things for later -var name = "rollback_empty_o2"; -var replTest = new ReplSetTest({name: name, nodes: 3}); -var nodes = replTest.nodeList(); -var conns = replTest.startSet(); -replTest.initiate({ - "_id": name, - "members": [ - {"_id": 0, "host": nodes[0], priority: 3}, - {"_id": 1, "host": nodes[1]}, - {"_id": 2, "host": nodes[2], arbiterOnly: true} - ] -}); -var a_conn = conns[0]; -var b_conn = conns[1]; -var AID = replTest.getNodeId(a_conn); -var BID = replTest.getNodeId(b_conn); - -replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000); - -// get master and do an initial write -var master = replTest.getPrimary(); -assert(master === conns[0], "conns[0] assumed to be master"); -assert(a_conn.host === master.host, "a_conn assumed to be master"); -var options = { - writeConcern: {w: 2, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options)); - -// shut down master -replTest.stop(AID); - -// insert a fake oplog entry with an empty o2 -master = replTest.getPrimary(); -assert(b_conn.host === master.host, "b_conn assumed to be master"); -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -// another insert to set minvalid ahead -assert.writeOK(b_conn.getDB(name).foo.insert({x: 123})); -var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0]; -oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1); -oplog_entry["op"] = "u"; -oplog_entry["o2"] = {}; -assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry)); - -// shut down B and bring back the original master -replTest.stop(BID); -replTest.restart(AID); -master = replTest.getPrimary(); -assert(a_conn.host === master.host, "a_conn assumed to be master"); - -// do a write so that B will have to roll back -options = { - writeConcern: {w: 1, wtimeout: 60000}, - upsert: true -}; -assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options)); - -// restart B, which should rollback and log a message about not rolling back empty o2'd oplog entry -replTest.restart(BID); -var msg = RegExp("ignoring op on rollback : "); -assert.soon(function() { - try { - var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log; - return doesEntryMatch(log, msg); - } catch (e) { - return false; - } -}, "Did not see a log entry about skipping the empty o2'd oplog entry during rollback"); - -replTest.stopSet(); diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp index 9b68549f295..13b4b7faa01 100644 --- a/src/mongo/db/catalog/database.cpp +++ b/src/mongo/db/catalog/database.cpp @@ -315,14 +315,8 @@ void Database::getStats(OperationContext* opCtx, BSONObjBuilder* output, double } Status Database::dropCollection(OperationContext* txn, StringData fullns) { - invariant(txn->lockState()->isDbLockedForMode(name(), MODE_X)); - - LOG(1) << "dropCollection: " << fullns << endl; - massertNamespaceNotIndex(fullns, "dropCollection"); - - Collection* collection = getCollection(fullns); - if (!collection) { - // collection doesn't exist + if (!getCollection(fullns)) { + // Collection doesn't exist so don't bother validating if it can be dropped. return Status::OK(); } @@ -341,9 +335,24 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) { } } + return dropCollectionEvenIfSystem(txn, nss); +} + +Status Database::dropCollectionEvenIfSystem(OperationContext* txn, const NamespaceString& fullns) { + invariant(txn->lockState()->isDbLockedForMode(name(), MODE_X)); + + LOG(1) << "dropCollection: " << fullns; + + Collection* collection = getCollection(fullns); + if (!collection) { + return Status::OK(); // Post condition already met. + } + + massertNamespaceNotIndex(fullns.toString(), "dropCollection"); + BackgroundOperation::assertNoBgOpInProgForNs(fullns); - audit::logDropCollection(&cc(), fullns); + audit::logDropCollection(&cc(), fullns.toString()); Status s = collection->getIndexCatalog()->dropAllIndexes(txn, true); if (!s.isOK()) { @@ -355,13 +364,13 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) { verify(collection->_details->getTotalIndexCount(txn) == 0); LOG(1) << "\t dropIndexes done" << endl; - Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns); + Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns.toString()); // We want to destroy the Collection object before telling the StorageEngine to destroy the // RecordStore. - _clearCollectionCache(txn, fullns, "collection dropped"); + _clearCollectionCache(txn, fullns.toString(), "collection dropped"); - s = _dbEntry->dropCollection(txn, fullns); + s = _dbEntry->dropCollection(txn, fullns.toString()); if (!s.isOK()) return s; @@ -378,7 +387,7 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) { } } - getGlobalServiceContext()->getOpObserver()->onDropCollection(txn, nss); + getGlobalServiceContext()->getOpObserver()->onDropCollection(txn, fullns); return Status::OK(); } diff --git a/src/mongo/db/catalog/database.h b/src/mongo/db/catalog/database.h index 2fa20bc9358..77b6ccc865b 100644 --- a/src/mongo/db/catalog/database.h +++ b/src/mongo/db/catalog/database.h @@ -144,7 +144,12 @@ public: const DatabaseCatalogEntry* getDatabaseCatalogEntry() const; + /** + * dropCollection() will refuse to drop system collections. Use dropCollectionEvenIfSystem() if + * that is required. + */ Status dropCollection(OperationContext* txn, StringData fullns); + Status dropCollectionEvenIfSystem(OperationContext* txn, const NamespaceString& fullns); Collection* createCollection(OperationContext* txn, StringData ns, diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 7e2587b4561..37ba22bf1a7 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -54,6 +54,8 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/executor/network_interface_factory.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" @@ -242,6 +244,8 @@ void BackgroundSync::_producerThread() { return; } + invariant(!state.rollback()); + // We need to wait until initial sync has started. if (_replCoord->getMyLastAppliedOpTime().isNull()) { sleepsecs(1); @@ -296,7 +300,9 @@ void BackgroundSync::_produce(OperationContext* txn) { minValid = minValidSaved; } } - syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord); + + int rbid; + syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord, &rbid); // no server found if (syncSourceReader.getHost().empty()) { @@ -348,7 +354,8 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched, lastHashFetched, fetcherMaxTimeMS, - &fetcherReturnStatus); + &fetcherReturnStatus, + rbid); BSONObjBuilder cmdBob; @@ -436,19 +443,8 @@ void BackgroundSync::_produce(OperationContext* txn) { } } } - // check that we are at minvalid, otherwise we cannot roll back as we may be in an - // inconsistent state - const auto minValid = getMinValid(txn); - if (lastApplied < minValid) { - fassertNoTrace(18750, - Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() - << "need to rollback, but in inconsistent state. " - << "minvalid: " << minValid.toString() - << " > our last optime: " << lastApplied.toString())); - } - _rollback(txn, source, getConnection); + _rollback(txn, source, rbid, getConnection); stop(); } else if (!fetcherReturnStatus.isOK()) { warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString(); @@ -461,7 +457,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith& OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* returnStatus) { + Status* returnStatus, + int rbid) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor if (!result.isOK()) { @@ -515,6 +512,46 @@ void BackgroundSync::_fetcherCallback(const StatusWith& // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { + // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back + // since that could cause it to not have our required minValid point. The cursor will be + // killed if the upstream node rolls back so we don't need to keep checking. This must be + // blocking since the Fetcher doesn't give us a way to defer sending the getmores after we + // return. + auto handle = _threadPoolTaskExecutor.scheduleRemoteCommand( + {source, "admin", BSON("replSetGetRBID" << 1)}, + [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { + *returnStatus = rbidReply.response.getStatus(); + if (!returnStatus->isOK()) + return; + + const auto& rbidReplyObj = rbidReply.response.getValue().data; + *returnStatus = getStatusFromCommandResult(rbidReplyObj); + if (!returnStatus->isOK()) + return; + + const auto rbidElem = rbidReplyObj["rbid"]; + if (rbidElem.type() != NumberInt) { + *returnStatus = + Status(ErrorCodes::BadValue, + str::stream() << "Upstream node returned an " + << "rbid with invalid type " << rbidElem.type()); + return; + } + if (rbidElem.Int() != rbid) { + *returnStatus = Status(ErrorCodes::BadValue, + "Upstream node rolled back after verifying " + "that it had our MinValid point. Retrying."); + } + }); + if (!handle.isOK()) { + *returnStatus = handle.getStatus(); + return; + } + + _threadPoolTaskExecutor.wait(handle.getValue()); + if (!returnStatus->isOK()) + return; + auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith { if (firstDocToApply == lastDocToApply) { return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing"); @@ -702,40 +739,77 @@ void BackgroundSync::consume() { void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, + boost::optional requiredRBID, stdx::function getConnection) { - // Abort only when syncRollback detects we are in a unrecoverable state. - // In other cases, we log the message contained in the error status and retry later. - auto status = syncRollback(txn, - OplogInterfaceLocal(txn, rsOplogName), - RollbackSourceImpl(getConnection, source, rsOplogName), - _replCoord); - if (status.isOK()) { - // When the syncTail thread sees there is no new data by adding something to the buffer. - _signalNoNewDataForApplier(); - // Wait until the buffer is empty. - // This is an indication that syncTail has removed the sentinal marker from the buffer - // and reset its local lastAppliedOpTime via the replCoord. - while (!_buffer.empty()) { - sleepmillis(10); - if (inShutdown()) { - return; - } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from + // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any + // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or + // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success + // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before + // writing anything, SyncTail will quickly take us to SECONDARY since are are still at our + // original MinValid, which is fine because we may choose a sync source that doesn't require + // rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will + // cause us to roll back to the same common point, which is fine. If we succeeded, we will be + // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY + // then. + { + log() << "rollback 0"; + Lock::GlobalWrite globalWrite(txn->lockState()); + if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { + log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to " + << MemberState(MemberState::RS_ROLLBACK).toString(); + return; } + } - // It is now safe to clear the ROLLBACK state, which may result in the applier thread - // transitioning to SECONDARY. This is safe because the applier thread has now reloaded - // the new rollback minValid from the database. - if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { - warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) - << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) - << " but found self in " << _replCoord->getMemberState(); + try { + auto status = syncRollback(txn, + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(getConnection, source, rsOplogName), + requiredRBID, + _replCoord); + + // Abort only when syncRollback detects we are in a unrecoverable state. + // WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK + // so we need to check here first. + if (ErrorCodes::UnrecoverableRollbackError == status.code()) { + severe() << "Unable to complete rollback. A full resync may be needed: " << status; + fassertFailedNoTrace(28723); } - return; - } - if (ErrorCodes::UnrecoverableRollbackError == status.code()) { - fassertNoTrace(28723, status); + + // In other cases, we log the message contained in the error status and retry later. + uassertStatusOK(status); + } catch (const DBException& ex) { + // UnrecoverableRollbackError should only come from a returned status which is handled + // above. + invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError); + + warning() << "rollback cannot complete at this time (retrying later): " << ex + << " appliedThrough=" << _replCoord->getMyLastAppliedOpTime() + << " minvalid=" << getMinValid(txn); + + // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If + // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this + // will also prevent us from hot-looping and putting too much load on upstream nodes. + sleepsecs(5); // 5 seconds was chosen as a completely arbitrary amount of time. + } catch (...) { + std::terminate(); + } + + // At this point we are about to leave rollback. Before we do, wait for any writes done + // as part of rollback to be durable, and then do any necessary checks that we didn't + // wind up rolling back something illegal. We must wait for the rollback to be durable + // so that if we wind up shutting down uncleanly in response to something we rolled back + // we know that we won't wind up right back in the same situation when we start back up + // because the rollback wasn't durable. + txn->recoveryUnit()->waitUntilDurable(); + + if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { + severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) + << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) + << " but found self in " << _replCoord->getMemberState(); + fassertFailedNoTrace(40364); } - warning() << "rollback cannot proceed at this time (retrying later): " << status; } HostAndPort BackgroundSync::getSyncTarget() { diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 489790f988d..0c1a1ea99f5 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -187,7 +187,8 @@ private: OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* returnStatus); + Status* returnStatus, + int rbid); /** * Executes a rollback. @@ -195,6 +196,7 @@ private: */ void _rollback(OperationContext* txn, const HostAndPort& source, + boost::optional requiredRBID, stdx::function getConnection); /** diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 63832b201af..f00b95e147a 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -165,7 +165,8 @@ Status OplogReader::_compareRequiredOpTimeWithQueryResponse(const OpTime& requir void OplogReader::connectToSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime, - ReplicationCoordinator* replCoord) { + ReplicationCoordinator* replCoord, + int* rbidOut) { const Timestamp sentinelTimestamp(duration_cast(Milliseconds(curTimeMillis64())), 0); const OpTime sentinel(sentinelTimestamp, std::numeric_limits::max()); OpTime oldestOpTimeSeen = sentinel; @@ -209,6 +210,18 @@ void OplogReader::connectToSyncSource(OperationContext* txn, replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10)); continue; } + + if (rbidOut) { + BSONObj reply; + if (!conn()->runCommand("admin", BSON("replSetGetRBID" << 1), reply)) { + log() << "Error getting rbid from " << candidate << ": " << reply; + resetConnection(); + replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10)); + continue; + } + *rbidOut = reply["rbid"].Int(); + } + // Read the first (oldest) op and confirm that it's not newer than our last // fetched op. Otherwise, we have fallen off the back of that source's oplog. BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 1434125697a..3c983b007fc 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -153,12 +153,17 @@ public: * is left unconnected, where this->conn() equals NULL. * In the process of connecting, this function may add items to the repl coordinator's * sync source blacklist. + * + * If the rbidOut param is non-null it will be set to the rbid of the server before any data is + * fetched from it. + * * This function may throw DB exceptions. */ void connectToSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime, - ReplicationCoordinator* replCoord); + ReplicationCoordinator* replCoord, + int* rbidOut = nullptr); private: /** diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index c05c5a7d69c..c2162484171 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -64,6 +64,7 @@ #include "mongo/db/repl/rslog.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" /* Scenarios * @@ -120,58 +121,48 @@ namespace repl { // Failpoint which causes rollback to hang before finishing. MONGO_FP_DECLARE(rollbackHangBeforeFinish); -namespace { - -class RSFatalException : public std::exception { -public: - RSFatalException(std::string m = "replica set fatal exception") : msg(m) {} - virtual ~RSFatalException() throw(){}; - virtual const char* what() const throw() { - return msg.c_str(); - } - -private: - std::string msg; -}; - -struct DocID { - // ns and _id both point into ownedObj's buffer - BSONObj ownedObj; - const char* ns; - BSONElement _id; - bool operator<(const DocID& other) const { - int comp = strcmp(ns, other.ns); - if (comp < 0) - return true; - if (comp > 0) - return false; - return _id < other._id; - } -}; - -struct FixUpInfo { - // note this is a set -- if there are many $inc's on a single document we need to rollback, - // we only need to refetch it once. - set toRefetch; +using namespace rollback_internal; - // collections to drop - set toDrop; +bool DocID::operator<(const DocID& other) const { + int comp = strcmp(ns, other.ns); + if (comp < 0) + return true; + if (comp > 0) + return false; - // Indexes to drop. - // Key is collection namespace. Value is name of index to drop. - multimap indexesToDrop; + return _id < other._id; +} - set collectionsToResyncData; - set collectionsToResyncMetadata; +bool DocID::operator==(const DocID& other) const { + // Since this is only used for tests, going with the simple impl that reuses operator< which is + // used in the real code. + return !(*this < other || other < *this); +} - OpTime commonPoint; - RecordId commonPointOurDiskloc; +void FixUpInfo::removeAllDocsToRefetchFor(const std::string& collection) { + docsToRefetch.erase(docsToRefetch.lower_bound(DocID::minFor(collection.c_str())), + docsToRefetch.upper_bound(DocID::maxFor(collection.c_str()))); +} - int rbid; // remote server's current rollback sequence # -}; +void FixUpInfo::removeRedundantOperations() { + // These loops and their bodies can be done in any order. The final result of the FixUpInfo + // members will be the same either way. + for (const auto& collection : collectionsToDrop) { + removeAllDocsToRefetchFor(collection); + indexesToDrop.erase(collection); + collectionsToResyncMetadata.erase(collection); + } + for (const auto& collection : collectionsToResyncData) { + removeAllDocsToRefetchFor(collection); + indexesToDrop.erase(collection); + collectionsToResyncMetadata.erase(collection); + collectionsToDrop.erase(collection); + } +} -Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { +Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, + const BSONObj& ourObj) { const char* op = ourObj.getStringField("op"); if (*op == 'n') return Status::OK(); @@ -183,14 +174,14 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { doc.ownedObj = ourObj.getOwned(); doc.ns = doc.ownedObj.getStringField("ns"); if (*doc.ns == '\0') { - warning() << "ignoring op on rollback no ns TODO : " << doc.ownedObj.toString(); - return Status::OK(); + throw RSFatalException(str::stream() + << "local op on rollback has no ns: " << doc.ownedObj.toString()); } BSONObj obj = doc.ownedObj.getObjectField(*op == 'u' ? "o2" : "o"); if (obj.isEmpty()) { - warning() << "ignoring op on rollback : " << doc.ownedObj.toString(); - return Status::OK(); + throw RSFatalException(str::stream() + << "local op on rollback has no object field: " << (doc.ownedObj)); } if (*op == 'c') { @@ -208,7 +199,7 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { // Create collection operation // { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc - fixUpInfo.toDrop.insert(ns); + fixUpInfo.collectionsToDrop.insert(ns); return Status::OK(); } else if (cmdname == "drop") { string ns = nss.db().toString() + '.' + first.valuestr(); @@ -270,7 +261,7 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { severe() << message; return Status(ErrorCodes::UnrecoverableRollbackError, message); } - auto subStatus = refetch(fixUpInfo, subopElement.Obj()); + auto subStatus = updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj()); if (!subStatus.isOK()) { return subStatus; } @@ -328,13 +319,45 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { throw RSFatalException(); } - fixUpInfo.toRefetch.insert(doc); + fixUpInfo.docsToRefetch.insert(doc); return Status::OK(); } +namespace { + +/** + * This must be called before making any changes to our local data and after fetching any + * information from the upstream node. If any information is fetched from the upstream node after we + * have written locally, the function must be called again. + */ +void checkRbidAndUpdateMinValid(OperationContext* txn, + const int rbid, + const RollbackSource& rollbackSource) { + // It is important that the steps are performed in order to avoid racing with upstream rollbacks + // + // 1) Get the last doc in their oplog. + // 2) Get their RBID and fail if it has changed. + // 3) Set our minValid to the previously fetched OpTime of the top of their oplog. + + const auto newMinValidDoc = rollbackSource.getLastOperation(); + if (newMinValidDoc.isEmpty()) { + uasserted(40361, "rollback error newest oplog entry on source is missing or empty"); + } + if (rbid != rollbackSource.getRollbackId()) { + // Our source rolled back itself so the data we received isn't necessarily consistent. + uasserted(40365, "rollback rbid on source changed during rollback, canceling this attempt"); + } + + // we have items we are writing that aren't from a point-in-time. thus best not to come + // online until we get to that point in freshness. + OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc)); + log() << "Setting minvalid to " << minValid; + setAppliedThrough(txn, {}); // Use top of oplog. + setMinValid(txn, minValid); +} void syncFixUp(OperationContext* txn, - FixUpInfo& fixUpInfo, + const FixUpInfo& fixUpInfo, const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord) { // fetch all first so we needn't handle interruption in a fancy way @@ -344,73 +367,45 @@ void syncFixUp(OperationContext* txn, // namespace -> doc id -> doc map> goodVersions; - BSONObj newMinValid; - // fetch all the goodVersions of each document from current primary - DocID doc; unsigned long long numFetched = 0; - try { - for (set::iterator it = fixUpInfo.toRefetch.begin(); it != fixUpInfo.toRefetch.end(); - it++) { - doc = *it; + for (auto&& doc : fixUpInfo.docsToRefetch) { + invariant(!doc._id.eoo()); // This is checked when we insert to the set. - verify(!doc._id.eoo()); - - { - // TODO : slow. lots of round trips. - numFetched++; - BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); - totalSize += good.objsize(); - uassert(13410, "replSet too much data to roll back", totalSize < 300 * 1024 * 1024); - - // note good might be eoo, indicating we should delete it - goodVersions[doc.ns][doc] = good; + try { + // TODO : slow. lots of round trips. + numFetched++; + BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); + totalSize += good.objsize(); + if (totalSize >= 300 * 1024 * 1024) { + throw RSFatalException("replSet too much data to roll back"); } + + // Note good might be empty, indicating we should delete it. + goodVersions[doc.ns][doc] = good; + } catch (const DBException& ex) { + log() << "rollback couldn't re-get from ns: " << doc.ns << " _id: " << (doc._id) << ' ' + << numFetched << '/' << fixUpInfo.docsToRefetch.size() << ": " << (ex); + throw; } - newMinValid = rollbackSource.getLastOperation(); - if (newMinValid.isEmpty()) { - error() << "rollback error newMinValid empty?"; - return; - } - } catch (const DBException& e) { - LOG(1) << "rollback re-get objects: " << e.toString(); - error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' ' - << numFetched << '/' << fixUpInfo.toRefetch.size(); - throw e; } log() << "rollback 3.5"; - if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { - // Our source rolled back itself so the data we received isn't necessarily consistent. - warning() << "rollback rbid on source changed during rollback, " - << "cancelling this attempt"; - return; - } + checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource); // update them log() << "rollback 4 n:" << goodVersions.size(); - bool warn = false; - invariant(!fixUpInfo.commonPointOurDiskloc.isNull()); - // we have items we are writing that aren't from a point-in-time. thus best not to come - // online until we get to that point in freshness. - // TODO this is still wrong because we don't record that we are in rollback, and we can't really - // recover. - OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid)); - log() << "minvalid=" << minValid; - setAppliedThrough(txn, {}); // Use top of oplog. - setMinValid(txn, minValid); - // any full collection resyncs required? if (!fixUpInfo.collectionsToResyncData.empty() || !fixUpInfo.collectionsToResyncMetadata.empty()) { for (const string& ns : fixUpInfo.collectionsToResyncData) { log() << "rollback 4.1.1 coll resync " << ns; - fixUpInfo.indexesToDrop.erase(ns); - fixUpInfo.collectionsToResyncMetadata.erase(ns); + invariant(!fixUpInfo.indexesToDrop.count(ns)); + invariant(!fixUpInfo.collectionsToResyncMetadata.count(ns)); const NamespaceString nss(ns); @@ -421,7 +416,7 @@ void syncFixUp(OperationContext* txn, Database* db = dbHolder().openDb(txn, nss.db().toString()); invariant(db); WriteUnitOfWork wunit(txn); - db->dropCollection(txn, ns); + fassertStatusOK(40359, db->dropCollectionEvenIfSystem(txn, nss)); wunit.commit(); } @@ -443,9 +438,11 @@ void syncFixUp(OperationContext* txn, auto infoResult = rollbackSource.getCollectionInfo(nss); if (!infoResult.isOK()) { - // Collection dropped by "them" so we should drop it too. - log() << ns << " not found on remote host, dropping"; - fixUpInfo.toDrop.insert(ns); + // Collection dropped by "them" so we can't correctly change it here. If we get to + // the roll-forward phase, we will drop it then. If the drop is rolled-back upstream + // and we restart, we will be expected to still have the collection. + log() << ns << " not found on remote host, so not rolling back collmod operation." + " We will drop the collection soon."; continue; } @@ -496,56 +493,28 @@ void syncFixUp(OperationContext* txn, // we did more reading from primary, so check it again for a rollback (which would mess // us up), and make minValid newer. log() << "rollback 4.2"; - - string err; - try { - newMinValid = rollbackSource.getLastOperation(); - if (newMinValid.isEmpty()) { - err = "can't get minvalid from sync source"; - } else { - OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid)); - log() << "minvalid=" << minValid; - setMinValid(txn, minValid); - setAppliedThrough(txn, fixUpInfo.commonPoint); - } - } catch (const DBException& e) { - err = "can't get/set minvalid: "; - err += e.what(); - } - if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { - // our source rolled back itself. so the data we received isn't necessarily - // consistent. however, we've now done writes. thus we have a problem. - err += "rbid at primary changed during resync/rollback"; - } - if (!err.empty()) { - severe() << "rolling back : " << err << ". A full resync will be necessary."; - // TODO: reset minvalid so that we are permanently in fatal state - // TODO: don't be fatal, but rather, get all the data first. - throw RSFatalException(); - } - log() << "rollback 4.3"; + checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource); } log() << "rollback 4.6"; - // drop collections to drop before doing individual fixups - that might make things faster - // below actually if there were subsequent inserts to rollback - for (set::iterator it = fixUpInfo.toDrop.begin(); it != fixUpInfo.toDrop.end(); it++) { + // drop collections to drop before doing individual fixups + for (set::iterator it = fixUpInfo.collectionsToDrop.begin(); + it != fixUpInfo.collectionsToDrop.end(); + it++) { log() << "rollback drop: " << *it; - fixUpInfo.indexesToDrop.erase(*it); + invariant(!fixUpInfo.indexesToDrop.count(*it)); ScopedTransaction transaction(txn, MODE_IX); const NamespaceString nss(*it); Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X); Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it)); if (db) { - WriteUnitOfWork wunit(txn); - Helpers::RemoveSaver removeSaver("rollback", "", *it); // perform a collection scan and write all documents in the collection to disk std::unique_ptr exec(InternalPlanner::collectionScan( - txn, *it, db->getCollection(*it), PlanExecutor::YIELD_MANUAL)); + txn, *it, db->getCollection(*it), PlanExecutor::YIELD_AUTO)); BSONObj curObj; PlanExecutor::ExecState execState; while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) { @@ -570,7 +539,8 @@ void syncFixUp(OperationContext* txn, throw RSFatalException(); } - db->dropCollection(txn, *it); + WriteUnitOfWork wunit(txn); + fassertStatusOK(40360, db->dropCollectionEvenIfSystem(txn, nss)); wunit.commit(); } } @@ -622,9 +592,8 @@ void syncFixUp(OperationContext* txn, // while rolling back createCollection operations. const auto& ns = nsAndGoodVersionsByDocID.first; unique_ptr removeSaver; - if (!fixUpInfo.toDrop.count(ns)) { - removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns)); - } + invariant(!fixUpInfo.collectionsToDrop.count(ns)); + removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns)); const auto& goodVersionsByDocID = nsAndGoodVersionsByDocID.second; for (const auto& idAndDoc : goodVersionsByDocID) { @@ -639,10 +608,7 @@ void syncFixUp(OperationContext* txn, BSONObj pattern = doc._id.wrap(); // { _id : ... } try { verify(doc.ns && *doc.ns); - if (fixUpInfo.collectionsToResyncData.count(doc.ns)) { - // We just synced this entire collection. - continue; - } + invariant(!fixUpInfo.collectionsToResyncData.count(doc.ns)); // TODO: Lots of overhead in context. This can be faster. const NamespaceString docNss(doc.ns); @@ -715,8 +681,13 @@ void syncFixUp(OperationContext* txn, } } } catch (const DBException& e) { - error() << "rolling back capped collection rec " << doc.ns << ' ' - << e.toString(); + // Replicated capped collections have many ways to become + // inconsistent. We rely on age-out to make these problems go away + // eventually. + warning() << "ignoring failure to roll back change to capped " + << "collection " << doc.ns << " with _id " + << idAndDoc.first._id.toString( + /*includeFieldName*/ false) << ": " << e; } } else { deleteObjects(txn, @@ -727,25 +698,6 @@ void syncFixUp(OperationContext* txn, true, // justone true); // god } - // did we just empty the collection? if so let's check if it even - // exists on the source. - if (collection->numRecords(txn) == 0) { - try { - NamespaceString nss(doc.ns); - auto infoResult = rollbackSource.getCollectionInfo(nss); - if (!infoResult.isOK()) { - // we should drop - WriteUnitOfWork wunit(txn); - ctx.db()->dropCollection(txn, doc.ns); - wunit.commit(); - } - } catch (const DBException& ex) { - // Failed to run listCollections command on sync source. - // This isn't *that* big a deal, but is bad. - warning() << "rollback error querying for existence of " << doc.ns - << " at the primary, ignoring: " << ex; - } - } } } else { // TODO faster... @@ -766,8 +718,8 @@ void syncFixUp(OperationContext* txn, } } catch (const DBException& e) { log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' ' - << e.toString() << " ndeletes:" << deletes; - warn = true; + << e << " ndeletes:" << deletes; + throw; } } } @@ -795,101 +747,69 @@ void syncFixUp(OperationContext* txn, Status status = getGlobalAuthorizationManager()->initialize(txn); if (!status.isOK()) { - warning() << "Failed to reinitialize auth data after rollback: " << status; - warn = true; + severe() << "Failed to reinitialize auth data after rollback: " << status; + fassertFailedNoTrace(40366); } // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the // lastAppliedHash value in bgsync to reflect our new last op. replCoord->resetLastOpTimesFromOplog(txn); - - // done - if (warn) - warning() << "issues during syncRollback, see log"; - else - log() << "rollback done"; + log() << "rollback done"; } Status _syncRollback(OperationContext* txn, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord, - const SleepSecondsFn& sleepSecondsFn) { + boost::optional requiredRBID, + ReplicationCoordinator* replCoord) { invariant(!txn->lockState()->isLocked()); - log() << "rollback 0"; - - /** by doing this, we will not service reads (return an error as we aren't in secondary - * state. that perhaps is moot because of the write lock above, but that write lock - * probably gets deferred or removed or yielded later anyway. - * - * also, this is better for status reporting - we know what is happening. - */ - { - Lock::GlobalWrite globalWrite(txn->lockState()); - if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { - return Status(ErrorCodes::OperationFailed, - str::stream() << "Cannot transition from " - << replCoord->getMemberState().toString() << " to " - << MemberState(MemberState::RS_ROLLBACK).toString()); - } - } - FixUpInfo how; log() << "rollback 1"; how.rbid = rollbackSource.getRollbackId(); - { - log() << "rollback 2 FindCommonPoint"; - try { - auto processOperationForFixUp = - [&how](const BSONObj& operation) { return refetch(how, operation); }; - auto res = syncRollBackLocalOperations( - localOplog, rollbackSource.getOplog(), processOperationForFixUp); - if (!res.isOK()) { - const auto status = res.getStatus(); - switch (status.code()) { - case ErrorCodes::OplogStartMissing: - case ErrorCodes::UnrecoverableRollbackError: - sleepSecondsFn(Seconds(1)); - return status; - default: - throw RSFatalException(status.toString()); - } - } else { - how.commonPoint = res.getValue().first; - how.commonPointOurDiskloc = res.getValue().second; + if (requiredRBID) { + uassert(40362, + "Upstream node rolled back. Need to retry our rollback.", + how.rbid == *requiredRBID); + } + + log() << "rollback 2 FindCommonPoint"; + try { + auto processOperationForFixUp = [&how](const BSONObj& operation) { + return updateFixUpInfoFromLocalOplogEntry(how, operation); + }; + auto res = syncRollBackLocalOperations( + localOplog, rollbackSource.getOplog(), processOperationForFixUp); + if (!res.isOK()) { + const auto status = res.getStatus(); + switch (status.code()) { + case ErrorCodes::OplogStartMissing: + case ErrorCodes::UnrecoverableRollbackError: + return status; + default: + throw RSFatalException(status.toString()); } - } catch (const RSFatalException& e) { - error() << string(e.what()); - return Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() - << "need to rollback, but unable to determine common point between" - " local and remote oplog: " << e.what(), - 18752); - } catch (const DBException& e) { - warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min"; - - sleepSecondsFn(Seconds(60)); - throw; } + + how.commonPoint = res.getValue().first; + how.commonPointOurDiskloc = res.getValue().second; + how.removeRedundantOperations(); + } catch (const RSFatalException& e) { + return Status(ErrorCodes::UnrecoverableRollbackError, + str::stream() + << "need to rollback, but unable to determine common point between" + " local and remote oplog: " << e.what(), + 18752); } + log() << "rollback common point is " << how.commonPoint; log() << "rollback 3 fixup"; - - replCoord->incrementRollbackID(); try { + ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); }); syncFixUp(txn, how, rollbackSource, replCoord); } catch (const RSFatalException& e) { - error() << "exception during rollback: " << e.what(); - return Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() << "exception during rollback: " << e.what(), - 18753); - } catch (...) { - replCoord->incrementRollbackID(); - - throw; + return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753); } - replCoord->incrementRollbackID(); if (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) { // This log output is used in js tests so please leave it. @@ -900,8 +820,6 @@ Status _syncRollback(OperationContext* txn, } } - // Success; leave "ROLLBACK" state intact until applier thread has reloaded the new minValid. - // Otherwise, the applier could transition the node to SECONDARY with an out-of-date minValid. return Status::OK(); } @@ -910,8 +828,8 @@ Status _syncRollback(OperationContext* txn, Status syncRollback(OperationContext* txn, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord, - const SleepSecondsFn& sleepSecondsFn) { + boost::optional requiredRBID, + ReplicationCoordinator* replCoord) { invariant(txn); invariant(replCoord); @@ -919,22 +837,11 @@ Status syncRollback(OperationContext* txn, DisableDocumentValidation validationDisabler(txn); txn->setReplicatedWrites(false); - Status status = _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn); + Status status = _syncRollback(txn, localOplog, rollbackSource, requiredRBID, replCoord); log() << "rollback finished" << rsLog; return status; } -Status syncRollback(OperationContext* txn, - const OplogInterface& localOplog, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord) { - return syncRollback(txn, - localOplog, - rollbackSource, - replCoord, - [](Seconds seconds) { sleepsecs(durationCount(seconds)); }); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index c88e9fd27c0..8ee7dd04367 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -28,12 +28,10 @@ #pragma once -#include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" -#include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/time_support.h" +#include "mongo/db/record_id.h" +#include "mongo/db/repl/optime.h" namespace mongo { @@ -44,7 +42,6 @@ class OperationContext; namespace repl { class OplogInterface; -class OpTime; class ReplicationCoordinator; class RollbackSource; @@ -58,7 +55,7 @@ class RollbackSource; * - undo operations by fetching all documents affected, then replaying * the sync source's oplog until we reach the time in the oplog when we fetched the last * document. - * This function can throw std::exception on failures. + * This function can throw exceptions on failures. * This function runs a command on the sync source to detect if the sync source rolls back * while our rollback is in progress. * @@ -69,21 +66,77 @@ class RollbackSource; * supports fetching documents and copying collections. * @param replCoord Used to track the rollback ID and to change the follower state * - * Failures: Most failures are returned as a status but some failures throw an std::exception. + * If requiredRBID is supplied, we error if the upstream node has a different RBID (ie it rolled + * back) after fetching any information from it. + * + * Failures: If a Status with code UnrecoverableRollbackError is returned, the caller must exit + * fatally. All other errors should be considered recoverable regardless of whether reported as a + * status or exception. */ - -using SleepSecondsFn = stdx::function; - -Status syncRollback(OperationContext* txn, - const OplogInterface& localOplog, - const RollbackSource& rollbackSource, - ReplicationCoordinator* replCoord, - const SleepSecondsFn& sleepSecondsFn); - Status syncRollback(OperationContext* txn, const OplogInterface& localOplog, const RollbackSource& rollbackSource, + boost::optional requiredRBID, ReplicationCoordinator* replCoord); +/** + * This namespace contains internal details of the rollback system. It is only exposed in a header + * for unittesting. Nothing here should be used outside of rs_rollback.cpp or its unittest. + */ +namespace rollback_internal { + +struct DocID { + BSONObj ownedObj; + const char* ns; + BSONElement _id; + bool operator<(const DocID& other) const; + bool operator==(const DocID& other) const; + + static DocID minFor(const char* ns) { + auto obj = BSON("" << MINKEY); + return {obj, ns, obj.firstElement()}; + } + + static DocID maxFor(const char* ns) { + auto obj = BSON("" << MAXKEY); + return {obj, ns, obj.firstElement()}; + } +}; + +struct FixUpInfo { + // note this is a set -- if there are many $inc's on a single document we need to rollback, + // we only need to refetch it once. + std::set docsToRefetch; + + // Key is collection namespace. Value is name of index to drop. + std::multimap indexesToDrop; + + std::set collectionsToDrop; + std::set collectionsToResyncData; + std::set collectionsToResyncMetadata; + + OpTime commonPoint; + RecordId commonPointOurDiskloc; + + int rbid; // remote server's current rollback sequence # + + void removeAllDocsToRefetchFor(const std::string& collection); + void removeRedundantOperations(); +}; + +// Indicates that rollback cannot complete and the server must abort. +class RSFatalException : public std::exception { +public: + RSFatalException(std::string m = "replica set fatal exception") : msg(m) {} + virtual const char* what() const throw() { + return msg.c_str(); + } + +private: + std::string msg; +}; + +Status updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, const BSONObj& ourObj); +} // namespace rollback_internal } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 3f768d79f4b..3a324aff179 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -33,6 +33,7 @@ #include #include +#include "mongo/bson/json.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -63,6 +64,7 @@ namespace { using namespace mongo; using namespace mongo::repl; +using namespace mongo::repl::rollback_internal; const OplogInterfaceMock::Operations kEmptyMockOperations; @@ -173,8 +175,6 @@ void RSRollbackTest::tearDown() { setGlobalReplicationCoordinator(nullptr); } -void noSleep(Seconds seconds) {} - TEST_F(RSRollbackTest, InconsistentMinValid) { repl::setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0)); repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); @@ -182,36 +182,12 @@ TEST_F(RSRollbackTest, InconsistentMinValid) { OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), - _coordinator, - noSleep); + {}, + _coordinator); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); } -TEST_F(RSRollbackTest, SetFollowerModeFailed) { - class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock { - public: - ReplicationCoordinatorSetFollowerModeMock() - : ReplicationCoordinatorMock(createReplSettings()) {} - MemberState getMemberState() const override { - return MemberState::RS_DOWN; - } - bool setFollowerMode(const MemberState& newState) override { - return false; - } - }; - _coordinator = new ReplicationCoordinatorSetFollowerModeMock(); - setGlobalReplicationCoordinator(_coordinator); - - ASSERT_EQUALS(ErrorCodes::OperationFailed, - syncRollback(_txn.get(), - OplogInterfaceMock(kEmptyMockOperations), - RollbackSourceMock(std::unique_ptr( - new OplogInterfaceMock(kEmptyMockOperations))), - _coordinator, - noSleep).code()); -} - TEST_F(RSRollbackTest, OplogStartMissing) { OpTime ts(Timestamp(Seconds(1), 0), 0); auto operation = @@ -223,8 +199,8 @@ TEST_F(RSRollbackTest, OplogStartMissing) { RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ operation, }))), - _coordinator, - noSleep).code()); + {}, + _coordinator).code()); } TEST_F(RSRollbackTest, NoRemoteOpLog) { @@ -235,8 +211,8 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) { OplogInterfaceMock({operation}), RollbackSourceMock(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), - _coordinator, - noSleep); + {}, + _coordinator); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); } @@ -257,12 +233,36 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { OplogInterfaceMock({operation}), RollbackSourceLocal(std::unique_ptr( new OplogInterfaceMock(kEmptyMockOperations))), - _coordinator, - noSleep), + {}, + _coordinator), UserException, ErrorCodes::UnknownError); } +TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) { + OpTime ts(Timestamp(Seconds(1), 0), 0); + auto operation = + std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr oplog) + : RollbackSourceMock(std::move(oplog)) {} + int getRollbackId() const override { + return 2; + } + }; + + ASSERT_THROWS_CODE(syncRollback(_txn.get(), + OplogInterfaceMock({operation}), + RollbackSourceLocal(std::unique_ptr( + new OplogInterfaceMock(kEmptyMockOperations))), + {1}, + _coordinator), + UserException, + ErrorCodes::Error(40362)); +} + TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { createOplog(_txn.get()); OpTime ts(Timestamp(Seconds(1), 0), 1); @@ -274,8 +274,8 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ operation, }))), - _coordinator, - noSleep)); + {}, + _coordinator)); } /** @@ -341,8 +341,8 @@ int _testRollbackDelete(OperationContext* txn, ASSERT_OK(syncRollback(txn, OplogInterfaceMock({deleteOperation, commonOperation}), rollbackSource, - coordinator, - noSleep)); + {}, + coordinator)); ASSERT_TRUE(rollbackSource.called); Lock::DBLock dbLock(txn->lockState(), "test", MODE_S); @@ -415,8 +415,8 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) { auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep); + {}, + _coordinator); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); @@ -474,8 +474,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) { _txn.get(), OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("rollback drop index: collection: test.t. index: a_1")); @@ -531,8 +531,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) { ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("rollback drop index: collection: test.t. index: a_1")); @@ -578,8 +578,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) { auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep); + {}, + _coordinator); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); @@ -622,8 +622,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) { auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep); + {}, + _coordinator); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); @@ -665,8 +665,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) { auto status = syncRollback(_txn.get(), OplogInterfaceMock({insertDocumentOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep); + {}, + _coordinator); stopCapturingLogMessages(); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18752, status.location()); @@ -700,8 +700,8 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) { RollbackSourceMock(std::unique_ptr(new OplogInterfaceMock({ commonOperation, }))), - _coordinator, - noSleep); + {}, + _coordinator); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18751, status.location()); } @@ -735,11 +735,51 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({dropCollectionOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); ASSERT_TRUE(rollbackSource.called); } +TEST_F(RSRollbackTest, RollbackDropCollectionCommandFailsIfRBIDChangesWhileSyncingCollection) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto dropCollectionOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" + << "c" + << "ns" + << "test.t" + << "o" << BSON("drop" + << "t")), + RecordId(2)); + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr oplog) + : RollbackSourceMock(std::move(oplog)), copyCollectionCalled(false) {} + int getRollbackId() const override { + return copyCollectionCalled ? 1 : 0; + } + void copyCollectionFromRemote(OperationContext* txn, + const NamespaceString& nss) const override { + copyCollectionCalled = true; + } + mutable bool copyCollectionCalled; + }; + RollbackSourceLocal rollbackSource(std::unique_ptr(new OplogInterfaceMock({ + commonOperation, + }))); + + _createCollection(_txn.get(), "test.t", CollectionOptions()); + ASSERT_THROWS_CODE(syncRollback(_txn.get(), + OplogInterfaceMock({dropCollectionOperation, commonOperation}), + rollbackSource, + 0, + _coordinator), + DBException, + 40365); + ASSERT(rollbackSource.copyCollectionCalled); +} + BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list ops) { BSONObjBuilder entry; entry << "ts" << ts << "h" << 1LL << "op" @@ -848,8 +888,8 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({applyOpsOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); ASSERT_EQUALS(4U, rollbackSource.searchedIds.size()); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1)); ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2)); @@ -887,8 +927,8 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({createCollectionOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); { Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S); auto db = dbHolder().get(_txn.get(), "test"); @@ -928,8 +968,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) { ASSERT_OK(syncRollback(_txn.get(), OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep)); + {}, + _coordinator)); stopCapturingLogMessages(); ASSERT_TRUE(rollbackSource.called); for (const auto& message : getCapturedLogMessages()) { @@ -967,10 +1007,89 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt syncRollback(_txn.get(), OplogInterfaceMock({collectionModificationOperation, commonOperation}), rollbackSource, - _coordinator, - noSleep); + {}, + _coordinator); ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); ASSERT_EQUALS(18753, status.location()); } +TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) { + const auto validOplogEntry = fromjson("{op: 'i', ns: 'test.t', o: {_id:1, a: 1}}"); + FixUpInfo fui; + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry)); + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("ns")), + RSFatalException); +} + +TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { + const auto validOplogEntry = fromjson("{op: 'i', ns: 'test.t', o: {_id:1, a: 1}}"); + FixUpInfo fui; + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry)); + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("o")), + RSFatalException); +} + +TEST(RSRollbackTest, LocalEntryWithoutO2IsFatal) { + const auto validOplogEntry = + fromjson("{op: 'u', ns: 'test.t', o2: {_id: 1}, o: {_id:1, a: 1}}"); + FixUpInfo fui; + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry)); + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("o2")), + RSFatalException); +} + +// The testcases used here are trying to detect off-by-one errors in +// FixUpInfo::removeAllDocsToRefectchFor. +TEST(FixUpInfoTest, RemoveAllDocsToRefetchForWorks) { + const auto normalHolder = BSON("" << OID::gen()); + const auto normalKey = normalHolder.firstElement(); + + // Can't use ASSERT_EQ with this since it isn't ostream-able. Failures will at least give you + // the size. If that isn't enough, use GDB. + using DocSet = std::set; + + FixUpInfo fui; + fui.docsToRefetch = { + DocID::minFor("a"), + DocID{{}, "a", normalKey}, + DocID::maxFor("a"), + + DocID::minFor("b"), + DocID{{}, "b", normalKey}, + DocID::maxFor("b"), + + DocID::minFor("c"), + DocID{{}, "c", normalKey}, + DocID::maxFor("c"), + }; + + // Remove from the middle. + fui.removeAllDocsToRefetchFor("b"); + ASSERT((fui.docsToRefetch == + DocSet{ + DocID::minFor("a"), + DocID{{}, "a", normalKey}, + DocID::maxFor("a"), + + DocID::minFor("c"), + DocID{{}, "c", normalKey}, + DocID::maxFor("c"), + })) + << "remaining docs: " << fui.docsToRefetch.size(); + + // Remove from the end. + fui.removeAllDocsToRefetchFor("c"); + ASSERT((fui.docsToRefetch == + DocSet{ + DocID::minFor("a"), // This comment helps clang-format. + DocID{{}, "a", normalKey}, + DocID::maxFor("a"), + })) + << "remaining docs: " << fui.docsToRefetch.size(); + + // Everything else. + fui.removeAllDocsToRefetchFor("a"); + ASSERT((fui.docsToRefetch == DocSet{})) << "remaining docs: " << fui.docsToRefetch.size(); +} + } // namespace -- cgit v1.2.1