diff options
author | Mathias Stearn <redbeard0531@gmail.com> | 2016-10-07 18:42:20 -0400 |
---|---|---|
committer | Mathias Stearn <redbeard0531@gmail.com> | 2016-10-17 14:36:31 -0400 |
commit | 5db0a55a264ee326bff5598249639ef479628f37 (patch) | |
tree | 8a92aadd22d5be199af9b87e1f6388f1bd0ec632 | |
parent | 99f8d760848b0be69b8934b00d33552b1295d5d9 (diff) | |
download | mongo-5db0a55a264ee326bff5598249639ef479628f37.tar.gz |
SERVER-7200 Write oplog entries on secondaries before applying
Manual backport of 34c6c691a038eac1ac3ee16e1eedc54aab964774 along with fixes
and tests from:
b5d2b06f8a08171fd96ef8d128c4f7ecedcb8f93
dc83fb0433fcae6e72f035df7458473b59223eb5
fec839b99f4b9e08016112fe8b9492e327af91b8
bf86770c8a5de97b30bc008ad59e34de99065c60
34 files changed, 960 insertions, 335 deletions
diff --git a/jstests/noPassthrough/minvalid.js b/jstests/noPassthrough/minvalid.js index 6f22e65e2ca..d31f6d58da7 100644 --- a/jstests/noPassthrough/minvalid.js +++ b/jstests/noPassthrough/minvalid.js @@ -21,9 +21,10 @@ var lastOp = local.oplog.rs.find().sort({$natural: -1}).limit(1).next(); printjson(lastOp); print("3: change minvalid"); -// primaries don't populate minvalid by default -local.replset.minvalid.insert( - {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")}); +assert.writeOK(local.replset.minvalid.update( + {}, + {$set: {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")}}, + {upsert: true})); printjson(local.replset.minvalid.findOne()); print("4: restart"); diff --git a/jstests/replsets/clean_shutdown_oplog_state.js b/jstests/replsets/clean_shutdown_oplog_state.js index 51dba43ff98..3a856db1245 100644 --- a/jstests/replsets/clean_shutdown_oplog_state.js +++ b/jstests/replsets/clean_shutdown_oplog_state.js @@ -57,9 +57,10 @@ var conn = MongoRunner.runMongod(options); assert.neq(null, conn, "secondary failed to start"); - // Following a clean shutdown of a 3.2 node, the oplog must exactly match the applied - // operations. Additionally, the begin field must not be in the minValid document and the ts - // must match the top of the oplog (SERVER-25353). + // Following clean shutdown of a node, the oplog must exactly match the applied operations. + // Additionally, the begin field must not be in the minValid document, the ts must match the + // top of the oplog (SERVER-25353), and the oplogDeleteFromPoint must be null (SERVER-7200 and + // SERVER-25071). 
var oplogDoc = conn.getCollection('local.oplog.rs') .find({ns: 'test.coll'}) .sort({$natural: -1}) @@ -68,9 +69,20 @@ var minValidDoc = conn.getCollection('local.replset.minvalid').find().sort({$natural: -1}).limit(1)[0]; printjson({oplogDoc: oplogDoc, collDoc: collDoc, minValidDoc: minValidDoc}); - assert.eq(collDoc._id, oplogDoc.o._id); - assert(!('begin' in minValidDoc), 'begin in minValidDoc'); - assert.eq(minValidDoc.ts, oplogDoc.ts); + try { + assert.eq(collDoc._id, oplogDoc.o._id); + assert(!('begin' in minValidDoc), 'begin in minValidDoc'); + assert.eq(minValidDoc.ts, oplogDoc.ts); + if ('oplogDeleteFromPoint' in minValidDoc) { + // If present it must be the null timestamp. + assert.eq(minValidDoc.oplogDeleteFromPoint, Timestamp()); + } + } catch (e) { + jsTest.log( + "Look above and make sure clean shutdown finished without resorting to SIGKILL." + + "\nUnfortunately that currently doesn't fail the test."); + throw e; + } rst.stopSet(); })(); diff --git a/jstests/replsets/oplog_replay_on_startup.js b/jstests/replsets/oplog_replay_on_startup.js new file mode 100644 index 00000000000..0864c781d0c --- /dev/null +++ b/jstests/replsets/oplog_replay_on_startup.js @@ -0,0 +1,419 @@ +// SERVER-7200 On startup, replica set nodes delete oplog state past the oplog delete point and +// apply any remaining unapplied ops before coming up as a secondary. +// +// @tags: [requires_persistence] +(function() { + "use strict"; + + var ns = "test.coll"; + + var rst = new ReplSetTest({ + nodes: 1, + }); + + rst.startSet(); + rst.initiate(); + + var conn = rst.getPrimary(); // Waits for PRIMARY state. + var term = conn.getCollection('local.oplog.rs').find().sort({$natural: -1}).limit(1).next().t; + if (typeof(term) == 'undefined') { + term = -1; // Use a dummy term for PV0. 
+ } + + function runTest({ + oplogEntries, + collectionContents, + deletePoint, + begin, + minValid, + expectedState, + expectedApplied, + }) { + if (term != -1) { + term++; // Each test gets a new term on PV1 to ensure OpTimes always move forward. + } + + conn = rst.restart(0, {noReplSet: true}); // Restart as a standalone node. + assert.neq(null, conn, "failed to restart"); + var oplog = conn.getCollection('local.oplog.rs'); + var minValidColl = conn.getCollection('local.replset.minvalid'); + var coll = conn.getCollection(ns); + + // Reset state to empty. + assert.commandWorked(oplog.runCommand('emptycapped')); + coll.drop(); + assert.commandWorked(coll.runCommand('create')); + + var ts = (num) => num === null ? Timestamp() : Timestamp(1000, num); + + oplogEntries.forEach((num) => { + assert.writeOK(oplog.insert({ + ts: ts(num), + t: term, + h: NumberLong(1), + op: 'i', + ns: ns, + o: {_id: num}, + })); + }); + + collectionContents.forEach((num) => { + assert.writeOK(coll.insert({_id: num})); + }); + + var injectedMinValidDoc = { + _id: ObjectId(), + + // minvalid: + ts: ts(minValid), + t: term, + + // appliedThrough + begin: { + ts: ts(begin), + t: term, + }, + + oplogDeleteFromPoint: ts(deletePoint), + }; + + // This weird mechanism is the only way to bypass mongod's attempt to fill in null + // Timestamps. + assert.writeOK(minValidColl.remove({})); + assert.writeOK(minValidColl.update({}, {$set: injectedMinValidDoc}, {upsert: true})); + assert.eq(minValidColl.findOne(), + injectedMinValidDoc, + "If the Timestamps differ, the server may be filling in the null timestamps"); + + try { + conn = rst.restart(0); // Restart in replSet mode again. + } catch (e) { + assert.eq(expectedState, 'FATAL', 'node failed to restart: ' + e); + return; + } + + // Wait for the node to go to SECONDARY if it is able. 
+ assert.soon( + () => + conn.adminCommand('serverStatus').metrics.repl.apply.attemptsToBecomeSecondary > 0, + () => conn.adminCommand('serverStatus').metrics.repl.apply.attemptsToBecomeSecondary); + + var isMaster = conn.adminCommand('ismaster'); + switch (expectedState) { + case 'SECONDARY': + // Primary is also acceptable since once a node becomes secondary, it will try to + // become primary if it is eligible and has enough votes (which this node does). + // This is supposed to test that we reach secondary, not that we stay there. + assert(isMaster.ismaster || isMaster.secondary, + 'not PRIMARY or SECONDARY: ' + tojson(isMaster)); + + // Wait for node to become primary. This is nesessary to avoid the find below + // failing with "NotMasterOrSecondary" errors if it happens to run while the + // node is in drain mode while becoming primary. + conn = rst.getPrimary(); + break; + + case 'RECOVERING': + assert(!isMaster.ismaster && !isMaster.secondary, + 'not in RECOVERING: ' + tojson(isMaster)); + + // Restart as a standalone node again so we can read from the collection. + conn = rst.restart(0, {noReplSet: true}); + break; + + case 'FATAL': + doassert("server startup didn't fail when it should have"); + break; + + default: + doassert('expectedState ' + expectedState + ' is not supported'); + } + + // Ensure the oplog has the entries it should have and none that it shouldn't. + assert.eq(conn.getCollection('local.oplog.rs') + .find({ns: ns, op: 'i'}) + .sort({$natural: 1}) + .map((op) => op.o._id), + expectedApplied); + + // Ensure that all ops that should have been applied were. 
+ conn.setSlaveOk(true); + assert.eq(conn.getCollection(ns).find().sort({_id: 1}).map((obj) => obj._id), + expectedApplied); + } + + // + // Normal 3.4 cases + // + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: null, + minValid: null, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: null, + minValid: 2, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: null, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, /*4,*/ 5, 6], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: 3, + minValid: 6, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + // + // 3.2 -> 3.4 upgrade cases + // + + runTest({ + oplogEntries: [1, 2, 3], 
+ collectionContents: [1, 2, 3], + deletePoint: null, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5], + collectionContents: [1, 2, 3], + deletePoint: null, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3, 4, 5], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5], + collectionContents: [1, 2, 3, 4, 5], + deletePoint: null, + begin: null, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3, 4, 5], + }); + + // + // 3.4 -> 3.2 -> 3.4 downgrade/reupgrade cases + // + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, /*4,*/ 5, 6], + collectionContents: [1, 2, 3], + deletePoint: 4, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: 2, + begin: null, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3], + deletePoint: 2, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5], + collectionContents: [1, 2, 3], + deletePoint: 2, + begin: 3, + minValid: 6, + expectedState: 'RECOVERING', + expectedApplied: [1, 2, 3, 4, 5], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3], + deletePoint: 2, + begin: 3, + minValid: 6, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + // + // These states 
should be impossible to get into. + // + + runTest({ + oplogEntries: [1, 2, 3], + collectionContents: [1, 2, 3, 4], + deletePoint: null, + begin: 4, + minValid: null, // doesn't matter. + expectedState: 'FATAL', + }); + + runTest({ + oplogEntries: [4, 5, 6], + collectionContents: [1, 2], + deletePoint: 2, + begin: 3, + minValid: null, // doesn't matter. + expectedState: 'FATAL', + }); + + runTest({ + oplogEntries: [4, 5, 6], + collectionContents: [1, 2], + deletePoint: null, + begin: 3, + minValid: null, // doesn't matter. + expectedState: 'FATAL', + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3], + deletePoint: 2, + begin: 3, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3, 4, 5], + deletePoint: null, + begin: 5, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5, 6], + collectionContents: [1, 2, 3, 4, 5], + deletePoint: null, + begin: 5, + minValid: null, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3, 4, 5, 6], + }); + + runTest({ + oplogEntries: [1, 2, 3, 4, 5], + collectionContents: [1], + deletePoint: 4, + begin: 1, + minValid: 3, + expectedState: 'SECONDARY', + expectedApplied: [1, 2, 3], + }); + + rst.stopSet(); +})(); diff --git a/jstests/replsets/oplog_truncated_on_recovery.js b/jstests/replsets/oplog_truncated_on_recovery.js index 96be5865cc3..4d469178691 100644 --- a/jstests/replsets/oplog_truncated_on_recovery.js +++ b/jstests/replsets/oplog_truncated_on_recovery.js @@ -1,10 +1,10 @@ /** - * This test will ensure that a failed a batch apply will remove the any oplog + * This test will ensure that recovery from a failed batch application will remove the oplog * entries from that batch. 
* * To do this we: * -- Create single node replica set - * -- Set minvalid manually on primary way ahead (5 minutes) + * -- Set minvalid manually on primary way ahead (5 days) * -- Write some oplog entries newer than minvalid.start * -- Ensure restarted primary comes up in recovering and truncates the oplog * -- Success! @@ -40,35 +40,36 @@ // Write op log(assert.writeOK(testDB.foo.save({_id: 1, a: 1}, {writeConcern: {w: 1}}))); - // Set minvalid to something far in the future for the current primary, to - // simulate recovery. - // Note: This is so far in the future (5 days) that it will never become - // secondary. + // Set minvalid to something far in the future for the current primary, to simulate recovery. + // Note: This is so far in the future (5 days) that it will never become secondary. var farFutureTS = new Timestamp( Math.floor(new Date().getTime() / 1000) + (60 * 60 * 24 * 5 /* in five days */), 0); var rsgs = assert.commandWorked(localDB.adminCommand("replSetGetStatus")); log(rsgs); var primaryOpTime = rsgs.members[0].optime; - var primaryLastTS = rsgs.members[0].optime.ts; - log(primaryLastTS); + log(primaryOpTime); // Set the start of the failed batch - primaryOpTime.ts = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1); + // TODO this test should restart in stand-alone mode to futz with the state rather than trying + // to do it on a running primary. - log(primaryLastTS); jsTest.log("future TS: " + tojson(farFutureTS) + ", date:" + tsToDate(farFutureTS)); - // We do an update in case there is a minvalid document on the primary - // already. - // If the doc doesn't exist then upsert:true will create it, and the - // writeConcern ensures - // that update returns details of the write, like whether an update or - // insert was performed. 
- log(assert.writeOK( - minvalidColl.update({}, - {ts: farFutureTS, t: NumberLong(-1), begin: primaryOpTime}, - {upsert: true, writeConcern: {w: 1}}))); + var divergedTS = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1); + // We do an update in case there is a minvalid document on the primary already. + // If the doc doesn't exist then upsert:true will create it, and the writeConcern ensures + // that update returns details of the write, like whether an update or insert was performed. + log(assert.writeOK(minvalidColl.update({}, + { + ts: farFutureTS, + t: NumberLong(-1), + begin: primaryOpTime, + oplogDeleteFromPoint: divergedTS + }, + {upsert: true, writeConcern: {w: 1}}))); - log(assert.writeOK(localDB.oplog.rs.insert({_id: 0, ts: primaryOpTime.ts, op: "n", term: -1}))); + // Insert a diverged oplog entry that will be truncated after restart. + log(assert.writeOK(localDB.oplog.rs.insert( + {_id: 0, ts: divergedTS, op: "n", h: NumberLong(0), t: NumberLong(-1)}))); log(localDB.oplog.rs.find().toArray()); log(assert.commandWorked(localDB.adminCommand("replSetGetStatus"))); log("restart primary"); @@ -88,7 +89,7 @@ var lastTS = localDB.oplog.rs.find().sort({$natural: -1}).limit(-1).next().ts; log(localDB.oplog.rs.find().toArray()); - assert.eq(primaryLastTS, lastTS); + assert.eq(primaryOpTime.ts, lastTS); return true; }); diff --git a/jstests/replsets/slave_delay_clean_shutdown.js b/jstests/replsets/slave_delay_clean_shutdown.js new file mode 100644 index 00000000000..db08dfab228 --- /dev/null +++ b/jstests/replsets/slave_delay_clean_shutdown.js @@ -0,0 +1,61 @@ +// SERVER-21118 don't hang at shutdown or apply ops too soon with slaveDelay. 
+// +// @tags: [requires_persistence] +load('jstests/replsets/rslib.js'); +(function() { + "use strict"; + + var ns = "test.coll"; + + var rst = new ReplSetTest({ + nodes: 2, + }); + + var conf = rst.getReplSetConfig(); + conf.members[1].votes = 0; + conf.members[1].priority = 0; + conf.members[1].hidden = true; + conf.members[1].slaveDelay = 0; // Set later. + + rst.startSet(); + rst.initiate(conf); + + var master = rst.getPrimary(); // Waits for PRIMARY state. + + // Push some ops through before setting slave delay. + assert.writeOK(master.getCollection(ns).insert([{}, {}, {}], {writeConcern: {w: 2}})); + + // Set slaveDelay and wait for secondary to receive the change. + conf = rst.getReplSetConfigFromNode(); + conf.version++; + conf.members[1].slaveDelay = 24 * 60 * 60; + reconfig(rst, conf); + assert.soon(() => rst.getReplSetConfigFromNode(1).members[1].slaveDelay > 0, + () => rst.getReplSetConfigFromNode(1)); + + sleep(2000); // The secondary apply loop only checks for slaveDelay changes once per second. + var secondary = rst.getSecondary(); + const lastOp = getLatestOp(secondary); + + assert.writeOK(master.getCollection(ns).insert([{}, {}, {}])); + assert.soon(() => secondary.adminCommand('serverStatus').metrics.repl.buffer.count > 0, + () => secondary.adminCommand('serverStatus').metrics.repl); + assert.neq(getLatestOp(master), lastOp); + assert.eq(getLatestOp(secondary), lastOp); + + sleep(2000); // Prevent the test from passing by chance. + assert.eq(getLatestOp(secondary), lastOp); + + // Make sure shutdown won't take a long time due to I/O. + secondary.adminCommand('fsync'); + + // Shutting down shouldn't take long. + assert.lt(Date.timeFunc(() => rst.stop(1)), 60 * 1000); + + secondary = rst.restart(1); + assert.eq(getLatestOp(secondary), lastOp); + sleep(2000); // Prevent the test from passing by chance. 
+ assert.eq(getLatestOp(secondary), lastOp); + + rst.stopSet(); +})(); diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 86576565532..2bd2d003b49 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -1273,8 +1273,6 @@ void exitCleanly(ExitCode code) { getGlobalServiceContext()->setKillAllOperations(); - repl::getGlobalReplicationCoordinator()->shutdown(); - Client& client = cc(); ServiceContext::UniqueOperationContext uniqueTxn; OperationContext* txn = client.getOperationContext(); @@ -1283,6 +1281,8 @@ void exitCleanly(ExitCode code) { txn = uniqueTxn.get(); } + + repl::getGlobalReplicationCoordinator()->shutdown(txn); ShardingState::get(txn)->shutDown(txn); // We should always be able to acquire the global lock at shutdown. diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 2563de2d9ea..007a37c5c25 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -710,7 +710,9 @@ env.Library( 'roll_back_local_operations.cpp', ], LIBDEPS=[ + 'optime', '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/util/foundation', ], ) diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index e34c4f73335..a461d5f2c8c 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -325,6 +325,13 @@ void BackgroundSync::_produce(OperationContext* txn) { syncSourceReader.resetConnection(); // no more references to oplog reader from here on. + // Set the applied point if unset. This is most likely the first time we've established a sync + // source since stepping down or otherwise clearing the applied point. We need to set this here, + // before the OplogWriter gets a chance to append to the oplog. 
+ if (getAppliedThrough(txn).isNull()) { + setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime()); + } + Status fetcherReturnStatus = Status::OK(); auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback, this, @@ -424,13 +431,13 @@ void BackgroundSync::_produce(OperationContext* txn) { } // check that we are at minvalid, otherwise we cannot roll back as we may be in an // inconsistent state - BatchBoundaries boundaries = getMinValid(txn); - if (!boundaries.start.isNull() || boundaries.end > lastApplied) { + const auto minValid = getMinValid(txn); + if (lastApplied < minValid) { fassertNoTrace(18750, Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << "need to rollback, but in inconsistent state. " - << "minvalid: " << boundaries.end.toString() + << "minvalid: " << minValid.toString() << " > our last optime: " << lastApplied.toString())); } diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp index 90753cff0f4..02a56cfab2f 100644 --- a/src/mongo/db/repl/minvalid.cpp +++ b/src/mongo/db/repl/minvalid.cpp @@ -47,121 +47,142 @@ namespace mongo { namespace repl { namespace { -const char initialSyncFlagString[] = "doingInitialSync"; -const BSONObj initialSyncFlag(BSON(initialSyncFlagString << true)); -const char minvalidNS[] = "local.replset.minvalid"; -const char beginFieldName[] = "begin"; -} // namespace +const char kInitialSyncFlagFieldName[] = "doingInitialSync"; +const BSONObj kInitialSyncFlag(BSON(kInitialSyncFlagFieldName << true)); +NamespaceString minValidNss("local.replset.minvalid"); +const char kBeginFieldName[] = "begin"; +const char kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint"; -// Writes -void clearInitialSyncFlag(OperationContext* txn) { +BSONObj getMinValidDocument(OperationContext* txn) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IS); + Lock::DBLock dblk(txn->lockState(), minValidNss.db(), MODE_IS); + Lock::CollectionLock lk(txn->lockState(), 
minValidNss.ns(), MODE_IS); + BSONObj doc; + bool found = Helpers::getSingleton(txn, minValidNss.ns().c_str(), doc); + invariant(found || doc.isEmpty()); + return doc; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValidDocument", minValidNss.ns()); + + MONGO_UNREACHABLE; +} + +void updateMinValidDocument(OperationContext* txn, const BSONObj& updateSpec) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IX); - // TODO: Investigate correctness of taking MODE_IX for DB/Collection locks - Lock::DBLock dblk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag)); + // For now this needs to be MODE_X because it sometimes creates the collection. + Lock::DBLock dblk(txn->lockState(), minValidNss.db(), MODE_X); + Helpers::putSingleton(txn, minValidNss.ns().c_str(), updateSpec); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "updateMinValidDocument", minValidNss.ns()); +} +} // namespace +// Writes +void clearInitialSyncFlag(OperationContext* txn) { auto replCoord = repl::ReplicationCoordinator::get(txn); OpTime time = replCoord->getMyLastAppliedOpTime(); + updateMinValidDocument(txn, + BSON("$unset" + << kInitialSyncFlag << "$set" + << BSON("ts" << time.getTimestamp() << "t" << time.getTerm() + << kBeginFieldName << time.toBSON()))); txn->recoveryUnit()->waitUntilDurable(); replCoord->setMyLastDurableOpTime(time); LOG(3) << "clearing initial sync flag"; } void setInitialSyncFlag(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag)); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS); - + updateMinValidDocument(txn, BSON("$set" << kInitialSyncFlag)); 
txn->recoveryUnit()->waitUntilDurable(); LOG(3) << "setting initial sync flag"; } -void setMinValid(OperationContext* txn, const OpTime& endOpTime, const DurableRequirement durReq) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton( - txn, - minvalidNS, - BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm()) - << "$unset" << BSON(beginFieldName << 1))); +bool getInitialSyncFlag() { + OperationContextImpl txn; + return getInitialSyncFlag(&txn); +} +bool getInitialSyncFlag(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + const auto flag = doc[kInitialSyncFlagFieldName].trueValue(); + LOG(3) << "returning initial sync flag value of " << flag; + return flag; +} + +OpTime getMinValid(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + const auto opTimeStatus = OpTime::parseFromOplogEntry(doc); + // If any of the keys (fields) are missing from the minvalid document, we return + // a null OpTime. 
+ if (opTimeStatus == ErrorCodes::NoSuchKey) { + return {}; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setMinValid", minvalidNS); - if (durReq == DurableRequirement::Strong) { - txn->recoveryUnit()->waitUntilDurable(); + if (!opTimeStatus.isOK()) { + severe() << "Error parsing minvalid entry: " << doc + << ", with status:" << opTimeStatus.getStatus(); + fassertFailedNoTrace(40052); } - LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")"; + + OpTime minValid = opTimeStatus.getValue(); + LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")"; + + return minValid; +} +void setMinValid(OperationContext* txn, const OpTime& minValid) { + LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON() + << ")"; + updateMinValidDocument( + txn, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); } -void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) { - const OpTime& start(boundaries.start); - const OpTime& end(boundaries.end); - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton(txn, - minvalidNS, - BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm() - << beginFieldName << start.toBSON()))); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setMinValid", minvalidNS); - // NOTE: No need to ensure durability here since starting a batch isn't a problem unless - // writes happen after, in which case this marker (minvalid) will be written already. 
- LOG(3) << "setting minvalid: " << boundaries.start.toString() << "(" - << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "(" - << boundaries.end.toBSON() << ")"; +void setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) { + LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON() + << ")"; + updateMinValidDocument( + txn, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); } -// Reads -bool getInitialSyncFlag() { - OperationContextImpl txn; - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(&txn, MODE_IS); - Lock::DBLock dblk(txn.lockState(), "local", MODE_IS); - Lock::CollectionLock lk(txn.lockState(), minvalidNS, MODE_IS); - BSONObj mv; - bool found = Helpers::getSingleton(&txn, minvalidNS, mv); - - if (found) { - const auto flag = mv[initialSyncFlagString].trueValue(); - LOG(3) << "return initial flag value of " << flag; - return flag; - } - LOG(3) << "return initial flag value of false"; - return false; +void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) { + LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty(); + updateMinValidDocument(txn, BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp))); +} + +Timestamp getOplogDeleteFromPoint(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + Timestamp out = {}; + if (auto field = doc[kOplogDeleteFromPointFieldName]) { + out = field.timestamp(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS); - MONGO_UNREACHABLE; + LOG(3) << "returning oplog delete from point: " << out; + return out; } -BatchBoundaries getMinValid(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IS); - Lock::DBLock dblk(txn->lockState(), "local", MODE_IS); - Lock::CollectionLock lk(txn->lockState(), minvalidNS, MODE_IS); - BSONObj mv; - 
bool found = Helpers::getSingleton(txn, minvalidNS, mv); - if (found) { - auto status = OpTime::parseFromOplogEntry(mv.getObjectField(beginFieldName)); - OpTime start(status.isOK() ? status.getValue() : OpTime{}); - OpTime end(fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv))); - LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> " - << end.toString() << "(" << end.toBSON() << ")"; - - return BatchBoundaries(start, end); - } - LOG(3) << "returning empty minvalid"; - return BatchBoundaries{OpTime{}, OpTime{}}; +void setAppliedThrough(OperationContext* txn, const OpTime& optime) { + LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")"; + if (optime.isNull()) { + updateMinValidDocument(txn, BSON("$unset" << BSON(kBeginFieldName << 1))); + } else { + updateMinValidDocument(txn, BSON("$set" << BSON(kBeginFieldName << optime.toBSON()))); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS); -} } + +OpTime getAppliedThrough(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName)); + if (!opTimeStatus.isOK()) { + // Return null OpTime on any parse failure, including if "begin" is missing. 
+ return {}; + } + + OpTime appliedThrough = opTimeStatus.getValue(); + LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "(" + << appliedThrough.toBSON() << ")"; + + return appliedThrough; } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h index d92d5d0c4a6..ba28e0f5927 100644 --- a/src/mongo/db/repl/minvalid.h +++ b/src/mongo/db/repl/minvalid.h @@ -36,17 +36,6 @@ class OperationContext; namespace repl { -struct BatchBoundaries { - BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {} - OpTime start; - OpTime end; -}; - -enum class DurableRequirement { - None, // Does not require any durability of the write. - Strong, // Requires journal or checkpoint write. -}; - /** * Helper functions for maintaining a single document in the local.replset.minvalid collection. * @@ -79,30 +68,42 @@ void setInitialSyncFlag(OperationContext* txn); /** * Returns true if the initial sync flag is set (and presumed active). */ +bool getInitialSyncFlag(OperationContext* txn); bool getInitialSyncFlag(); +/** + * The minValid value is the earliest (minimum) Timestamp that must be applied in order to + * consider the dataset consistent. + */ +void setMinValid(OperationContext* txn, const OpTime& minValid); +OpTime getMinValid(OperationContext* txn); /** - * Returns the bounds of the current apply batch, if active. If start is null/missing, and - * end is equal to the last oplog entry then we are in a consistent state and ready for reads. + * Sets minValid only if it is not already higher than endOpTime. + * Warning, this compares the term and timestamp independently. Do not use if the current + * minValid could be from the other fork of a rollback. 
*/ -BatchBoundaries getMinValid(OperationContext* txn); +void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime); /** - * The minValid value is the earliest (minimum) Timestamp that must be applied in order to - * consider the dataset consistent. - * - * This is called when a batch finishes. - * - * Wait for durable writes (which will block on journaling/checkpointing) when specified. - * + * On startup all oplog entries with a value >= the oplog delete from point should be deleted. + * If null, no documents should be deleted. */ -void setMinValid(OperationContext* ctx, const OpTime& endOpTime, const DurableRequirement durReq); +void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp); +Timestamp getOplogDeleteFromPoint(OperationContext* txn); /** - * The bounds indicate an apply is active and we are not in a consistent state to allow reads - * or transition from a non-visible state to primary/secondary. + * The applied through point is a persistent record of where we've applied through. If null, the + * applied through point is the top of the oplog. + */ +void setAppliedThrough(OperationContext* txn, const OpTime& optime); + +/** + * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead. + * + * This reads the value from storage which isn't always updated when the ReplicationCoordinator + * is. */ -void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries); +OpTime getAppliedThrough(OperationContext* txn); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 03c2aa06acf..d98db32fd43 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -304,7 +304,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn, } } // end anon namespace -// Truncates the oplog to and including the "truncateTimestamp" entry. +// Truncates the oplog after and including the "truncateTimestamp" entry. 
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { const NamespaceString oplogNss(rsOplogName); ScopedTransaction transaction(txn, MODE_IX); @@ -318,36 +318,41 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { } // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. - bool foundSomethingToTruncate = false; - RecordId lastRecordId; - BSONObj lastOplogEntry; + RecordId oldestIDToDelete; // Non-null if there is something to delete. auto oplogRs = oplogCollection->getRecordStore(); - auto oplogReverseCursor = oplogRs->getCursor(txn, false); - bool first = true; + auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false); + size_t count = 0; while (auto next = oplogReverseCursor->next()) { - lastOplogEntry = next->data.releaseToBson(); - lastRecordId = next->id; + const BSONObj entry = next->data.releaseToBson(); + const RecordId id = next->id; + count++; - const auto tsElem = lastOplogEntry["ts"]; - - if (first) { + const auto tsElem = entry["ts"]; + if (count == 1) { if (tsElem.eoo()) - LOG(2) << "Oplog tail entry: " << lastOplogEntry; + LOG(2) << "Oplog tail entry: " << entry; else LOG(2) << "Oplog tail entry ts field: " << tsElem; - first = false; } if (tsElem.timestamp() < truncateTimestamp) { - break; + // If count == 1, that means that we have nothing to delete because everything in the + // oplog is < truncateTimestamp. 
+ if (count != 1) { + invariant(!oldestIDToDelete.isNull()); + oplogCollection->temp_cappedTruncateAfter( + txn, oldestIDToDelete, /*inclusive=*/true); + } + return; } - foundSomethingToTruncate = true; + oldestIDToDelete = id; } - if (foundSomethingToTruncate) { - oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false); - } + severe() << "Reached end of oplog looking for oplog entry before " + << truncateTimestamp.toStringPretty() + << " but couldn't find any after looking through " << count << " entries."; + fassertFailedNoTrace(40296); } /* we write to local.oplog.rs: diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index c1b6e37f82f..f0cb34c7a1e 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -165,10 +165,9 @@ void OplogReader::connectToSyncSource(OperationContext* txn, log() << "our last optime : " << lastOpTimeFetched; log() << "oldest available is " << oldestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; - setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen}); auto status = replCoord->setMaintenanceMode(true); if (!status.isOK()) { - warning() << "Failed to transition into maintenance mode."; + warning() << "Failed to transition into maintenance mode: " << status; } bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); if (!worked) { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 630b9277b48..dd55fcca799 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -133,7 +133,7 @@ public: * components of the replication system to shut down and stop any threads they are using, * blocking until all replication-related shutdown tasks are complete. */ - virtual void shutdown() = 0; + virtual void shutdown(OperationContext* txn) = 0; /** * Returns a pointer to the ReplicationExecutor. 
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index aba9726fc04..b595f606a61 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -80,7 +80,7 @@ public: * Performs any necessary external state specific shutdown tasks, such as cleaning up * the threads it started. */ - virtual void shutdown() = 0; + virtual void shutdown(OperationContext* txn) = 0; /** * Creates the oplog, writes the first entry and stores the replica set config document. Sets @@ -90,10 +90,20 @@ public: const BSONObj& config, bool updateReplOpTime) = 0; + /** - * Writes a message about our transition to primary to the oplog. + * Called as part of the process of transitioning to primary and run with the global X lock and + * the replication coordinator mutex acquired, so no majority writes are allowed while in this + * state. See the call site in ReplicationCoordinatorImpl for details about when and how it is + * called. + * + * Among other things, this writes a message about our transition to primary to the oplog if + * isV1 and returns the optime of that message. If !isV1, returns the optime of the last op + * in the oplog. + * + * Throws on errors. */ - virtual void logTransitionToPrimaryToOplog(OperationContext* txn) = 0; + virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) = 0; /** * Simple wrapper around SyncSourceFeedback::forwardSlaveProgress.
Signals to the diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 75b08a4fd76..67abc29502e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -42,6 +42,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/op_observer.h" @@ -70,6 +71,7 @@ #include "mongo/util/net/hostandport.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/sock.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -116,7 +118,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* repl::startMasterSlave(txn); } -void ReplicationCoordinatorExternalStateImpl::shutdown() { +void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_threadMutex); if (_startedThreads) { log() << "Stopping replication applier threads"; @@ -129,6 +131,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() { if (_snapshotThread) _snapshotThread->shutdown(); + + if (getOplogDeleteFromPoint(txn).isNull() && + loadLastOpTime(txn) == getAppliedThrough(txn)) { + // Clear the appliedThrough marker to indicate we are consistent with the top of the + // oplog. + setAppliedThrough(txn, {}); + } } } @@ -169,24 +178,45 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); + + // This initializes the minvalid document with a null "ts" because older versions (<=3.2) + // get angry if the minValid document is present but doesn't have a "ts" field. 
+ // Consider removing this once we no longer need to support downgrading to 3.2. + setMinValidToAtLeast(txn, {}); } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } -void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); +OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn, + bool isV1ElectionProtocol) { + invariant(txn->lockState()->isW()); + + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be + // done before we add anything to our oplog. + invariant(getOplogDeleteFromPoint(txn).isNull()); + setAppliedThrough(txn, {}); + + if (isV1ElectionProtocol) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(txn, MODE_X); - WriteUnitOfWork wuow(txn); - txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn, - BSON("msg" - << "new primary")); - wuow.commit(); + WriteUnitOfWork wuow(txn); + txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + txn, + BSON("msg" + << "new primary")); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "logging transition to primary to oplog", "local.oplog.rs"); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "logging transition to primary to oplog", "local.oplog.rs"); + const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn)); + + dropAllTempCollections(txn); + + return opTimeToReturn; } void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { @@ -301,18 +331,94 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp } void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) { - auto mv = getMinValid(txn); + if (getInitialSyncFlag(txn)) { + return; // Initial Sync will take over so no cleanup is needed. 
+ } - if (!mv.start.isNull()) { - // If we are in the middle of a batch, and recoveringm then we need to truncate the oplog. - LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON(); - truncateOplogTo(txn, mv.start.getTimestamp()); + const auto deleteFromPoint = getOplogDeleteFromPoint(txn); + const auto appliedThrough = getAppliedThrough(txn); + + const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() && + // This version should never have a non-null deleteFromPoint with a null appliedThrough. + // This scenario means that we downgraded after unclean shutdown, then the downgraded node + // deleted the ragged end of our oplog, then did a clean shutdown. + !appliedThrough.isNull() && + // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This + // means that the downgraded node deleted our ragged end then applied ahead of our + // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with + // applying these ops because older versions wrote to the oplog from a single thread so we + // know they are in order. + !(appliedThrough.getTimestamp() >= deleteFromPoint); + if (needToDeleteEndOfOplog) { + log() << "Removing unapplied entries starting at: " << deleteFromPoint; + truncateOplogTo(txn, deleteFromPoint); + } + setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint + + if (appliedThrough.isNull()) { + // No follow-up work to do. + return; + } + + // Check if we have any unapplied ops in our oplog. It is important that this is done after + // deleting the ragged end of the oplog. + const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn)); + if (appliedThrough == topOfOplog) { + return; // We've applied all the valid oplog we have. + } else if (appliedThrough > topOfOplog) { + severe() << "Applied op " << appliedThrough << " not found. 
Top of oplog is " << topOfOplog + << '.'; + fassertFailedNoTrace(40313); + } + + log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " + << topOfOplog << " (inclusive)."; + + DBDirectClient db(txn); + auto cursor = db.query(rsOplogName, + QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " << appliedThrough + << " which should be impossible."; + fassertFailedNoTrace(40293); + } + auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); + if (firstOpTimeFound != appliedThrough) { + severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " + << firstOpTimeFound; + fassertFailedNoTrace(40292); + } + + // Apply remaining ops one at a time, but don't log them because they are already logged. + const bool wereWritesReplicated = txn->writesAreReplicated(); + ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); }); + txn->setReplicatedWrites(false); + + while (cursor->more()) { + auto entry = cursor->nextSafe(); + fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true)); + setAppliedThrough(txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); } } StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { // TODO: handle WriteConflictExceptions below try { + // If we are doing an initial sync do not read from the oplog.
+ if (getInitialSyncFlag(txn)) { + return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."}; + } + BSONObj oplogEntry; if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument, diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 4ddc917d129..26667fbba96 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -53,11 +53,11 @@ public: virtual ~ReplicationCoordinatorExternalStateImpl(); virtual void startThreads(const ReplSettings& settings) override; virtual void startMasterSlave(OperationContext* txn); - virtual void shutdown(); + virtual void shutdown(OperationContext* txn); virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config, bool updateReplOpTime); - virtual void logTransitionToPrimaryToOplog(OperationContext* txn); + OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override; virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext* txn); virtual bool isSelf(const HostAndPort& host); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index d1f6f567e72..d174145751c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -69,7 +69,7 @@ Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(Operati return storeLocalConfigDocument(txn, config); } -void ReplicationCoordinatorExternalStateMock::shutdown() {} +void ReplicationCoordinatorExternalStateMock::shutdown(OperationContext*) {} void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {} OID 
ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) { @@ -233,8 +233,12 @@ bool ReplicationCoordinatorExternalStateMock::isReadCommittedSupportedByStorageE return true; } -void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) { - _lastOpTime = OpTime(Timestamp(1, 0), 1); +OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* txn, + bool isV1ElectionProtocol) { + if (isV1ElectionProtocol) { + _lastOpTime = OpTime(Timestamp(1, 0), 1); + } + return fassertStatusOK(40297, _lastOpTime); } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index c16b0760c0d..4aa05861787 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -54,11 +54,11 @@ public: virtual ~ReplicationCoordinatorExternalStateMock(); virtual void startThreads(const ReplSettings& settings) override; virtual void startMasterSlave(OperationContext*); - virtual void shutdown(); + virtual void shutdown(OperationContext*); virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config, bool updateReplOpTime); - virtual void logTransitionToPrimaryToOplog(OperationContext* txn); + OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override; virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext*); virtual bool isSelf(const HostAndPort& host); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 45c3c0644d4..8772ee39829 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -351,7 +351,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { fassertFailedNoTrace(28545); } - // Returns 
the last optime from the oplog, possibly truncating first if we need to recover. + // Read the last op from the oplog after cleaning up any partially applied batches. _externalState->cleanUpLastApplyBatch(txn); auto lastOpTimeStatus = _externalState->loadLastOpTime(txn); @@ -492,7 +492,7 @@ void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) { } } -void ReplicationCoordinatorImpl::shutdown() { +void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { // Shutdown must: // * prevent new threads from blocking in awaitReplication // * wake up all existing threads blocking in awaitReplication @@ -525,7 +525,7 @@ void ReplicationCoordinatorImpl::shutdown() { // joining the replication executor is blocking so it must be run outside of the mutex _replExecutor.shutdown(); _replExecutor.join(); - _externalState->shutdown(); + _externalState->shutdown(txn); } const ReplSettings& ReplicationCoordinatorImpl::getSettings() const { @@ -675,7 +675,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and // drop the mutex. At this point, no writes can occur from other threads, due to the // global exclusive lock. - // 4.) Drop all temp collections. + // 4.) Drop all temp collections, and log the drops to the oplog. // 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will // consider to be committed. // 6.) Drop the global exclusive lock. @@ -687,6 +687,9 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // external writes will be processed. This is important so that a new temp collection isn't // introduced on the new primary before we drop all the temp collections. + // When we go to drop all temp collections, we must replicate the drops. 
+ invariant(txn->writesAreReplicated()); + stdx::unique_lock<stdx::mutex> lk(_mutex); if (!_isWaitingForDrainToComplete) { return; @@ -697,32 +700,30 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite globalWriteLock(txn->lockState()); - lk.lock(); + if (!_isWaitingForDrainToComplete) { return; } _isWaitingForDrainToComplete = false; - _canAcceptNonLocalWrites = true; _drainFinishedCond.notify_all(); - lk.unlock(); - - _externalState->dropAllTempCollections(txn); - // This is done for compatibility with PV0 replicas wrt how "n" ops are processed. - if (isV1ElectionProtocol()) { - _externalState->logTransitionToPrimaryToOplog(txn); + if (!_getMemberState_inlock().primary()) { + // We must have decided not to transition to primary while waiting for the applier to drain. + // Skip the rest of this function since it should only be done when really transitioning. + return; } - StatusWith<OpTime> lastOpTime = _externalState->loadLastOpTime(txn); - fassertStatusOK(28665, lastOpTime.getStatus()); - _setFirstOpTimeOfMyTerm(lastOpTime.getValue()); + invariant(!_canAcceptNonLocalWrites); + _canAcceptNonLocalWrites = true; + lk.unlock(); + _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol())); lk.lock(); + // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged - // our election in logTransitionToPrimaryToOplog(), above. + // our election in onTransitionToPrimary(), above. 
_updateLastCommittedOpTime_inlock(); - lk.unlock(); log() << "transition to primary complete; database writes are now permitted" << rsLog; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 3cc0d6e4995..0e1a72bab9a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -113,7 +113,7 @@ public: virtual void startReplication(OperationContext* txn) override; - virtual void shutdown() override; + virtual void shutdown(OperationContext* txn) override; virtual ReplicationExecutor* getExecutor() override { return &_replExecutor; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index deb917413a8..04de8fef35e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2423,9 +2423,10 @@ TEST_F(ReplCoordTest, IsMaster) { } TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) { + OperationContextNoop txn; init(); startCapturingLogMessages(); - getReplCoord()->shutdown(); + getReplCoord()->shutdown(&txn); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("shutdown() called before startReplication() finished")); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 3ea4d8eef42..7d3f987878f 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -52,7 +52,7 @@ void ReplicationCoordinatorMock::startReplication(OperationContext* txn) { // TODO } -void ReplicationCoordinatorMock::shutdown() { +void ReplicationCoordinatorMock::shutdown(OperationContext* txn) { // TODO } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 4a21d9ad705..15315cd5a8f 100644 --- 
a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -54,7 +54,7 @@ public: virtual void startReplication(OperationContext* txn); - virtual void shutdown(); + virtual void shutdown(OperationContext* txn); virtual ReplicationExecutor* getExecutor() override { return nullptr; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 412d1165a24..9106d4058b3 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -368,9 +368,10 @@ void ReplCoordTest::simulateSuccessfulElection() { } void ReplCoordTest::shutdown() { + OperationContextReplMock txn; invariant(_callShutdown); _net->exitNetwork(); - _repl->shutdown(); + _repl->shutdown(&txn); _callShutdown = false; } diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp index 2117458e9f4..92312d0d21c 100644 --- a/src/mongo/db/repl/roll_back_local_operations.cpp +++ b/src/mongo/db/repl/roll_back_local_operations.cpp @@ -41,6 +41,10 @@ namespace repl { namespace { +OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) { + return fassertStatusOK(40298, OpTime::parseFromOplogEntry(oplogValue.first)); +} + Timestamp getTimestamp(const BSONObj& operation) { return operation["ts"].timestamp(); } @@ -116,7 +120,7 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations _scanned++; if (getHash(_localOplogValue) == getHash(operation)) { return StatusWith<RollbackCommonPoint>( - std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second)); + std::make_pair(getOpTime(_localOplogValue), _localOplogValue.second)); } auto status = _rollbackOperation(_localOplogValue.first); if (!status.isOK()) { @@ -139,14 +143,11 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations "Need to 
process additional remote operations."); } - if (getTimestamp(_localOplogValue) < getTimestamp(operation)) { - _scanned++; - return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey, - "Unable to determine common point. " - "Need to process additional remote operations."); - } - - return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId()); + invariant(getTimestamp(_localOplogValue) < getTimestamp(operation)); + _scanned++; + return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey, + "Unable to determine common point. " + "Need to process additional remote operations."); } StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations( diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h index 20eb923083d..87a940ce57b 100644 --- a/src/mongo/db/repl/roll_back_local_operations.h +++ b/src/mongo/db/repl/roll_back_local_operations.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/record_id.h" #include "mongo/db/repl/oplog_interface.h" +#include "mongo/db/repl/optime.h" #include "mongo/stdx/functional.h" namespace mongo { @@ -49,7 +50,7 @@ public: */ using RollbackOperationFn = stdx::function<Status(const BSONObj&)>; - using RollbackCommonPoint = std::pair<Timestamp, RecordId>; + using RollbackCommonPoint = std::pair<OpTime, RecordId>; /** * Initializes rollback processor with a valid local oplog. 
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp index 06af9890571..80334710d57 100644 --- a/src/mongo/db/repl/roll_back_local_operations_test.cpp +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -107,7 +107,7 @@ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { RollBackLocalOperations finder(localOplog, rollbackOperation); auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_EQUALS(commonOperation.first, i->first); @@ -164,7 +164,7 @@ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_EQUALS(commonOperation.first, i->first); @@ -197,7 +197,7 @@ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_EQUALS(commonOperation.first, i->first); @@ -269,7 +269,7 @@ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { return Status::OK(); }); 
ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_EQUALS(commonOperation.first, i->first); @@ -288,7 +288,7 @@ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { return Status::OK(); }); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); } @@ -306,7 +306,7 @@ TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) { return Status::OK(); }); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_TRUE(called); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 23877df274a..a2b4bc1ccf5 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -79,8 +79,8 @@ MONGO_FP_DECLARE(failInitSyncWithBufferedEntriesLeft); void truncateAndResetOplog(OperationContext* txn, ReplicationCoordinator* replCoord, BackgroundSync* bgsync) { - // Clear minvalid - setMinValid(txn, OpTime(), DurableRequirement::None); + // Add field to minvalid document to tell us to restart initial sync if we crash + setInitialSyncFlag(txn); AutoGetDb autoDb(txn, "local", MODE_X); massert(28585, "no local database found", autoDb.getDb()); @@ -341,9 +341,6 @@ Status _initialSync() { return Status(ErrorCodes::InitialSyncFailure, msg); } - // Add field to minvalid document to tell us to restart 
initial sync if we crash - setInitialSyncFlag(&txn); - log() << "initial sync drop all databases"; dropAllDatabasesExceptLocal(&txn); @@ -457,19 +454,10 @@ Status _initialSync() { log() << "initial sync finishing up"; - { - ScopedTransaction scopedXact(&txn, MODE_IX); - AutoGetDb autodb(&txn, "local", MODE_X); - OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime()); - log() << "set minValid=" << lastOpTimeWritten; - - // Initial sync is now complete. Flag this by setting minValid to the last thing we synced. - setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None); - BackgroundSync::get()->setInitialSyncRequestedFlag(false); - } - + // Initial sync is now complete. // Clear the initial sync flag -- cannot be done under a db lock, or recursive. clearInitialSyncFlag(&txn); + BackgroundSync::get()->setInitialSyncRequestedFlag(false); // Clear maint. mode. while (replCoord->getMaintenanceMode()) { diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index d52032cb6a4..7b40e510127 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -159,7 +159,7 @@ struct FixUpInfo { set<string> collectionsToResyncData; set<string> collectionsToResyncMetadata; - Timestamp commonPoint; + OpTime commonPoint; RecordId commonPointOurDiskloc; int rbid; // remote server's current rollback sequence # @@ -391,9 +391,12 @@ void syncFixUp(OperationContext* txn, // we have items we are writing that aren't from a point-in-time. thus best not to come // online until we get to that point in freshness. + // TODO this is still wrong because we don't record that we are in rollback, and we can't really + // recover. OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - setMinValid(txn, {OpTime{}, minValid}); + setAppliedThrough(txn, {}); // Use top of oplog. + setMinValid(txn, minValid); // any full collection resyncs required? 
if (!fixUpInfo.collectionsToResyncData.empty() || @@ -497,8 +500,8 @@ void syncFixUp(OperationContext* txn, } else { OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm}; - setMinValid(txn, {start, minValid}); + setMinValid(txn, minValid); + setAppliedThrough(txn, fixUpInfo.commonPoint); } } catch (const DBException& e) { err = "can't get/set minvalid: "; @@ -768,7 +771,7 @@ void syncFixUp(OperationContext* txn, log() << "rollback 6"; // clean up oplog - LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toStringPretty(); + LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint; { const NamespaceString oplogNss(rsOplogName); ScopedTransaction transaction(txn, MODE_IX); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 835389c3494..3f768d79f4b 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -159,7 +159,8 @@ void RSRollbackTest::setUp() { setGlobalReplicationCoordinator(_coordinator); setOplogCollectionName(); - repl::setMinValid(_txn.get(), {OpTime{}, OpTime{}}); + repl::setAppliedThrough(_txn.get(), OpTime{}); + repl::setMinValid(_txn.get(), OpTime{}); } void RSRollbackTest::tearDown() { @@ -175,8 +176,8 @@ void RSRollbackTest::tearDown() { void noSleep(Seconds seconds) {} TEST_F(RSRollbackTest, InconsistentMinValid) { - repl::setMinValid(_txn.get(), - {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)}); + repl::setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0)); + repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); auto status = syncRollback(_txn.get(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr<OplogInterface>( diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 
7c929b6eae1..a336c8de68d 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -108,6 +108,11 @@ static Counter64 opsAppliedStats; // The oplog entries applied static ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats); +// Number of times we tried to go live as a secondary. +static Counter64 attemptsToBecomeSecondary; +static ServerStatusMetricField<Counter64> displayAttemptsToBecomeSecondary( + "repl.apply.attemptsToBecomeSecondary", &attemptsToBecomeSecondary); + MONGO_FP_DECLARE(rsSyncApplyStop); // Number and time of each ApplyOps worker pool round @@ -495,9 +500,7 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. -OpTime SyncTail::multiApply(OperationContext* txn, - const OpQueue& ops, - boost::optional<BatchBoundaries> boundaries) { +OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { invariant(_applyFunc); if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { @@ -528,12 +531,8 @@ OpTime SyncTail::multiApply(OperationContext* txn, fassertFailed(28527); } - if (boundaries) { - setMinValid(txn, *boundaries); // Mark us as in the middle of a batch. - } - - applyOps(writerVectors, &_writerPool, _applyFunc, this); - + // Since we write the oplog from a single thread in-order, we don't need to use the + // oplogDeleteFromPoint. OpTime lastOpTime; { ON_BLOCK_EXIT([&] { _writerPool.join(); }); @@ -545,25 +544,28 @@ OpTime SyncTail::multiApply(OperationContext* txn, lastOpTime = writeOpsToOplog(txn, raws); } + setMinValidToAtLeast(txn, lastOpTime); // Mark us as in the middle of a batch. + + applyOps(writerVectors, &_writerPool, _applyFunc, this); + _writerPool.join(); + // Due to SERVER-24933 we can't enter inShutdown while holding the PBWM lock. 
invariant(!inShutdownStrict()); - if (boundaries) { - setMinValid(txn, boundaries->end, DurableRequirement::None); // Mark batch as complete. - } + setAppliedThrough(txn, lastOpTime); // Mark batch as complete. return lastOpTime; } namespace { -void tryToGoLiveAsASecondary(OperationContext* txn, - ReplicationCoordinator* replCoord, - const BatchBoundaries& minValidBoundaries, - const OpTime& lastWriteOpTime) { +void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) { if (replCoord->isInPrimaryOrSecondaryState()) { return; } + // This needs to happen after the attempt so readers can be sure we've already tried. + ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); }); + ScopedTransaction transaction(txn, MODE_S); Lock::GlobalRead readLock(txn->lockState()); @@ -578,15 +580,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn, return; } - // If an apply batch is active then we cannot transition. - if (!minValidBoundaries.start.isNull()) { - return; - } - - // Must have applied/written to minvalid, so return if not. - // -- If 'lastWriteOpTime' is null/uninitialized then we can't transition. - // -- If 'lastWriteOpTime' is less than the end of the last batch then we can't transition. - if (lastWriteOpTime.isNull() || minValidBoundaries.end > lastWriteOpTime) { + // We can't go to SECONDARY until we reach minvalid. + if (replCoord->getMyLastAppliedOpTime() < getMinValid(txn)) { return; } @@ -693,9 +688,6 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) { ? 
new ApplyBatchFinalizerForJournal(replCoord) : new ApplyBatchFinalizer(replCoord)}; - auto minValidBoundaries = getMinValid(&txn); - OpTime originalEndOpTime(minValidBoundaries.end); - OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()}; while (!inShutdown()) { OpQueue ops; @@ -705,7 +697,7 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) { return; } - tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime); + tryToGoLiveAsASecondary(&txn, replCoord); // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become // ready in time, we'll loop again so we can do the above checks periodically. @@ -717,9 +709,7 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) { invariant(!ops.empty()); - const BSONObj lastOp = ops.back().raw; - - if (lastOp.isEmpty()) { + if (ops.front().raw.isEmpty()) { // This means that the network thread has coalesced and we have processed all of its // data. invariant(ops.getDeque().size() == 1); @@ -727,63 +717,40 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) { replCoord->signalDrainComplete(&txn); } - // Reset some values when triggered in case it was from a rollback. - minValidBoundaries = getMinValid(&txn); - lastWriteOpTime = replCoord->getMyLastAppliedOpTime(); - originalEndOpTime = minValidBoundaries.end; - continue; // This wasn't a real op. Don't try to apply it. } - const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); - // TODO: Make ">=" once SERVER-21988 is fixed. - if (lastWriteOpTime > lastOpTime) { - // Error for the oplog to go back in time. + // Extract some info from ops that we'll need after releasing the batch below. 
+ const auto firstOpTimeInBatch = + fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw)); + const auto lastOpTimeInBatch = + fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw)); + + // Make sure the oplog doesn't go back in time or repeat an entry. + if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) { fassert(34361, Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Attempted to apply an earlier oplog entry (ts: " - << lastOpTime.getTimestamp().toStringPretty() - << ") when our lastWrittenOptime was " - << lastWriteOpTime.toString())); + str::stream() << "Attempted to apply an oplog entry (" + << firstOpTimeInBatch.toString() + << ") which is not greater than our last applied OpTime (" + << replCoord->getMyLastAppliedOpTime().toString() + << ").")); } - // Set minValid to the last OpTime that needs to be applied, in this batch or from the - // (last) failed batch, whichever is larger. - // This will cause this node to go into RECOVERING state - // if we should crash and restart before updating finishing. - minValidBoundaries.start = OpTime(getLastSetTimestamp(), OpTime::kUninitializedTerm); - - - // Take the max of the first endOptime (if we recovered) and the end of our batch. - - // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch) - // ensures that we keep pushing out the point where we can become consistent - // and allow reads. If we recover and end up doing smaller batches we must pass the - // originalEndOpTime before we are good. 
- // - // For example: - // batch apply, 20-40, end = 40 - // batch failure, - // restart - // batch apply, 20-25, end = max(25, 40) = 40 - // batch apply, 25-45, end = 45 - minValidBoundaries.end = std::max(originalEndOpTime, lastOpTime); - - - lastWriteOpTime = multiApply(&txn, ops, minValidBoundaries); - if (lastWriteOpTime.isNull()) { + const bool fail = multiApply(&txn, ops).isNull(); + if (fail) { // fassert if oplog application failed for any reasons other than shutdown. error() << "Failed to apply " << ops.getDeque().size() - << " operations - batch start:" << minValidBoundaries.start - << " end:" << minValidBoundaries.end; + << " operations - batch start:" << firstOpTimeInBatch + << " end:" << lastOpTimeInBatch; fassert(34360, inShutdownStrict()); // Return without setting minvalid in the case of shutdown. return; } - setNewTimestamp(lastWriteOpTime.getTimestamp()); - minValidBoundaries.start = {}; - finalizer->record(lastWriteOpTime); + // Update various things that care about our last applied optime. + setNewTimestamp(lastOpTimeInBatch.getTimestamp()); + finalizer->record(lastOpTimeInBatch); } } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 539a00d933d..313a16bf48b 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -143,6 +143,11 @@ public: return _deque.back(); } + const OplogEntry& front() const { + invariant(!_deque.empty()); + return _deque.front(); + } + private: std::deque<OplogEntry> _deque; size_t _size; @@ -194,9 +199,7 @@ protected: // Apply a batch of operations, using multiple threads. // If boundries is supplied, will update minValid document at begin and end of batch. // Returns the last OpTime applied during the apply batch, ops.end["ts"] basically. 
- OpTime multiApply(OperationContext* txn, - const OpQueue& ops, - boost::optional<BatchBoundaries> boundaries = {}); + OpTime multiApply(OperationContext* txn, const OpQueue& ops); private: class OpQueueBatcher; diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js index b1f2a94a735..0d1e225a990 100644 --- a/src/mongo/shell/assert.js +++ b/src/mongo/shell/assert.js @@ -3,6 +3,9 @@ doassert = function(msg, obj) { if (typeof(msg) == "function") msg = msg(); + if (typeof(msg) == "object") + msg = tojson(msg); + if (typeof(msg) == "string" && msg.indexOf("assert") == 0) print(msg); else diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 8e6d5a05525..b9776fd0de1 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -917,6 +917,12 @@ var ReplSetTest = function(opts) { printjson(this.nodes); + // Clean up after noReplSet to ensure it doesn't effect future restarts. + if (options.noReplSet) { + this.nodes[n].fullOptions.replSet = defaults.replSet; + delete this.nodes[n].fullOptions.noReplSet; + } + wait = wait || false; if (!wait.toFixed) { if (wait) |