diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-12-15 18:07:51 -0500 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2017-01-23 16:18:49 -0500 |
commit | f0cf61dbcdc569e1b3642097787068c7b1139273 (patch) | |
tree | 9848d2f4dd9120b4f8e4f354ab4b447b88e4b8b8 | |
parent | acf33ea3cf2b9443246cbc67fae104636c7e7790 (diff) | |
download | mongo-f0cf61dbcdc569e1b3642097787068c7b1139273.tar.gz |
SERVER-27123 Only update the commit point as a secondary from oplog queries against your sync source
(cherry picked from commit 87f49488f1b5c872daa71fd2fd9b5d744409a817)
22 files changed, 421 insertions, 112 deletions
diff --git a/jstests/libs/write_concern_util.js b/jstests/libs/write_concern_util.js index c87ec667591..d7541b3b9a6 100644 --- a/jstests/libs/write_concern_util.js +++ b/jstests/libs/write_concern_util.js @@ -17,18 +17,23 @@ function shardCollectionWithChunks(st, coll) { assert.eq(coll.count(), numberDoc); } -// Stops replication at a server. +// Stops replication on the given server(s). function stopServerReplication(conn) { - var errMsg = 'Failed to enable rsSyncApplyStop failpoint.'; + if (conn.length) { + conn.forEach(function(n) { + stopServerReplication(n); + }); + return; + } + var errMsg = 'Failed to enable stopOplogFetcher failpoint.'; assert.commandWorked( - conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}), + conn.getDB('admin').runCommand({configureFailPoint: 'stopOplogFetcher', mode: 'alwaysOn'}), errMsg); } // Stops replication at all replicaset secondaries. function stopReplicationOnSecondaries(rs) { - var secondaries = rs.getSecondaries(); - secondaries.forEach(stopServerReplication); + stopServerReplication(rs.getSecondaries()); } // Stops replication at all shard secondaries. @@ -36,23 +41,29 @@ function stopReplicationOnSecondariesOfAllShards(st) { st._rsObjects.forEach(stopReplicationOnSecondaries); } -// Restarts replication at a server. +// Restarts replication on the given server(s). function restartServerReplication(conn) { - var errMsg = 'Failed to disable rsSyncApplyStop failpoint.'; + if (conn.length) { + conn.forEach(function(n) { + restartServerReplication(n); + }); + return; + } + + var errMsg = 'Failed to disable stopOplogFetcher failpoint.'; assert.commandWorked( - conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}), + conn.getDB('admin').runCommand({configureFailPoint: 'stopOplogFetcher', mode: 'off'}), errMsg); } // Restarts replication at all nodes in a replicaset. function restartReplSetReplication(rs) { - rs.nodes.forEach(restartServerReplication); + restartServerReplication(rs.nodes); } // Restarts replication at all replicaset secondaries. function restartReplicationOnSecondaries(rs) { - var secondaries = rs.getSecondaries(); - secondaries.forEach(restartServerReplication); + restartServerReplication(rs.getSecondaries()); } // Restarts replication at all nodes in a sharded cluster. diff --git a/jstests/replsets/double_rollback.js b/jstests/replsets/double_rollback.js index 42aa40fae6a..342775c2545 100644 --- a/jstests/replsets/double_rollback.js +++ b/jstests/replsets/double_rollback.js @@ -12,7 +12,9 @@ (function() { 'use strict'; + load("jstests/libs/check_log.js"); + load("jstests/replsets/rslib.js"); var name = "double_rollback"; var dbName = "test"; @@ -32,14 +34,6 @@ var nodes = rst.startSet(); rst.initiate(); - function waitForState(node, state) { - assert.soonNoExcept(function() { - assert.commandWorked(node.adminCommand( - {replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS})); - return true; - }); - } - function stepUp(rst, node) { var primary = rst.getPrimary(); if (primary != node) { diff --git a/jstests/replsets/read_committed_after_rollback.js b/jstests/replsets/read_committed_after_rollback.js index e434483f99b..033a83ad2d9 100644 --- a/jstests/replsets/read_committed_after_rollback.js +++ b/jstests/replsets/read_committed_after_rollback.js @@ -143,10 +143,10 @@ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. // now be visible as a committed read to both oldPrimary and newPrimary. assert.commandWorked( pureSecondary.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "off"})); - var gleResponse = - newPrimaryColl.runCommand({getLastError: 1, w: 'majority', wtimeout: 5 * 1000 * 60}); - assert.commandWorked(gleResponse); - assert.eq(null, gleResponse.err, "GLE detected write error: " + tojson(gleResponse)); + // Do a write to the new primary so that the old primary can establish a sync source to learn + // about the new commit. + assert.writeOK(newPrimary.getDB(name).unrelatedCollection.insert( + {a: 1}, {writeConcern: {w: 'majority', wtimeout: replTest.kDefaultTimeoutMS}})); assert.eq(doCommittedRead(newPrimaryColl), 'new'); assert.eq(doCommittedRead(oldPrimaryColl), 'new'); }()); diff --git a/jstests/replsets/read_committed_stale_history.js b/jstests/replsets/read_committed_stale_history.js new file mode 100644 index 00000000000..9793a47576e --- /dev/null +++ b/jstests/replsets/read_committed_stale_history.js @@ -0,0 +1,148 @@ +/* + * Tests that a node on a stale branch of history won't incorrectly mark its ops as committed even + * when hearing about a commit point with a higher optime from a new primary. + */ +(function() { + 'use strict'; + + load("jstests/libs/check_log.js"); + load("jstests/libs/write_concern_util.js"); + load("jstests/replsets/rslib.js"); + + var name = "readCommittedStaleHistory"; + var dbName = "wMajorityCheck"; + var collName = "stepdown"; + + var rst = new ReplSetTest({ + name: name, + nodes: [ + {}, + {}, + {rsConfig: {priority: 0}}, + ], + nodeOptions: {enableMajorityReadConcern: ""}, + useBridge: true + }); + + if (!startSetIfSupportsReadMajority(rst)) { + jsTest.log("skipping test since storage engine doesn't support committed reads"); + return; + } + + var nodes = rst.nodes; + rst.initiate(); + + /** + * Waits for the given node to be in state primary *and* have finished drain mode and thus + * be available for writes. + */ + function waitForPrimary(node) { + assert.soon(function() { + return node.adminCommand('ismaster').ismaster; + }); + } + + function stepUp(node) { + var primary = rst.getPrimary(); + if (primary != node) { + assert.throws(function() { + primary.adminCommand({replSetStepDown: 60 * 5}); + }); + } + waitForPrimary(node); + } + + // Asserts that the given document is not visible in the committed snapshot on the given node. + function checkDocNotCommitted(node, doc) { + var docs = + node.getDB(dbName).getCollection(collName).find(doc).readConcern('majority').toArray(); + assert.eq(0, docs.length, tojson(docs)); + } + + jsTestLog("Make sure node 0 is primary."); + rst.getPrimary(); + stepUp(nodes[0]); + var primary = rst.getPrimary(); + var secondaries = rst.getSecondaries(); + assert.eq(nodes[0], primary); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); + + // Stop the secondaries from replicating. + stopServerReplication(secondaries); + // Stop the primary from being able to complete stepping down. + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'})); + + jsTestLog("Do a write that won't ever reach a majority of nodes"); + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert({a: 2})); + + // Ensure that the write that was just done is not visible in the committed snapshot. + checkDocNotCommitted(nodes[0], {a: 2}); + + // Prevent the primary from rolling back later on. + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'alwaysOn'})); + + jsTest.log("Disconnect primary from all secondaries"); + nodes[0].disconnect(nodes[1]); + nodes[0].disconnect(nodes[2]); + + jsTest.log("Wait for a new primary to be elected"); + // Allow the secondaries to replicate again. + restartServerReplication(secondaries); + + waitForPrimary(nodes[1]); + + jsTest.log("Do a write to the new primary"); + assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert( + {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTest.log("Reconnect the old primary to the rest of the nodes"); + nodes[1].reconnect(nodes[0]); + nodes[2].reconnect(nodes[0]); + + // Sleep 10 seconds to allow some heartbeats to be processed, so we can verify that the + // heartbeats don't cause the stale primary to incorrectly advance the commit point. + sleep(10000); + + checkDocNotCommitted(nodes[0], {a: 2}); + + jsTest.log("Allow the old primary to finish stepping down and become secondary"); + var res = null; + try { + res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'}); + } catch (e) { + // Expected - once we disable the fail point the stepdown will proceed and it's racy whether + // the stepdown closes all connections before or after the configureFailPoint command + // returns + } + if (res) { + assert.commandWorked(res); + } + rst.waitForState(nodes[0], ReplSetTest.State.SECONDARY); + + // At this point the former primary will attempt to go into rollback, but the + // 'rollbackHangBeforeStart' will prevent it from doing so. + checkDocNotCommitted(nodes[0], {a: 2}); + checkLog.contains(nodes[0], 'rollback - rollbackHangBeforeStart fail point enabled'); + checkDocNotCommitted(nodes[0], {a: 2}); + + jsTest.log("Allow the original primary to roll back its write and catch up to the new primary"); + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'off'})); + + assert.soonNoExcept(function() { + return null == nodes[0].getDB(dbName).getCollection(collName).findOne({a: 2}); + }, "Original primary never rolled back its write"); + + rst.awaitReplication(); + + // Ensure that the old primary got the write that the new primary did and sees it as committed. + assert.neq( + null, + nodes[0].getDB(dbName).getCollection(collName).find({a: 3}).readConcern('majority').next()); + + rst.stopSet(); +}()); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index beebf99159a..03ce30de2ab 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -7,6 +7,7 @@ var reconfig; var awaitOpTime; var startSetIfSupportsReadMajority; var waitUntilAllNodesCaughtUp; +var waitForState; var reInitiateWithoutThrowingOnAbortedMember; var awaitRSClientHosts; var getLastOpTime; @@ -212,6 +213,17 @@ var getLastOpTime; }; /** + * Waits for the given node to reach the given state, ignoring network errors. + */ + waitForState = function(node, state) { + assert.soonNoExcept(function() { + assert.commandWorked(node.adminCommand( + {replSetTest: 1, waitForMemberState: state, timeoutMillis: 60 * 1000 * 5})); + return true; + }); + }; + + /** * Starts each node in the given replica set if the storage engine supports readConcern *'majority'. * Returns true if the replica set was started successfully and false otherwise. diff --git a/jstests/replsets/write_concern_after_stepdown.js b/jstests/replsets/write_concern_after_stepdown.js index c8493ea4fb6..95a08a005e0 100644 --- a/jstests/replsets/write_concern_after_stepdown.js +++ b/jstests/replsets/write_concern_after_stepdown.js @@ -5,6 +5,9 @@ (function() { 'use strict'; + load("jstests/replsets/rslib.js"); + load("jstests/libs/write_concern_util.js"); + var name = "writeConcernStepDownAndBackUp"; var dbName = "wMajorityCheck"; var collName = "stepdownAndBackUp"; @@ -21,14 +24,6 @@ var nodes = rst.startSet(); rst.initiate(); - function waitForState(node, state) { - assert.soonNoExcept(function() { - assert.commandWorked(node.adminCommand( - {replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS})); - return true; - }); - } - function waitForPrimary(node) { assert.soon(function() { return node.adminCommand('ismaster').ismaster; @@ -55,10 +50,7 @@ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); // Stop the secondaries from replicating. - secondaries.forEach(function(node) { - assert.commandWorked( - node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'})); - }); + stopServerReplication(secondaries); // Stop the primary from being able to complete stepping down. assert.commandWorked( nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'})); @@ -79,10 +71,7 @@ jsTest.log("Wait for a new primary to be elected"); // Allow the secondaries to replicate again. - secondaries.forEach(function(node) { - assert.commandWorked( - node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'})); - }); + restartServerReplication(secondaries); waitForPrimary(nodes[1]); diff --git a/jstests/replsets/write_concern_after_stepdown_and_stepup.js b/jstests/replsets/write_concern_after_stepdown_and_stepup.js index a5a0f0b6d64..e2667d4f66a 100644 --- a/jstests/replsets/write_concern_after_stepdown_and_stepup.js +++ b/jstests/replsets/write_concern_after_stepdown_and_stepup.js @@ -6,6 +6,9 @@ (function() { 'use strict'; + load("jstests/replsets/rslib.js"); + load("jstests/libs/write_concern_util.js"); + var name = "writeConcernStepDownAndBackUp"; var dbName = "wMajorityCheck"; var collName = "stepdownAndBackUp"; @@ -22,14 +25,6 @@ var nodes = rst.startSet(); rst.initiate(); - function waitForState(node, state) { - assert.soonNoExcept(function() { - assert.commandWorked(node.adminCommand( - {replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS})); - return true; - }); - } - function waitForPrimary(node) { assert.soon(function() { return node.adminCommand('ismaster').ismaster; @@ -56,10 +51,7 @@ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); // Stop the secondaries from replicating. - secondaries.forEach(function(node) { - assert.commandWorked( - node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'alwaysOn'})); - }); + stopServerReplication(secondaries); // Stop the primary from calling into awaitReplication() assert.commandWorked(nodes[0].adminCommand( {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'})); @@ -86,10 +78,7 @@ jsTest.log("Wait for a new primary to be elected"); // Allow the secondaries to replicate again. - secondaries.forEach(function(node) { - assert.commandWorked( - node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'off'})); - }); + restartServerReplication(secondaries); waitForPrimary(nodes[1]); diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 607e29e5aa7..7a5a088aae5 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -110,6 +110,9 @@ size_t getSize(const BSONObj& o) { MONGO_FP_DECLARE(pauseRsBgSyncProducer); +// Failpoint which causes rollback to hang before starting. +MONGO_FP_DECLARE(rollbackHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", @@ -626,6 +629,15 @@ void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { + if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "rollback - rollbackHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 418d1de7455..238f4717aa6 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -61,7 +61,7 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp } void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) { - _replicationCoordinator->processReplSetMetadata(metadata); + _replicationCoordinator->processReplSetMetadata(metadata, true /*advance the commit point*/); if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { _replicationCoordinator->cancelAndRescheduleElectionTimeout(); } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 8e6c521c71d..45ec02b0ad2 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -55,6 +55,7 @@ OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOp void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& metadata) { metadataProcessed = metadata; + metadataWasProcessed = true; } bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 45b755d525a..423e8f401dc 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -72,6 +72,7 @@ public: // Set by processMetadata. rpc::ReplSetMetadata metadataProcessed; + bool metadataWasProcessed = false; // Set by shouldStopFetching. HostAndPort lastSyncSourceChecked; diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index f5ddf7e3923..0bba80369cd 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -433,24 +433,6 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, } const auto& queryResponse = result.getValue(); - rpc::ReplSetMetadata metadata; - - // Forward metadata (containing liveness information) to data replicator external state. - bool receivedMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { - const auto& metadataObj = queryResponse.otherFields.metadata; - auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); - if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << _fetcher->getSource() - << ": " << metadataResult.getStatus() << ": " << metadataObj; - _finishCallback(metadataResult.getStatus()); - return; - } - metadata = metadataResult.getValue(); - _dataReplicatorExternalState->processMetadata(metadata); - } - const auto& documents = queryResponse.documents; auto firstDocToApply = documents.cbegin(); @@ -487,6 +469,25 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, } auto info = validateResult.getValue(); + // Process replset metadata. It is important that this happen after we've validated the + // first batch, so we don't progress our knowledge of the commit point from a + // response that triggers a rollback. + rpc::ReplSetMetadata metadata; + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + const auto& metadataObj = queryResponse.otherFields.metadata; + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << _fetcher->getSource() + << ": " << metadataResult.getStatus() << ": " << metadataObj; + _finishCallback(metadataResult.getStatus()); + return; + } + metadata = metadataResult.getValue(); + _dataReplicatorExternalState->processMetadata(metadata); + } + // Increment stats. We read all of the docs in the query. opsReadStats.increment(info.networkDocumentCount); networkByteStats.increment(info.networkDocumentBytes); diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index f3ea0760d35..1c77b47bd3e 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -552,7 +552,7 @@ BSONObj makeCursorResponse(CursorId cursorId, TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) { auto shutdownState = processSingleBatch( - {makeCursorResponse(0, {}), + {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)), Milliseconds(0)}); @@ -564,12 +564,29 @@ TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadat BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj, Milliseconds(0)}); + ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); ASSERT_EQUALS(metadata.getPrimaryIndex(), dataReplicatorExternalState->metadataProcessed.getPrimaryIndex()); } +TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { + rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); + BSONObjBuilder bob; + ASSERT_OK(metadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, + processSingleBatch( + {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) { ASSERT_EQUALS(ErrorCodes::RemoteOplogStale, processSingleBatch(makeCursorResponse(0, {}))->getStatus()); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 66fabbc11a0..c33582a0aba 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -479,14 +479,13 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) = 0; /** - * Processes the ReplSetMetadata returned from a command run against another replica set - * member and updates protocol version 1 information (most recent optime that is committed, - * member id of the current PRIMARY, the current config version and the current term). - * - * TODO(dannenberg): Move this method to be testing only if it does not end up being used - * to process the find and getmore metadata responses from the DataReplicator. + * Processes the ReplSetMetadata returned from a command run against another + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. */ - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) = 0; /** * Elections under protocol version 1 are triggered by a timer. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b7cc6b9813c..7e1b66afb87 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2145,12 +2145,13 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) result->append("config", _rsConfig.toBSON()); } -void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) { +void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) { EventHandle evh; { LockGuard topoLock(_topoMutex); - evh = _processReplSetMetadata_incallback(replMetadata); + evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint); } if (evh) { @@ -2164,11 +2165,13 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { } EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback( - const rpc::ReplSetMetadata& replMetadata) { + const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) { if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { return EventHandle(); } - _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + if (advanceCommitPoint) { + _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + } return _updateTerm_incallback(replMetadata.getTerm()); } @@ -3157,7 +3160,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { - if (!_getMemberState_inlock().primary()) { + if (!_getMemberState_inlock().primary() || _stepDownPending) { return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 35ad76af702..0478624183f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -206,7 +206,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -983,11 +984,13 @@ private: /** * Callback that processes the ReplSetMetadata returned from a command run against another - * replica set member and updates protocol version 1 information (most recent optime that is - * committed, member id of the current PRIMARY, the current config version and the current term) + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. * Returns the finish event which is invalid if the process has already finished. */ - EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata); + EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint); /** * Blesses a snapshot to be used for new committed reads. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 0c2df9d090a..6c344de33cb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -164,7 +164,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( if (replMetadata.isOK()) { // Asynchronous stepdown could happen, but it will wait for _topoMutex and execute // after this function, so we cannot and don't need to wait for it to finish. - _processReplSetMetadata_incallback(replMetadata.getValue()); + // Arbiters are the only nodes allowed to advance their commit point via heartbeats. + bool advanceCommitPoint = getMemberState().arbiter(); + _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint); } } const Date_t now = _replExecutor.now(); @@ -176,10 +178,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( networkTime = cbData.response.elapsedMillis.value_or(Milliseconds{0}); // TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this // and update tests. - _updateTerm_incallback(hbStatusResponse.getValue().getTerm()); - // Postpone election timeout if we have a successful heartbeat response from the primary. const auto& hbResponse = hbStatusResponse.getValue(); - if (hbResponse.hasState() && hbResponse.getState().primary()) { + _updateTerm_incallback(hbResponse.getTerm()); + // Postpone election timeout if we have a successful heartbeat response from the primary. + if (hbResponse.hasState() && hbResponse.getState().primary() && + hbResponse.getTerm() == _topCoord->getTerm()) { cancelAndRescheduleElectionTimeout(); } } else { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index f9d12233c3f..0ebaa43cc77 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -369,7 +369,7 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) { << "syncSourceIndex" << 1))); ASSERT_OK(metadata.getStatus()); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp()); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 69816fe9a67..757a422bf70 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -3836,7 +3836,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // higher configVersion @@ -3852,7 +3852,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); } @@ -3901,7 +3901,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); @@ -3918,7 +3918,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); } @@ -3963,7 +3963,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3981,7 +3981,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3999,14 +3999,14 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata3.getValue()); + getReplCoord()->processReplSetMetadata(metadata3.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); } TEST_F(ReplCoordTest, - TermAndLastCommittedOpTimeUpdateWhenHeartbeatResponseWithMetadataHasFresherValues) { + LastCommittedOpTimeNotUpdatedEvenWhenHeartbeatResponseWithMetadataHasFresherValues) { // Ensure that the metadata is processed if it is contained in a heartbeat response. assertStartSuccess(BSON("_id" << "mySet" @@ -4032,7 +4032,75 @@ TEST_F(ReplCoordTest, auto replCoord = getReplCoord(); auto config = replCoord->getConfig(); - // Higher term - should update term and lastCommittedOpTime. + // Higher term - should update term but not last committed optime. + StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( + rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) + << "configVersion" + << config.getConfigVersion() + << "primaryIndex" + << 1 + << "term" + << 3 + << "syncSourceIndex" + << 1))); + BSONObjBuilder metadataBuilder; + ASSERT_OK(metadata.getValue().writeToMetadata(&metadataBuilder)); + auto metadataObj = metadataBuilder.obj(); + + auto net = getNet(); + net->enterNetwork(); + + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + const auto& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + ReplSetHeartbeatResponse hbResp; + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setSetName(config.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true), metadataObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(3, getReplCoord()->getTerm()); + ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); +} + +TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) { + // Ensure that the metadata is processed if it is contained in a heartbeat response. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" + << 0 + << "arbiterOnly" + << true) + << BSON("host" + << "node2:12345" + << "_id" + << 1)) + << "protocolVersion" + << 1), + HostAndPort("node1", 12345)); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + auto txn = makeOperationContext(); + getReplCoord()->updateTerm(txn.get(), 1); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + + auto replCoord = getReplCoord(); + auto config = replCoord->getConfig(); + + // Higher term - should update term and lastCommittedOpTime since arbiters learn of the + // commit point via heartbeats. StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( rpc::kReplSetMetadataFieldName << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" @@ -4273,7 +4341,7 @@ TEST_F(ReplCoordTest, } TEST_F(ReplCoordTest, - CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimary) { + RescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInSameTerm) { assertStartSuccess(BSON("_id" << "mySet" << "protocolVersion" @@ -4311,6 +4379,8 @@ TEST_F(ReplCoordTest, ReplSetHeartbeatResponse hbResp; hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm()); + // Heartbeat response is scheduled with a delay so that we can be sure that // the election was rescheduled due to the heartbeat response. auto heartbeatWhen = net->now() + Seconds(1); @@ -4325,6 +4395,60 @@ TEST_F(ReplCoordTest, } TEST_F(ReplCoordTest, + DontRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInDiffertTerm) { + assertStartSuccess(BSON("_id" + << "mySet" + << "protocolVersion" + << 1 + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" + << 0) + << BSON("host" + << "node2:12345" + << "_id" + << 1))), + HostAndPort("node1", 12345)); + + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + + auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest(); + ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen); + + auto net = getNet(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + log() << "processing " << request.cmdObj; + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + // Respond to node1's heartbeat command to indicate that node2 is PRIMARY. + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm() - 1); + + // Heartbeat response is scheduled with a delay so that we can be sure that + // the election was rescheduled due to the heartbeat response. + auto heartbeatWhen = net->now() + Seconds(1); + net->scheduleResponse(noi, heartbeatWhen, makeResponseStatus(hbResp.toBSON(true))); + net->runUntil(heartbeatWhen); + ASSERT_EQUALS(heartbeatWhen, net->now()); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_GREATER_THAN(heartbeatWhen + replCoord->getConfig().getElectionTimeoutPeriod(), + replCoord->getElectionTimeout_forTest()); +} + +TEST_F(ReplCoordTest, CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseWithoutState) { assertStartSuccess(BSON("_id" << "mySet" @@ -4679,7 +4803,7 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { // Set last committed optime via metadata. rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); - getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index d6e4679e78c..ee2c819bc4d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -249,7 +249,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) // TODO } -void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {} +void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) {} void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index d21c3c8e40d..67d6a555274 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -155,7 +155,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result); - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata); + void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 528ce49dbea..5c582792127 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -640,7 +640,7 @@ public: // New style update position command has metadata, which may inform the // upstream of a higher term. auto metadata = metadataResult.getValue(); - replCoord->processReplSetMetadata(metadata); + replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/); } // In the case of an update from a member with an invalid replica set config, |