summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <redbeard0531@gmail.com>2017-01-18 18:25:03 -0500
committerMathias Stearn <redbeard0531@gmail.com>2017-01-20 18:25:42 -0500
commitfb5a39c59d661021c99ba3548e4e5be2e2fb50f5 (patch)
tree62e834399945e0fcb60992183fcd4173bf7cc7f1
parentbcca51caf0054c7def89d7588a1c99c6b2513a79 (diff)
downloadmongo-fb5a39c59d661021c99ba3548e4e5be2e2fb50f5.tar.gz
SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid
(cherry picked from commit 0b76764eac7651ddba4c82c504aa7e8d785087c2) SERVER-25860 Allow replication rollback to drop system collections (cherry picked from commit 99e19b1ded425a1d859a9bc52fd5c2712e71f83a) SERVER-25860 Remove redundant operations during rollback (cherry picked from commit 2dec7e9a15af8e0fc4d8e68ed40e3abe90b3a3b3) SERVER-25862 Add a test of replaying a batch at startup with update and delete of same object This is a special case of SERVER-7200 that interacts with plans for SERVER-25862. (cherry picked from commit 8e7231a38341d68fb2cdc60509687397e9a17741) SERVER-27282 clean up RS rollback error handling (cherry picked from commit ef1f1739d6cbff9fb4ddbcc77d467f183c0ab9f2) (all cherry picked from v3.4 commit f4cab348460c90fcd506b2d46bf8c830b7b87379)
-rw-r--r--jstests/replsets/oplog_replay_on_startup_update_and_delete.js48
-rw-r--r--jstests/replsets/rollback_empty_ns.js93
-rw-r--r--jstests/replsets/rollback_empty_o.js93
-rw-r--r--jstests/replsets/rollback_empty_o2.js94
-rw-r--r--src/mongo/db/catalog/database.cpp35
-rw-r--r--src/mongo/db/catalog/database.h5
-rw-r--r--src/mongo/db/repl/bgsync.cpp162
-rw-r--r--src/mongo/db/repl/bgsync.h4
-rw-r--r--src/mongo/db/repl/oplogreader.cpp15
-rw-r--r--src/mongo/db/repl/oplogreader.h7
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp421
-rw-r--r--src/mongo/db/repl/rs_rollback.h85
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp243
13 files changed, 630 insertions, 675 deletions
diff --git a/jstests/replsets/oplog_replay_on_startup_update_and_delete.js b/jstests/replsets/oplog_replay_on_startup_update_and_delete.js
new file mode 100644
index 00000000000..14d99b330a4
--- /dev/null
+++ b/jstests/replsets/oplog_replay_on_startup_update_and_delete.js
@@ -0,0 +1,48 @@
+// SERVER-7200 On startup, replica set nodes delete oplog state past the oplog delete point and
+// apply any remaining unapplied ops before coming up as a secondary. This test specifically tests
+// having an update and a delete of the same document in the same batch. This is a bit of an edge
+// case because if the delete has been applied already, the update won't find any documents.
+//
+// @tags: [requires_persistence]
+(function() {
+ "use strict";
+
+ var ns = "test.coll";
+ var id = ObjectId();
+
+ var rst = new ReplSetTest({
+ nodes: 1,
+ });
+
+ rst.startSet();
+ rst.initiate();
+
+ var conn = rst.getPrimary(); // Waits for PRIMARY state.
+
+ // Do the insert update and delete operations.
+ var coll = conn.getCollection(ns);
+ assert.writeOK(coll.insert({_id: id}));
+ assert.writeOK(coll.update({_id: id}, {$inc: {a: 1}}));
+ assert.writeOK(coll.remove({_id: id}));
+ assert.eq(coll.findOne({_id: id}), null);
+
+ // Set the appliedThrough point back to the insert so the update and delete are replayed.
+ conn = rst.restart(0, {noReplSet: true}); // Restart as a standalone node.
+ assert.neq(null, conn, "failed to restart");
+ var oplog = conn.getCollection('local.oplog.rs');
+ oplog.find().forEach(printjsononeline);
+ assert.eq(oplog.count({ns: ns, op: 'i'}), 1);
+ var insertOp = oplog.findOne({ns: ns, op: 'i'});
+ var term = 't' in insertOp ? insertOp.t : -1;
+ var minValidColl = conn.getCollection('local.replset.minvalid');
+ assert.writeOK(minValidColl.update({}, {$set: {begin: {ts: insertOp.ts, t: term}}}));
+ printjson({minValidDoc: minValidColl.findOne()});
+
+ // Make sure it starts up fine again and doesn't have the document.
+ conn = rst.restart(0); // Restart in replSet mode again.
+ conn = rst.getPrimary(); // Waits for PRIMARY state.
+ coll = conn.getCollection(ns);
+ assert.eq(coll.findOne({_id: id}), null);
+
+ rst.stopSet();
+})();
diff --git a/jstests/replsets/rollback_empty_ns.js b/jstests/replsets/rollback_empty_ns.js
deleted file mode 100644
index f6a07319eb4..00000000000
--- a/jstests/replsets/rollback_empty_ns.js
+++ /dev/null
@@ -1,93 +0,0 @@
-// test that a rollback of an op with empty ns causes a message to be logged
-//
-// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will
-// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a
-// scenario, none of the members will have any data, and upon restart will each look for a member to
-// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be
-// run on ephemeral storage engines.
-// @tags: [requires_persistence]
-
-// function to check the logs for an entry
-doesEntryMatch = function(array, regex) {
- var found = false;
- for (i = 0; i < array.length; i++) {
- if (regex.test(array[i])) {
- found = true;
- }
- }
- return found;
-};
-
-// set up a set and grab things for later
-var name = "rollback_empty_ns";
-var replTest = new ReplSetTest({name: name, nodes: 3});
-var nodes = replTest.nodeList();
-var conns = replTest.startSet();
-replTest.initiate({
- "_id": name,
- "members": [
- {"_id": 0, "host": nodes[0], priority: 3},
- {"_id": 1, "host": nodes[1]},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
-});
-var a_conn = conns[0];
-var b_conn = conns[1];
-var AID = replTest.getNodeId(a_conn);
-var BID = replTest.getNodeId(b_conn);
-
-replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000);
-
-// get master and do an initial write
-var master = replTest.getPrimary();
-assert(master === conns[0], "conns[0] assumed to be master");
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-var options = {
- writeConcern: {w: 2, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options));
-
-// shut down master
-replTest.stop(AID);
-
-// insert a fake oplog entry with an empty ns
-master = replTest.getPrimary();
-assert(b_conn.host === master.host, "b_conn assumed to be master");
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-// another insert to set minvalid ahead
-assert.writeOK(b_conn.getDB(name).foo.insert({x: 123}));
-var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
-oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1);
-oplog_entry["ns"] = "";
-assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry));
-
-// shut down B and bring back the original master
-replTest.stop(BID);
-replTest.restart(AID);
-master = replTest.getPrimary();
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-
-// do a write so that B will have to roll back
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options));
-
-// restart B, which should rollback and log a message about not rolling back empty ns'd oplog entry
-replTest.restart(BID);
-var msg = RegExp("ignoring op on rollback no ns TODO : ");
-assert.soon(function() {
- try {
- var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log;
- return doesEntryMatch(log, msg);
- } catch (e) {
- return false;
- }
-}, "Did not see a log entry about skipping the empty ns'd oplog entry during rollback");
-
-replTest.stopSet();
diff --git a/jstests/replsets/rollback_empty_o.js b/jstests/replsets/rollback_empty_o.js
deleted file mode 100644
index f3468fcde5e..00000000000
--- a/jstests/replsets/rollback_empty_o.js
+++ /dev/null
@@ -1,93 +0,0 @@
-// test that a rollback of an op with empty o causes a message to be logged
-//
-// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will
-// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a
-// scenario, none of the members will have any data, and upon restart will each look for a member to
-// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be
-// run on ephemeral storage engines.
-// @tags: [requires_persistence]
-
-// function to check the logs for an entry
-doesEntryMatch = function(array, regex) {
- var found = false;
- for (i = 0; i < array.length; i++) {
- if (regex.test(array[i])) {
- found = true;
- }
- }
- return found;
-};
-
-// set up a set and grab things for later
-var name = "rollback_empty_o";
-var replTest = new ReplSetTest({name: name, nodes: 3});
-var nodes = replTest.nodeList();
-var conns = replTest.startSet();
-replTest.initiate({
- "_id": name,
- "members": [
- {"_id": 0, "host": nodes[0], priority: 3},
- {"_id": 1, "host": nodes[1]},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
-});
-var a_conn = conns[0];
-var b_conn = conns[1];
-var AID = replTest.getNodeId(a_conn);
-var BID = replTest.getNodeId(b_conn);
-
-replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000);
-
-// get master and do an initial write
-var master = replTest.getPrimary();
-assert(master === conns[0], "conns[0] assumed to be master");
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-var options = {
- writeConcern: {w: 2, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options));
-
-// shut down master
-replTest.stop(AID);
-
-// insert a fake oplog entry with an empty o
-master = replTest.getPrimary();
-assert(b_conn.host === master.host, "b_conn assumed to be master");
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-// another insert to set minvalid ahead
-assert.writeOK(b_conn.getDB(name).foo.insert({x: 123}));
-var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
-oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1);
-oplog_entry["o"] = {};
-assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry));
-
-// shut down B and bring back the original master
-replTest.stop(BID);
-replTest.restart(AID);
-master = replTest.getPrimary();
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-
-// do a write so that B will have to roll back
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options));
-
-// restart B, which should rollback and log a message about not rolling back empty o'd oplog entry
-replTest.restart(BID);
-var msg = RegExp("ignoring op on rollback : ");
-assert.soon(function() {
- try {
- var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log;
- return doesEntryMatch(log, msg);
- } catch (e) {
- return false;
- }
-}, "Did not see a log entry about skipping the empty o'd oplog entry during rollback");
-
-replTest.stopSet();
diff --git a/jstests/replsets/rollback_empty_o2.js b/jstests/replsets/rollback_empty_o2.js
deleted file mode 100644
index 56eb8512575..00000000000
--- a/jstests/replsets/rollback_empty_o2.js
+++ /dev/null
@@ -1,94 +0,0 @@
-// test that a rollback of an update with empty o2 causes a message to be logged
-//
-// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will
-// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a
-// scenario, none of the members will have any data, and upon restart will each look for a member to
-// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be
-// run on ephemeral storage engines.
-// @tags: [requires_persistence]
-
-// function to check the logs for an entry
-doesEntryMatch = function(array, regex) {
- var found = false;
- for (i = 0; i < array.length; i++) {
- if (regex.test(array[i])) {
- found = true;
- }
- }
- return found;
-};
-
-// set up a set and grab things for later
-var name = "rollback_empty_o2";
-var replTest = new ReplSetTest({name: name, nodes: 3});
-var nodes = replTest.nodeList();
-var conns = replTest.startSet();
-replTest.initiate({
- "_id": name,
- "members": [
- {"_id": 0, "host": nodes[0], priority: 3},
- {"_id": 1, "host": nodes[1]},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
-});
-var a_conn = conns[0];
-var b_conn = conns[1];
-var AID = replTest.getNodeId(a_conn);
-var BID = replTest.getNodeId(b_conn);
-
-replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY, 60 * 1000);
-
-// get master and do an initial write
-var master = replTest.getPrimary();
-assert(master === conns[0], "conns[0] assumed to be master");
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-var options = {
- writeConcern: {w: 2, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options));
-
-// shut down master
-replTest.stop(AID);
-
-// insert a fake oplog entry with an empty o2
-master = replTest.getPrimary();
-assert(b_conn.host === master.host, "b_conn assumed to be master");
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-// another insert to set minvalid ahead
-assert.writeOK(b_conn.getDB(name).foo.insert({x: 123}));
-var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
-oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1);
-oplog_entry["op"] = "u";
-oplog_entry["o2"] = {};
-assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry));
-
-// shut down B and bring back the original master
-replTest.stop(BID);
-replTest.restart(AID);
-master = replTest.getPrimary();
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-
-// do a write so that B will have to roll back
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options));
-
-// restart B, which should rollback and log a message about not rolling back empty o2'd oplog entry
-replTest.restart(BID);
-var msg = RegExp("ignoring op on rollback : ");
-assert.soon(function() {
- try {
- var log = b_conn.getDB("admin").adminCommand({getLog: "global"}).log;
- return doesEntryMatch(log, msg);
- } catch (e) {
- return false;
- }
-}, "Did not see a log entry about skipping the empty o2'd oplog entry during rollback");
-
-replTest.stopSet();
diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp
index 9b68549f295..13b4b7faa01 100644
--- a/src/mongo/db/catalog/database.cpp
+++ b/src/mongo/db/catalog/database.cpp
@@ -315,14 +315,8 @@ void Database::getStats(OperationContext* opCtx, BSONObjBuilder* output, double
}
Status Database::dropCollection(OperationContext* txn, StringData fullns) {
- invariant(txn->lockState()->isDbLockedForMode(name(), MODE_X));
-
- LOG(1) << "dropCollection: " << fullns << endl;
- massertNamespaceNotIndex(fullns, "dropCollection");
-
- Collection* collection = getCollection(fullns);
- if (!collection) {
- // collection doesn't exist
+ if (!getCollection(fullns)) {
+ // Collection doesn't exist so don't bother validating if it can be dropped.
return Status::OK();
}
@@ -341,9 +335,24 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) {
}
}
+ return dropCollectionEvenIfSystem(txn, nss);
+}
+
+Status Database::dropCollectionEvenIfSystem(OperationContext* txn, const NamespaceString& fullns) {
+ invariant(txn->lockState()->isDbLockedForMode(name(), MODE_X));
+
+ LOG(1) << "dropCollection: " << fullns;
+
+ Collection* collection = getCollection(fullns);
+ if (!collection) {
+ return Status::OK(); // Post condition already met.
+ }
+
+ massertNamespaceNotIndex(fullns.toString(), "dropCollection");
+
BackgroundOperation::assertNoBgOpInProgForNs(fullns);
- audit::logDropCollection(&cc(), fullns);
+ audit::logDropCollection(&cc(), fullns.toString());
Status s = collection->getIndexCatalog()->dropAllIndexes(txn, true);
if (!s.isOK()) {
@@ -355,13 +364,13 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) {
verify(collection->_details->getTotalIndexCount(txn) == 0);
LOG(1) << "\t dropIndexes done" << endl;
- Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns);
+ Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns.toString());
// We want to destroy the Collection object before telling the StorageEngine to destroy the
// RecordStore.
- _clearCollectionCache(txn, fullns, "collection dropped");
+ _clearCollectionCache(txn, fullns.toString(), "collection dropped");
- s = _dbEntry->dropCollection(txn, fullns);
+ s = _dbEntry->dropCollection(txn, fullns.toString());
if (!s.isOK())
return s;
@@ -378,7 +387,7 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) {
}
}
- getGlobalServiceContext()->getOpObserver()->onDropCollection(txn, nss);
+ getGlobalServiceContext()->getOpObserver()->onDropCollection(txn, fullns);
return Status::OK();
}
diff --git a/src/mongo/db/catalog/database.h b/src/mongo/db/catalog/database.h
index 2fa20bc9358..77b6ccc865b 100644
--- a/src/mongo/db/catalog/database.h
+++ b/src/mongo/db/catalog/database.h
@@ -144,7 +144,12 @@ public:
const DatabaseCatalogEntry* getDatabaseCatalogEntry() const;
+ /**
+ * dropCollection() will refuse to drop system collections. Use dropCollectionEvenIfSystem() if
+ * that is required.
+ */
Status dropCollection(OperationContext* txn, StringData fullns);
+ Status dropCollectionEvenIfSystem(OperationContext* txn, const NamespaceString& fullns);
Collection* createCollection(OperationContext* txn,
StringData ns,
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 7e2587b4561..37ba22bf1a7 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -54,6 +54,8 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -242,6 +244,8 @@ void BackgroundSync::_producerThread() {
return;
}
+ invariant(!state.rollback());
+
// We need to wait until initial sync has started.
if (_replCoord->getMyLastAppliedOpTime().isNull()) {
sleepsecs(1);
@@ -296,7 +300,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
minValid = minValidSaved;
}
}
- syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord);
+
+ int rbid;
+ syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord, &rbid);
// no server found
if (syncSourceReader.getHost().empty()) {
@@ -348,7 +354,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched,
lastHashFetched,
fetcherMaxTimeMS,
- &fetcherReturnStatus);
+ &fetcherReturnStatus,
+ rbid);
BSONObjBuilder cmdBob;
@@ -436,19 +443,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
}
- // check that we are at minvalid, otherwise we cannot roll back as we may be in an
- // inconsistent state
- const auto minValid = getMinValid(txn);
- if (lastApplied < minValid) {
- fassertNoTrace(18750,
- Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream()
- << "need to rollback, but in inconsistent state. "
- << "minvalid: " << minValid.toString()
- << " > our last optime: " << lastApplied.toString()));
- }
- _rollback(txn, source, getConnection);
+ _rollback(txn, source, rbid, getConnection);
stop();
} else if (!fetcherReturnStatus.isOK()) {
warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString();
@@ -461,7 +457,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* returnStatus) {
+ Status* returnStatus,
+ int rbid) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
if (!result.isOK()) {
@@ -515,6 +512,46 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be
+ // killed if the upstream node rolls back so we don't need to keep checking. This must be
+ // blocking since the Fetcher doesn't give us a way to defer sending the getmores after we
+ // return.
+ auto handle = _threadPoolTaskExecutor.scheduleRemoteCommand(
+ {source, "admin", BSON("replSetGetRBID" << 1)},
+ [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+ *returnStatus = rbidReply.response.getStatus();
+ if (!returnStatus->isOK())
+ return;
+
+ const auto& rbidReplyObj = rbidReply.response.getValue().data;
+ *returnStatus = getStatusFromCommandResult(rbidReplyObj);
+ if (!returnStatus->isOK())
+ return;
+
+ const auto rbidElem = rbidReplyObj["rbid"];
+ if (rbidElem.type() != NumberInt) {
+ *returnStatus =
+ Status(ErrorCodes::BadValue,
+ str::stream() << "Upstream node returned an "
+ << "rbid with invalid type " << rbidElem.type());
+ return;
+ }
+ if (rbidElem.Int() != rbid) {
+ *returnStatus = Status(ErrorCodes::BadValue,
+ "Upstream node rolled back after verifying "
+ "that it had our MinValid point. Retrying.");
+ }
+ });
+ if (!handle.isOK()) {
+ *returnStatus = handle.getStatus();
+ return;
+ }
+
+ _threadPoolTaskExecutor.wait(handle.getValue());
+ if (!returnStatus->isOK())
+ return;
+
auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> {
if (firstDocToApply == lastDocToApply) {
return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
@@ -702,40 +739,77 @@ void BackgroundSync::consume() {
void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
- // Abort only when syncRollback detects we are in a unrecoverable state.
- // In other cases, we log the message contained in the error status and retry later.
- auto status = syncRollback(txn,
- OplogInterfaceLocal(txn, rsOplogName),
- RollbackSourceImpl(getConnection, source, rsOplogName),
- _replCoord);
- if (status.isOK()) {
- // When the syncTail thread sees there is no new data by adding something to the buffer.
- _signalNoNewDataForApplier();
- // Wait until the buffer is empty.
- // This is an indication that syncTail has removed the sentinal marker from the buffer
- // and reset its local lastAppliedOpTime via the replCoord.
- while (!_buffer.empty()) {
- sleepmillis(10);
- if (inShutdown()) {
- return;
- }
+ // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
+ // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
+ // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or
+ // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success
+ // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before
+ // writing anything, SyncTail will quickly take us to SECONDARY since are are still at our
+ // original MinValid, which is fine because we may choose a sync source that doesn't require
+ // rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will
+ // cause us to roll back to the same common point, which is fine. If we succeeded, we will be
+ // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY
+ // then.
+ {
+ log() << "rollback 0";
+ Lock::GlobalWrite globalWrite(txn->lockState());
+ if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
+ log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to "
+ << MemberState(MemberState::RS_ROLLBACK).toString();
+ return;
}
+ }
- // It is now safe to clear the ROLLBACK state, which may result in the applier thread
- // transitioning to SECONDARY. This is safe because the applier thread has now reloaded
- // the new rollback minValid from the database.
- if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
- warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
- << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
- << " but found self in " << _replCoord->getMemberState();
+ try {
+ auto status = syncRollback(txn,
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(getConnection, source, rsOplogName),
+ requiredRBID,
+ _replCoord);
+
+ // Abort only when syncRollback detects we are in a unrecoverable state.
+ // WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK
+ // so we need to check here first.
+ if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
+ severe() << "Unable to complete rollback. A full resync may be needed: " << status;
+ fassertFailedNoTrace(28723);
}
- return;
- }
- if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
- fassertNoTrace(28723, status);
+
+ // In other cases, we log the message contained in the error status and retry later.
+ uassertStatusOK(status);
+ } catch (const DBException& ex) {
+ // UnrecoverableRollbackError should only come from a returned status which is handled
+ // above.
+ invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError);
+
+ warning() << "rollback cannot complete at this time (retrying later): " << ex
+ << " appliedThrough=" << _replCoord->getMyLastAppliedOpTime()
+ << " minvalid=" << getMinValid(txn);
+
+ // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If
+ // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this
+ // will also prevent us from hot-looping and putting too much load on upstream nodes.
+ sleepsecs(5); // 5 seconds was chosen as a completely arbitrary amount of time.
+ } catch (...) {
+ std::terminate();
+ }
+
+ // At this point we are about to leave rollback. Before we do, wait for any writes done
+ // as part of rollback to be durable, and then do any necessary checks that we didn't
+ // wind up rolling back something illegal. We must wait for the rollback to be durable
+ // so that if we wind up shutting down uncleanly in response to something we rolled back
+ // we know that we won't wind up right back in the same situation when we start back up
+ // because the rollback wasn't durable.
+ txn->recoveryUnit()->waitUntilDurable();
+
+ if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
+ severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
+ << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
+ << " but found self in " << _replCoord->getMemberState();
+ fassertFailedNoTrace(40364);
}
- warning() << "rollback cannot proceed at this time (retrying later): " << status;
}
HostAndPort BackgroundSync::getSyncTarget() {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 489790f988d..0c1a1ea99f5 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -187,7 +187,8 @@ private:
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* returnStatus);
+ Status* returnStatus,
+ int rbid);
/**
* Executes a rollback.
@@ -195,6 +196,7 @@ private:
*/
void _rollback(OperationContext* txn,
const HostAndPort& source,
+ boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection);
/**
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 63832b201af..f00b95e147a 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -165,7 +165,8 @@ Status OplogReader::_compareRequiredOpTimeWithQueryResponse(const OpTime& requir
void OplogReader::connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
const OpTime& requiredOpTime,
- ReplicationCoordinator* replCoord) {
+ ReplicationCoordinator* replCoord,
+ int* rbidOut) {
const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
OpTime oldestOpTimeSeen = sentinel;
@@ -209,6 +210,18 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10));
continue;
}
+
+ if (rbidOut) {
+ BSONObj reply;
+ if (!conn()->runCommand("admin", BSON("replSetGetRBID" << 1), reply)) {
+ log() << "Error getting rbid from " << candidate << ": " << reply;
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10));
+ continue;
+ }
+ *rbidOut = reply["rbid"].Int();
+ }
+
// Read the first (oldest) op and confirm that it's not newer than our last
// fetched op. Otherwise, we have fallen off the back of that source's oplog.
BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query()));
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 1434125697a..3c983b007fc 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -153,12 +153,17 @@ public:
* is left unconnected, where this->conn() equals NULL.
* In the process of connecting, this function may add items to the repl coordinator's
* sync source blacklist.
+ *
+ * If the rbidOut param is non-null it will be set to the rbid of the server before any data is
+ * fetched from it.
+ *
* This function may throw DB exceptions.
*/
void connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
const OpTime& requiredOpTime,
- ReplicationCoordinator* replCoord);
+ ReplicationCoordinator* replCoord,
+ int* rbidOut = nullptr);
private:
/**
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index c05c5a7d69c..c2162484171 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -64,6 +64,7 @@
#include "mongo/db/repl/rslog.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
/* Scenarios
*
@@ -120,58 +121,48 @@ namespace repl {
// Failpoint which causes rollback to hang before finishing.
MONGO_FP_DECLARE(rollbackHangBeforeFinish);
-namespace {
-
-class RSFatalException : public std::exception {
-public:
- RSFatalException(std::string m = "replica set fatal exception") : msg(m) {}
- virtual ~RSFatalException() throw(){};
- virtual const char* what() const throw() {
- return msg.c_str();
- }
-
-private:
- std::string msg;
-};
-
-struct DocID {
- // ns and _id both point into ownedObj's buffer
- BSONObj ownedObj;
- const char* ns;
- BSONElement _id;
- bool operator<(const DocID& other) const {
- int comp = strcmp(ns, other.ns);
- if (comp < 0)
- return true;
- if (comp > 0)
- return false;
- return _id < other._id;
- }
-};
-
-struct FixUpInfo {
- // note this is a set -- if there are many $inc's on a single document we need to rollback,
- // we only need to refetch it once.
- set<DocID> toRefetch;
+using namespace rollback_internal;
- // collections to drop
- set<string> toDrop;
+bool DocID::operator<(const DocID& other) const {
+ int comp = strcmp(ns, other.ns);
+ if (comp < 0)
+ return true;
+ if (comp > 0)
+ return false;
- // Indexes to drop.
- // Key is collection namespace. Value is name of index to drop.
- multimap<string, string> indexesToDrop;
+ return _id < other._id;
+}
- set<string> collectionsToResyncData;
- set<string> collectionsToResyncMetadata;
+bool DocID::operator==(const DocID& other) const {
+ // Since this is only used for tests, going with the simple impl that reuses operator< which is
+ // used in the real code.
+ return !(*this < other || other < *this);
+}
- OpTime commonPoint;
- RecordId commonPointOurDiskloc;
+void FixUpInfo::removeAllDocsToRefetchFor(const std::string& collection) {
+ docsToRefetch.erase(docsToRefetch.lower_bound(DocID::minFor(collection.c_str())),
+ docsToRefetch.upper_bound(DocID::maxFor(collection.c_str())));
+}
- int rbid; // remote server's current rollback sequence #
-};
+void FixUpInfo::removeRedundantOperations() {
+ // These loops and their bodies can be done in any order. The final result of the FixUpInfo
+ // members will be the same either way.
+ for (const auto& collection : collectionsToDrop) {
+ removeAllDocsToRefetchFor(collection);
+ indexesToDrop.erase(collection);
+ collectionsToResyncMetadata.erase(collection);
+ }
+ for (const auto& collection : collectionsToResyncData) {
+ removeAllDocsToRefetchFor(collection);
+ indexesToDrop.erase(collection);
+ collectionsToResyncMetadata.erase(collection);
+ collectionsToDrop.erase(collection);
+ }
+}
-Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
+Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo,
+ const BSONObj& ourObj) {
const char* op = ourObj.getStringField("op");
if (*op == 'n')
return Status::OK();
@@ -183,14 +174,14 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
doc.ownedObj = ourObj.getOwned();
doc.ns = doc.ownedObj.getStringField("ns");
if (*doc.ns == '\0') {
- warning() << "ignoring op on rollback no ns TODO : " << doc.ownedObj.toString();
- return Status::OK();
+ throw RSFatalException(str::stream()
+ << "local op on rollback has no ns: " << doc.ownedObj.toString());
}
BSONObj obj = doc.ownedObj.getObjectField(*op == 'u' ? "o2" : "o");
if (obj.isEmpty()) {
- warning() << "ignoring op on rollback : " << doc.ownedObj.toString();
- return Status::OK();
+ throw RSFatalException(str::stream()
+ << "local op on rollback has no object field: " << (doc.ownedObj));
}
if (*op == 'c') {
@@ -208,7 +199,7 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
// Create collection operation
// { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc
- fixUpInfo.toDrop.insert(ns);
+ fixUpInfo.collectionsToDrop.insert(ns);
return Status::OK();
} else if (cmdname == "drop") {
string ns = nss.db().toString() + '.' + first.valuestr();
@@ -270,7 +261,7 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
severe() << message;
return Status(ErrorCodes::UnrecoverableRollbackError, message);
}
- auto subStatus = refetch(fixUpInfo, subopElement.Obj());
+ auto subStatus = updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj());
if (!subStatus.isOK()) {
return subStatus;
}
@@ -328,13 +319,45 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
throw RSFatalException();
}
- fixUpInfo.toRefetch.insert(doc);
+ fixUpInfo.docsToRefetch.insert(doc);
return Status::OK();
}
+namespace {
+
+/**
+ * This must be called before making any changes to our local data and after fetching any
+ * information from the upstream node. If any information is fetched from the upstream node after we
+ * have written locally, the function must be called again.
+ */
+void checkRbidAndUpdateMinValid(OperationContext* txn,
+ const int rbid,
+ const RollbackSource& rollbackSource) {
+ // It is important that the steps are performed in order to avoid racing with upstream rollbacks
+ //
+ // 1) Get the last doc in their oplog.
+ // 2) Get their RBID and fail if it has changed.
+ // 3) Set our minValid to the previously fetched OpTime of the top of their oplog.
+
+ const auto newMinValidDoc = rollbackSource.getLastOperation();
+ if (newMinValidDoc.isEmpty()) {
+ uasserted(40361, "rollback error newest oplog entry on source is missing or empty");
+ }
+ if (rbid != rollbackSource.getRollbackId()) {
+ // Our source rolled back itself so the data we received isn't necessarily consistent.
+ uasserted(40365, "rollback rbid on source changed during rollback, canceling this attempt");
+ }
+
+ // we have items we are writing that aren't from a point-in-time. thus best not to come
+ // online until we get to that point in freshness.
+ OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc));
+ log() << "Setting minvalid to " << minValid;
+ setAppliedThrough(txn, {}); // Use top of oplog.
+ setMinValid(txn, minValid);
+}
void syncFixUp(OperationContext* txn,
- FixUpInfo& fixUpInfo,
+ const FixUpInfo& fixUpInfo,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord) {
// fetch all first so we needn't handle interruption in a fancy way
@@ -344,73 +367,45 @@ void syncFixUp(OperationContext* txn,
// namespace -> doc id -> doc
map<string, map<DocID, BSONObj>> goodVersions;
- BSONObj newMinValid;
-
// fetch all the goodVersions of each document from current primary
- DocID doc;
unsigned long long numFetched = 0;
- try {
- for (set<DocID>::iterator it = fixUpInfo.toRefetch.begin(); it != fixUpInfo.toRefetch.end();
- it++) {
- doc = *it;
+ for (auto&& doc : fixUpInfo.docsToRefetch) {
+ invariant(!doc._id.eoo()); // This is checked when we insert to the set.
- verify(!doc._id.eoo());
-
- {
- // TODO : slow. lots of round trips.
- numFetched++;
- BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap());
- totalSize += good.objsize();
- uassert(13410, "replSet too much data to roll back", totalSize < 300 * 1024 * 1024);
-
- // note good might be eoo, indicating we should delete it
- goodVersions[doc.ns][doc] = good;
+ try {
+ // TODO : slow. lots of round trips.
+ numFetched++;
+ BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap());
+ totalSize += good.objsize();
+ if (totalSize >= 300 * 1024 * 1024) {
+ throw RSFatalException("replSet too much data to roll back");
}
+
+ // Note good might be empty, indicating we should delete it.
+ goodVersions[doc.ns][doc] = good;
+ } catch (const DBException& ex) {
+ log() << "rollback couldn't re-get from ns: " << doc.ns << " _id: " << (doc._id) << ' '
+ << numFetched << '/' << fixUpInfo.docsToRefetch.size() << ": " << (ex);
+ throw;
}
- newMinValid = rollbackSource.getLastOperation();
- if (newMinValid.isEmpty()) {
- error() << "rollback error newMinValid empty?";
- return;
- }
- } catch (const DBException& e) {
- LOG(1) << "rollback re-get objects: " << e.toString();
- error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' '
- << numFetched << '/' << fixUpInfo.toRefetch.size();
- throw e;
}
log() << "rollback 3.5";
- if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
- // Our source rolled back itself so the data we received isn't necessarily consistent.
- warning() << "rollback rbid on source changed during rollback, "
- << "cancelling this attempt";
- return;
- }
+ checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource);
// update them
log() << "rollback 4 n:" << goodVersions.size();
- bool warn = false;
-
invariant(!fixUpInfo.commonPointOurDiskloc.isNull());
- // we have items we are writing that aren't from a point-in-time. thus best not to come
- // online until we get to that point in freshness.
- // TODO this is still wrong because we don't record that we are in rollback, and we can't really
- // recover.
- OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid));
- log() << "minvalid=" << minValid;
- setAppliedThrough(txn, {}); // Use top of oplog.
- setMinValid(txn, minValid);
-
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResyncData.empty() ||
!fixUpInfo.collectionsToResyncMetadata.empty()) {
for (const string& ns : fixUpInfo.collectionsToResyncData) {
log() << "rollback 4.1.1 coll resync " << ns;
- fixUpInfo.indexesToDrop.erase(ns);
- fixUpInfo.collectionsToResyncMetadata.erase(ns);
+ invariant(!fixUpInfo.indexesToDrop.count(ns));
+ invariant(!fixUpInfo.collectionsToResyncMetadata.count(ns));
const NamespaceString nss(ns);
@@ -421,7 +416,7 @@ void syncFixUp(OperationContext* txn,
Database* db = dbHolder().openDb(txn, nss.db().toString());
invariant(db);
WriteUnitOfWork wunit(txn);
- db->dropCollection(txn, ns);
+ fassertStatusOK(40359, db->dropCollectionEvenIfSystem(txn, nss));
wunit.commit();
}
@@ -443,9 +438,11 @@ void syncFixUp(OperationContext* txn,
auto infoResult = rollbackSource.getCollectionInfo(nss);
if (!infoResult.isOK()) {
- // Collection dropped by "them" so we should drop it too.
- log() << ns << " not found on remote host, dropping";
- fixUpInfo.toDrop.insert(ns);
+ // Collection dropped by "them" so we can't correctly change it here. If we get to
+ // the roll-forward phase, we will drop it then. If the drop is rolled-back upstream
+ // and we restart, we will be expected to still have the collection.
+ log() << ns << " not found on remote host, so not rolling back collmod operation."
+ " We will drop the collection soon.";
continue;
}
@@ -496,56 +493,28 @@ void syncFixUp(OperationContext* txn,
// we did more reading from primary, so check it again for a rollback (which would mess
// us up), and make minValid newer.
log() << "rollback 4.2";
-
- string err;
- try {
- newMinValid = rollbackSource.getLastOperation();
- if (newMinValid.isEmpty()) {
- err = "can't get minvalid from sync source";
- } else {
- OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid));
- log() << "minvalid=" << minValid;
- setMinValid(txn, minValid);
- setAppliedThrough(txn, fixUpInfo.commonPoint);
- }
- } catch (const DBException& e) {
- err = "can't get/set minvalid: ";
- err += e.what();
- }
- if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
- // our source rolled back itself. so the data we received isn't necessarily
- // consistent. however, we've now done writes. thus we have a problem.
- err += "rbid at primary changed during resync/rollback";
- }
- if (!err.empty()) {
- severe() << "rolling back : " << err << ". A full resync will be necessary.";
- // TODO: reset minvalid so that we are permanently in fatal state
- // TODO: don't be fatal, but rather, get all the data first.
- throw RSFatalException();
- }
- log() << "rollback 4.3";
+ checkRbidAndUpdateMinValid(txn, fixUpInfo.rbid, rollbackSource);
}
log() << "rollback 4.6";
- // drop collections to drop before doing individual fixups - that might make things faster
- // below actually if there were subsequent inserts to rollback
- for (set<string>::iterator it = fixUpInfo.toDrop.begin(); it != fixUpInfo.toDrop.end(); it++) {
+ // drop collections to drop before doing individual fixups
+ for (set<string>::iterator it = fixUpInfo.collectionsToDrop.begin();
+ it != fixUpInfo.collectionsToDrop.end();
+ it++) {
log() << "rollback drop: " << *it;
- fixUpInfo.indexesToDrop.erase(*it);
+ invariant(!fixUpInfo.indexesToDrop.count(*it));
ScopedTransaction transaction(txn, MODE_IX);
const NamespaceString nss(*it);
Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_X);
Database* db = dbHolder().get(txn, nsToDatabaseSubstring(*it));
if (db) {
- WriteUnitOfWork wunit(txn);
-
Helpers::RemoveSaver removeSaver("rollback", "", *it);
// perform a collection scan and write all documents in the collection to disk
std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
- txn, *it, db->getCollection(*it), PlanExecutor::YIELD_MANUAL));
+ txn, *it, db->getCollection(*it), PlanExecutor::YIELD_AUTO));
BSONObj curObj;
PlanExecutor::ExecState execState;
while (PlanExecutor::ADVANCED == (execState = exec->getNext(&curObj, NULL))) {
@@ -570,7 +539,8 @@ void syncFixUp(OperationContext* txn,
throw RSFatalException();
}
- db->dropCollection(txn, *it);
+ WriteUnitOfWork wunit(txn);
+ fassertStatusOK(40360, db->dropCollectionEvenIfSystem(txn, nss));
wunit.commit();
}
}
@@ -622,9 +592,8 @@ void syncFixUp(OperationContext* txn,
// while rolling back createCollection operations.
const auto& ns = nsAndGoodVersionsByDocID.first;
unique_ptr<Helpers::RemoveSaver> removeSaver;
- if (!fixUpInfo.toDrop.count(ns)) {
- removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns));
- }
+ invariant(!fixUpInfo.collectionsToDrop.count(ns));
+ removeSaver.reset(new Helpers::RemoveSaver("rollback", "", ns));
const auto& goodVersionsByDocID = nsAndGoodVersionsByDocID.second;
for (const auto& idAndDoc : goodVersionsByDocID) {
@@ -639,10 +608,7 @@ void syncFixUp(OperationContext* txn,
BSONObj pattern = doc._id.wrap(); // { _id : ... }
try {
verify(doc.ns && *doc.ns);
- if (fixUpInfo.collectionsToResyncData.count(doc.ns)) {
- // We just synced this entire collection.
- continue;
- }
+ invariant(!fixUpInfo.collectionsToResyncData.count(doc.ns));
// TODO: Lots of overhead in context. This can be faster.
const NamespaceString docNss(doc.ns);
@@ -715,8 +681,13 @@ void syncFixUp(OperationContext* txn,
}
}
} catch (const DBException& e) {
- error() << "rolling back capped collection rec " << doc.ns << ' '
- << e.toString();
+ // Replicated capped collections have many ways to become
+ // inconsistent. We rely on age-out to make these problems go away
+ // eventually.
+ warning() << "ignoring failure to roll back change to capped "
+ << "collection " << doc.ns << " with _id "
+ << idAndDoc.first._id.toString(
+ /*includeFieldName*/ false) << ": " << e;
}
} else {
deleteObjects(txn,
@@ -727,25 +698,6 @@ void syncFixUp(OperationContext* txn,
true, // justone
true); // god
}
- // did we just empty the collection? if so let's check if it even
- // exists on the source.
- if (collection->numRecords(txn) == 0) {
- try {
- NamespaceString nss(doc.ns);
- auto infoResult = rollbackSource.getCollectionInfo(nss);
- if (!infoResult.isOK()) {
- // we should drop
- WriteUnitOfWork wunit(txn);
- ctx.db()->dropCollection(txn, doc.ns);
- wunit.commit();
- }
- } catch (const DBException& ex) {
- // Failed to run listCollections command on sync source.
- // This isn't *that* big a deal, but is bad.
- warning() << "rollback error querying for existence of " << doc.ns
- << " at the primary, ignoring: " << ex;
- }
- }
}
} else {
// TODO faster...
@@ -766,8 +718,8 @@ void syncFixUp(OperationContext* txn,
}
} catch (const DBException& e) {
log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' '
- << e.toString() << " ndeletes:" << deletes;
- warn = true;
+ << e << " ndeletes:" << deletes;
+ throw;
}
}
}
@@ -795,101 +747,69 @@ void syncFixUp(OperationContext* txn,
Status status = getGlobalAuthorizationManager()->initialize(txn);
if (!status.isOK()) {
- warning() << "Failed to reinitialize auth data after rollback: " << status;
- warn = true;
+ severe() << "Failed to reinitialize auth data after rollback: " << status;
+ fassertFailedNoTrace(40366);
}
// Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the
// lastAppliedHash value in bgsync to reflect our new last op.
replCoord->resetLastOpTimesFromOplog(txn);
-
- // done
- if (warn)
- warning() << "issues during syncRollback, see log";
- else
- log() << "rollback done";
+ log() << "rollback done";
}
Status _syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- ReplicationCoordinator* replCoord,
- const SleepSecondsFn& sleepSecondsFn) {
+ boost::optional<int> requiredRBID,
+ ReplicationCoordinator* replCoord) {
invariant(!txn->lockState()->isLocked());
- log() << "rollback 0";
-
- /** by doing this, we will not service reads (return an error as we aren't in secondary
- * state. that perhaps is moot because of the write lock above, but that write lock
- * probably gets deferred or removed or yielded later anyway.
- *
- * also, this is better for status reporting - we know what is happening.
- */
- {
- Lock::GlobalWrite globalWrite(txn->lockState());
- if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "Cannot transition from "
- << replCoord->getMemberState().toString() << " to "
- << MemberState(MemberState::RS_ROLLBACK).toString());
- }
- }
-
FixUpInfo how;
log() << "rollback 1";
how.rbid = rollbackSource.getRollbackId();
- {
- log() << "rollback 2 FindCommonPoint";
- try {
- auto processOperationForFixUp =
- [&how](const BSONObj& operation) { return refetch(how, operation); };
- auto res = syncRollBackLocalOperations(
- localOplog, rollbackSource.getOplog(), processOperationForFixUp);
- if (!res.isOK()) {
- const auto status = res.getStatus();
- switch (status.code()) {
- case ErrorCodes::OplogStartMissing:
- case ErrorCodes::UnrecoverableRollbackError:
- sleepSecondsFn(Seconds(1));
- return status;
- default:
- throw RSFatalException(status.toString());
- }
- } else {
- how.commonPoint = res.getValue().first;
- how.commonPointOurDiskloc = res.getValue().second;
+ if (requiredRBID) {
+ uassert(40362,
+ "Upstream node rolled back. Need to retry our rollback.",
+ how.rbid == *requiredRBID);
+ }
+
+ log() << "rollback 2 FindCommonPoint";
+ try {
+ auto processOperationForFixUp = [&how](const BSONObj& operation) {
+ return updateFixUpInfoFromLocalOplogEntry(how, operation);
+ };
+ auto res = syncRollBackLocalOperations(
+ localOplog, rollbackSource.getOplog(), processOperationForFixUp);
+ if (!res.isOK()) {
+ const auto status = res.getStatus();
+ switch (status.code()) {
+ case ErrorCodes::OplogStartMissing:
+ case ErrorCodes::UnrecoverableRollbackError:
+ return status;
+ default:
+ throw RSFatalException(status.toString());
}
- } catch (const RSFatalException& e) {
- error() << string(e.what());
- return Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream()
- << "need to rollback, but unable to determine common point between"
- " local and remote oplog: " << e.what(),
- 18752);
- } catch (const DBException& e) {
- warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min";
-
- sleepSecondsFn(Seconds(60));
- throw;
}
+
+ how.commonPoint = res.getValue().first;
+ how.commonPointOurDiskloc = res.getValue().second;
+ how.removeRedundantOperations();
+ } catch (const RSFatalException& e) {
+ return Status(ErrorCodes::UnrecoverableRollbackError,
+ str::stream()
+ << "need to rollback, but unable to determine common point between"
+ " local and remote oplog: " << e.what(),
+ 18752);
}
+ log() << "rollback common point is " << how.commonPoint;
log() << "rollback 3 fixup";
-
- replCoord->incrementRollbackID();
try {
+ ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); });
syncFixUp(txn, how, rollbackSource, replCoord);
} catch (const RSFatalException& e) {
- error() << "exception during rollback: " << e.what();
- return Status(ErrorCodes::UnrecoverableRollbackError,
- str::stream() << "exception during rollback: " << e.what(),
- 18753);
- } catch (...) {
- replCoord->incrementRollbackID();
-
- throw;
+ return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753);
}
- replCoord->incrementRollbackID();
if (MONGO_FAIL_POINT(rollbackHangBeforeFinish)) {
// This log output is used in js tests so please leave it.
@@ -900,8 +820,6 @@ Status _syncRollback(OperationContext* txn,
}
}
- // Success; leave "ROLLBACK" state intact until applier thread has reloaded the new minValid.
- // Otherwise, the applier could transition the node to SECONDARY with an out-of-date minValid.
return Status::OK();
}
@@ -910,8 +828,8 @@ Status _syncRollback(OperationContext* txn,
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- ReplicationCoordinator* replCoord,
- const SleepSecondsFn& sleepSecondsFn) {
+ boost::optional<int> requiredRBID,
+ ReplicationCoordinator* replCoord) {
invariant(txn);
invariant(replCoord);
@@ -919,22 +837,11 @@ Status syncRollback(OperationContext* txn,
DisableDocumentValidation validationDisabler(txn);
txn->setReplicatedWrites(false);
- Status status = _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn);
+ Status status = _syncRollback(txn, localOplog, rollbackSource, requiredRBID, replCoord);
log() << "rollback finished" << rsLog;
return status;
}
-Status syncRollback(OperationContext* txn,
- const OplogInterface& localOplog,
- const RollbackSource& rollbackSource,
- ReplicationCoordinator* replCoord) {
- return syncRollback(txn,
- localOplog,
- rollbackSource,
- replCoord,
- [](Seconds seconds) { sleepsecs(durationCount<Seconds>(seconds)); });
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index c88e9fd27c0..8ee7dd04367 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -28,12 +28,10 @@
#pragma once
-#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
-#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/util/time_support.h"
+#include "mongo/db/record_id.h"
+#include "mongo/db/repl/optime.h"
namespace mongo {
@@ -44,7 +42,6 @@ class OperationContext;
namespace repl {
class OplogInterface;
-class OpTime;
class ReplicationCoordinator;
class RollbackSource;
@@ -58,7 +55,7 @@ class RollbackSource;
* - undo operations by fetching all documents affected, then replaying
* the sync source's oplog until we reach the time in the oplog when we fetched the last
* document.
- * This function can throw std::exception on failures.
+ * This function can throw exceptions on failures.
* This function runs a command on the sync source to detect if the sync source rolls back
* while our rollback is in progress.
*
@@ -69,21 +66,77 @@ class RollbackSource;
* supports fetching documents and copying collections.
* @param replCoord Used to track the rollback ID and to change the follower state
*
- * Failures: Most failures are returned as a status but some failures throw an std::exception.
+ * If requiredRBID is supplied, we error if the upstream node has a different RBID (ie it rolled
+ * back) after fetching any information from it.
+ *
+ * Failures: If a Status with code UnrecoverableRollbackError is returned, the caller must exit
+ * fatally. All other errors should be considered recoverable regardless of whether reported as a
+ * status or exception.
*/
-
-using SleepSecondsFn = stdx::function<void(Seconds)>;
-
-Status syncRollback(OperationContext* txn,
- const OplogInterface& localOplog,
- const RollbackSource& rollbackSource,
- ReplicationCoordinator* replCoord,
- const SleepSecondsFn& sleepSecondsFn);
-
Status syncRollback(OperationContext* txn,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
+ boost::optional<int> requiredRBID,
ReplicationCoordinator* replCoord);
+/**
+ * This namespace contains internal details of the rollback system. It is only exposed in a header
+ * for unittesting. Nothing here should be used outside of rs_rollback.cpp or its unittest.
+ */
+namespace rollback_internal {
+
+struct DocID {
+ BSONObj ownedObj;
+ const char* ns;
+ BSONElement _id;
+ bool operator<(const DocID& other) const;
+ bool operator==(const DocID& other) const;
+
+ static DocID minFor(const char* ns) {
+ auto obj = BSON("" << MINKEY);
+ return {obj, ns, obj.firstElement()};
+ }
+
+ static DocID maxFor(const char* ns) {
+ auto obj = BSON("" << MAXKEY);
+ return {obj, ns, obj.firstElement()};
+ }
+};
+
+struct FixUpInfo {
+ // note this is a set -- if there are many $inc's on a single document we need to rollback,
+ // we only need to refetch it once.
+ std::set<DocID> docsToRefetch;
+
+ // Key is collection namespace. Value is name of index to drop.
+ std::multimap<std::string, std::string> indexesToDrop;
+
+ std::set<std::string> collectionsToDrop;
+ std::set<std::string> collectionsToResyncData;
+ std::set<std::string> collectionsToResyncMetadata;
+
+ OpTime commonPoint;
+ RecordId commonPointOurDiskloc;
+
+ int rbid; // remote server's current rollback sequence #
+
+ void removeAllDocsToRefetchFor(const std::string& collection);
+ void removeRedundantOperations();
+};
+
+// Indicates that rollback cannot complete and the server must abort.
+class RSFatalException : public std::exception {
+public:
+ RSFatalException(std::string m = "replica set fatal exception") : msg(m) {}
+ virtual const char* what() const throw() {
+ return msg.c_str();
+ }
+
+private:
+ std::string msg;
+};
+
+Status updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, const BSONObj& ourObj);
+} // namespace rollback_internal
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 3f768d79f4b..3a324aff179 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -33,6 +33,7 @@
#include <list>
#include <utility>
+#include "mongo/bson/json.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -63,6 +64,7 @@ namespace {
using namespace mongo;
using namespace mongo::repl;
+using namespace mongo::repl::rollback_internal;
const OplogInterfaceMock::Operations kEmptyMockOperations;
@@ -173,8 +175,6 @@ void RSRollbackTest::tearDown() {
setGlobalReplicationCoordinator(nullptr);
}
-void noSleep(Seconds seconds) {}
-
TEST_F(RSRollbackTest, InconsistentMinValid) {
repl::setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0));
repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
@@ -182,36 +182,12 @@ TEST_F(RSRollbackTest, InconsistentMinValid) {
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
}
-TEST_F(RSRollbackTest, SetFollowerModeFailed) {
- class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock {
- public:
- ReplicationCoordinatorSetFollowerModeMock()
- : ReplicationCoordinatorMock(createReplSettings()) {}
- MemberState getMemberState() const override {
- return MemberState::RS_DOWN;
- }
- bool setFollowerMode(const MemberState& newState) override {
- return false;
- }
- };
- _coordinator = new ReplicationCoordinatorSetFollowerModeMock();
- setGlobalReplicationCoordinator(_coordinator);
-
- ASSERT_EQUALS(ErrorCodes::OperationFailed,
- syncRollback(_txn.get(),
- OplogInterfaceMock(kEmptyMockOperations),
- RollbackSourceMock(std::unique_ptr<OplogInterface>(
- new OplogInterfaceMock(kEmptyMockOperations))),
- _coordinator,
- noSleep).code());
-}
-
TEST_F(RSRollbackTest, OplogStartMissing) {
OpTime ts(Timestamp(Seconds(1), 0), 0);
auto operation =
@@ -223,8 +199,8 @@ TEST_F(RSRollbackTest, OplogStartMissing) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
}))),
- _coordinator,
- noSleep).code());
+ {},
+ _coordinator).code());
}
TEST_F(RSRollbackTest, NoRemoteOpLog) {
@@ -235,8 +211,8 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) {
OplogInterfaceMock({operation}),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
}
@@ -257,12 +233,36 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
OplogInterfaceMock({operation}),
RollbackSourceLocal(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
- _coordinator,
- noSleep),
+ {},
+ _coordinator),
UserException,
ErrorCodes::UnknownError);
}
+TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) {
+ OpTime ts(Timestamp(Seconds(1), 0), 0);
+ auto operation =
+ std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
+
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)) {}
+ int getRollbackId() const override {
+ return 2;
+ }
+ };
+
+ ASSERT_THROWS_CODE(syncRollback(_txn.get(),
+ OplogInterfaceMock({operation}),
+ RollbackSourceLocal(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ {1},
+ _coordinator),
+ UserException,
+ ErrorCodes::Error(40362));
+}
+
TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
createOplog(_txn.get());
OpTime ts(Timestamp(Seconds(1), 0), 1);
@@ -274,8 +274,8 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
operation,
}))),
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
}
/**
@@ -341,8 +341,8 @@ int _testRollbackDelete(OperationContext* txn,
ASSERT_OK(syncRollback(txn,
OplogInterfaceMock({deleteOperation, commonOperation}),
rollbackSource,
- coordinator,
- noSleep));
+ {},
+ coordinator));
ASSERT_TRUE(rollbackSource.called);
Lock::DBLock dbLock(txn->lockState(), "test", MODE_S);
@@ -415,8 +415,8 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -474,8 +474,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
_txn.get(),
OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
stopCapturingLogMessages();
ASSERT_EQUALS(1,
countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
@@ -531,8 +531,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
stopCapturingLogMessages();
ASSERT_EQUALS(1,
countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
@@ -578,8 +578,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -622,8 +622,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -665,8 +665,8 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
auto status = syncRollback(_txn.get(),
OplogInterfaceMock({insertDocumentOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -700,8 +700,8 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
commonOperation,
}))),
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18751, status.location());
}
@@ -735,11 +735,51 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({dropCollectionOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
ASSERT_TRUE(rollbackSource.called);
}
+TEST_F(RSRollbackTest, RollbackDropCollectionCommandFailsIfRBIDChangesWhileSyncingCollection) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto dropCollectionOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op"
+ << "c"
+ << "ns"
+ << "test.t"
+ << "o" << BSON("drop"
+ << "t")),
+ RecordId(2));
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)), copyCollectionCalled(false) {}
+ int getRollbackId() const override {
+ return copyCollectionCalled ? 1 : 0;
+ }
+ void copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const override {
+ copyCollectionCalled = true;
+ }
+ mutable bool copyCollectionCalled;
+ };
+ RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ })));
+
+ _createCollection(_txn.get(), "test.t", CollectionOptions());
+ ASSERT_THROWS_CODE(syncRollback(_txn.get(),
+ OplogInterfaceMock({dropCollectionOperation, commonOperation}),
+ rollbackSource,
+ 0,
+ _coordinator),
+ DBException,
+ 40365);
+ ASSERT(rollbackSource.copyCollectionCalled);
+}
+
BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) {
BSONObjBuilder entry;
entry << "ts" << ts << "h" << 1LL << "op"
@@ -848,8 +888,8 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({applyOpsOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1));
ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2));
@@ -887,8 +927,8 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({createCollectionOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
{
Lock::DBLock dbLock(_txn->lockState(), "test", MODE_S);
auto db = dbHolder().get(_txn.get(), "test");
@@ -928,8 +968,8 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
ASSERT_OK(syncRollback(_txn.get(),
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep));
+ {},
+ _coordinator));
stopCapturingLogMessages();
ASSERT_TRUE(rollbackSource.called);
for (const auto& message : getCapturedLogMessages()) {
@@ -967,10 +1007,89 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt
syncRollback(_txn.get(),
OplogInterfaceMock({collectionModificationOperation, commonOperation}),
rollbackSource,
- _coordinator,
- noSleep);
+ {},
+ _coordinator);
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18753, status.location());
}
+TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) {
+ const auto validOplogEntry = fromjson("{op: 'i', ns: 'test.t', o: {_id:1, a: 1}}");
+ FixUpInfo fui;
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry));
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("ns")),
+ RSFatalException);
+}
+
+TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) {
+ const auto validOplogEntry = fromjson("{op: 'i', ns: 'test.t', o: {_id:1, a: 1}}");
+ FixUpInfo fui;
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry));
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("o")),
+ RSFatalException);
+}
+
+TEST(RSRollbackTest, LocalEntryWithoutO2IsFatal) {
+ const auto validOplogEntry =
+ fromjson("{op: 'u', ns: 'test.t', o2: {_id: 1}, o: {_id:1, a: 1}}");
+ FixUpInfo fui;
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry));
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry.removeField("o2")),
+ RSFatalException);
+}
+
+// The testcases used here are trying to detect off-by-one errors in
+// FixUpInfo::removeAllDocsToRefectchFor.
+TEST(FixUpInfoTest, RemoveAllDocsToRefetchForWorks) {
+ const auto normalHolder = BSON("" << OID::gen());
+ const auto normalKey = normalHolder.firstElement();
+
+ // Can't use ASSERT_EQ with this since it isn't ostream-able. Failures will at least give you
+ // the size. If that isn't enough, use GDB.
+ using DocSet = std::set<DocID>;
+
+ FixUpInfo fui;
+ fui.docsToRefetch = {
+ DocID::minFor("a"),
+ DocID{{}, "a", normalKey},
+ DocID::maxFor("a"),
+
+ DocID::minFor("b"),
+ DocID{{}, "b", normalKey},
+ DocID::maxFor("b"),
+
+ DocID::minFor("c"),
+ DocID{{}, "c", normalKey},
+ DocID::maxFor("c"),
+ };
+
+ // Remove from the middle.
+ fui.removeAllDocsToRefetchFor("b");
+ ASSERT((fui.docsToRefetch ==
+ DocSet{
+ DocID::minFor("a"),
+ DocID{{}, "a", normalKey},
+ DocID::maxFor("a"),
+
+ DocID::minFor("c"),
+ DocID{{}, "c", normalKey},
+ DocID::maxFor("c"),
+ }))
+ << "remaining docs: " << fui.docsToRefetch.size();
+
+ // Remove from the end.
+ fui.removeAllDocsToRefetchFor("c");
+ ASSERT((fui.docsToRefetch ==
+ DocSet{
+ DocID::minFor("a"), // This comment helps clang-format.
+ DocID{{}, "a", normalKey},
+ DocID::maxFor("a"),
+ }))
+ << "remaining docs: " << fui.docsToRefetch.size();
+
+ // Everything else.
+ fui.removeAllDocsToRefetchFor("a");
+ ASSERT((fui.docsToRefetch == DocSet{})) << "remaining docs: " << fui.docsToRefetch.size();
+}
+
} // namespace