diff options
-rw-r--r-- | jstests/noPassthrough/min_valid_on_pv_change.js | 72 | ||||
-rw-r--r-- | jstests/replsets/rslib.js | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl_test.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 7 |
11 files changed, 252 insertions, 31 deletions
diff --git a/jstests/noPassthrough/min_valid_on_pv_change.js b/jstests/noPassthrough/min_valid_on_pv_change.js new file mode 100644 index 00000000000..e89108b9cb5 --- /dev/null +++ b/jstests/noPassthrough/min_valid_on_pv_change.js @@ -0,0 +1,72 @@ +// Test minValid is always set to existent OpTime on PV upgrade and downgrade, +// so that sync source resolver can find a sync source that has the minValid. + +(function() { + "use strict"; + + load("jstests/replsets/rslib.js"); + + var testName = "min_valid_on_pv_change"; + var replTest = new ReplSetTest({name: testName, nodes: 3}); + var nodes = replTest.nodeList(); + + replTest.startSet(); + var config = { + "_id": testName, + "members": [ + {"_id": 0, "host": nodes[0]}, + {"_id": 1, "host": nodes[1], priority: 0}, + {"_id": 2, "host": nodes[2], priority: 0}, + ] + }; + + // Set verbosity for replication on all nodes. + setLogVerbosity(replTest.nodes, {"replication": {"verbosity": 3}, "query": {"verbosity": 3}}); + + replTest.initiate(config); + var primary = replTest.getPrimary(); + var secondary = replTest.getSecondary(); + var coll = primary.getCollection("test.foo"); + + function checkMinValidExistsOnPrimary(conn, expectedTerm) { + var minValid = conn.getDB("local").replset.minvalid.findOne(); + assert.eq(minValid.t, expectedTerm, "got unexpected minValid " + tojson(minValid)); + var lastOpTime = getLastOpTime(primary); + if (expectedTerm == -1) { + assert.eq(minValid.ts, lastOpTime, "got unexpected minValid " + tojson(minValid)); + } else { + assert.docEq({t: minValid.t, ts: minValid.ts}, lastOpTime); + } + } + + assert.writeOK( + coll.insert({a: 1}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS})); + // Verify the minValid is written in PV1 so its term is 1. + checkMinValidExistsOnPrimary(secondary, 1); + + jsTestLog("Downgrading to pv0"); + var conf = replTest.getReplSetConfigFromNode(0); + conf.protocolVersion = 0; + conf.version++; + reconfig(replTest, conf); + assert.writeOK( + coll.insert({a: 2}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS})); + // Verify the minValid is written in PV0 so its term is -1. + checkMinValidExistsOnPrimary(secondary, -1); + + jsTestLog("Upgrading to pv1"); + conf.protocolVersion = 1; + conf.version++; + reconfig(replTest, conf); + assert.writeOK( + coll.insert({a: 3}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS})); + // Verify the minValid is written in PV1 after PV upgrade so its term is 0. + checkMinValidExistsOnPrimary(secondary, 0); + + jsTestLog("Restarting the secondary"); + replTest.restart(replTest.getSecondary()); + // Verify the secondary can find the sync source successfully and replicate the doc. + assert.writeOK( + coll.insert({a: 4}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS})); + +})(); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index ecaf21e5d46..1cb338ceeae 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -13,6 +13,7 @@ var waitForState; var reInitiateWithoutThrowingOnAbortedMember; var awaitRSClientHosts; var getLastOpTime; +var setLogVerbosity; (function() { "use strict"; @@ -433,4 +434,19 @@ var getLastOpTime; var connStatus = replSetStatus.members.filter(m => m.self)[0]; return connStatus.optime; }; + + /** + * Set log verbosity on all given nodes. + * e.g. setLogVerbosity(replTest.nodes, { "replication": {"verbosity": 3} }); + */ + setLogVerbosity = function(nodes, logVerbosity) { + var verbosity = { + "setParameter": 1, + "logComponentVerbosity": logVerbosity, + }; + nodes.forEach(function(node) { + assert.commandWorked(node.adminCommand(verbosity)); + }); + }; + }()); diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index 27cc61c0271..3896d3a13e2 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -31,6 +31,7 @@ #include "mongo/db/repl/replication_consistency_markers_impl.h" +#include "mongo/db/bson/bson_helper.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -94,31 +95,33 @@ boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinVali void ReplicationConsistencyMarkersImpl::_updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec) { Status status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec); + invariantOK(status); +} + +void ReplicationConsistencyMarkersImpl::initializeMinValidDocument(OperationContext* opCtx) { + LOG(3) << "Initializing minValid document"; + + // This initializes the values of the required fields if they are not already set. + // If one of the fields is already set, the $max will prefer the existing value since it + // will always be greater than the provided ones. + auto spec = BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName + << Timestamp() + << MinValidDocument::kMinValidTermFieldName + << OpTime::kUninitializedTerm)); + + Status status = _storageInterface->putSingleton(opCtx, _minValidNss, spec); // If the collection doesn't exist, create it and try again. if (status == ErrorCodes::NamespaceNotFound) { status = _storageInterface->createCollection(opCtx, _minValidNss, CollectionOptions()); fassertStatusOK(40509, status); - status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec); + status = _storageInterface->putSingleton(opCtx, _minValidNss, spec); } fassertStatusOK(40467, status); } -void ReplicationConsistencyMarkersImpl::initializeMinValidDocument(OperationContext* opCtx) { - LOG(3) << "Initializing minValid document"; - - // This initializes the values of the required fields if they are not already set. - // If one of the fields is already set, the $max will prefer the existing value since it - // will always be greater than the provided ones. - _updateMinValidDocument(opCtx, - BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName - << Timestamp() - << MinValidDocument::kMinValidTermFieldName - << OpTime::kUninitializedTerm))); -} - bool ReplicationConsistencyMarkersImpl::getInitialSyncFlag(OperationContext* opCtx) const { auto doc = _getMinValidDocument(opCtx); invariant(doc); // Initialized at startup so it should never be missing. @@ -185,11 +188,28 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o const OpTime& minValid) { LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON() << ")"; - _updateMinValidDocument(opCtx, - BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName - << minValid.getTimestamp() - << MinValidDocument::kMinValidTermFieldName - << minValid.getTerm()))); + + auto& termField = MinValidDocument::kMinValidTermFieldName; + auto& tsField = MinValidDocument::kMinValidTimestampFieldName; + + // Always update both fields of optime. + auto updateSpec = + BSON("$set" << BSON(tsField << minValid.getTimestamp() << termField << minValid.getTerm())); + BSONObj query; + if (minValid.getTerm() == OpTime::kUninitializedTerm) { + // Only compare timestamps in PV0, but update both fields of optime. + // e.g { ts: { $lt: Timestamp 1508961481000|2 } } + query = BSON(tsField << LT << minValid.getTimestamp()); + } else { + // Set the minValid only if the given term is higher or the terms are the same but + // the given timestamp is higher. + // e.g. { $or: [ { t: { $lt: 1 } }, { t: 1, ts: { $lt: Timestamp 1508961481000|6 } } ] } + query = BSON( + OR(BSON(termField << LT << minValid.getTerm()), + BSON(termField << minValid.getTerm() << tsField << LT << minValid.getTimestamp()))); + } + Status status = _storageInterface->updateSingleton(opCtx, _minValidNss, query, updateSpec); + invariantOK(status); } void ReplicationConsistencyMarkersImpl::removeOldOplogDeleteFromPointField( diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index 69a233379ed..36b9bca7fc7 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -288,6 +288,55 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); } +TEST_F(ReplicationConsistencyMarkersTest, SetMinValidOnPVChange) { + auto minValidNss = makeNamespace(_agent, "minValid"); + auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); + + ReplicationConsistencyMarkersImpl consistencyMarkers( + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss); + auto opCtx = getOperationContext(); + consistencyMarkers.initializeMinValidDocument(opCtx); + + auto advanceAndCheckMinValidOpTime = [&](OpTime advanceTo, OpTime expected) { + consistencyMarkers.setMinValidToAtLeast(opCtx, advanceTo); + ASSERT_EQUALS(expected, consistencyMarkers.getMinValid(opCtx)); + }; + + // Set minValid in PV 1. + OpTime startOpTime({Seconds(20), 0}, 1LL); + advanceAndCheckMinValidOpTime(startOpTime, startOpTime); + + // In rollback, minValid is when the date becomes consistent and never goes back. + OpTime rollbackOpTime({Seconds(10), 0}, 1LL); + advanceAndCheckMinValidOpTime(rollbackOpTime, startOpTime); + + // Writes arrive, so minValid advances. + OpTime opTime1({Seconds(30), 0}, 1LL); + advanceAndCheckMinValidOpTime(opTime1, opTime1); + + // A new term starts and oplog diverges, so the timestamp is lower. + OpTime newTermOpTime({Seconds(20), 0}, 2LL); + advanceAndCheckMinValidOpTime(newTermOpTime, newTermOpTime); + + // We should never advance minValid to a lower term, but verify it never goes back even if the + // timestamp is higher. + OpTime invalidOpTime({Seconds(80), 0}, 1LL); + advanceAndCheckMinValidOpTime(invalidOpTime, newTermOpTime); + + // PV downgrade to PV0 + OpTime downgradeOpTime({Seconds(50), 0}, -1LL); + advanceAndCheckMinValidOpTime(downgradeOpTime, downgradeOpTime); + + // Writes arrive in PV0. + OpTime opTime2({Seconds(60), 0}, -1LL); + advanceAndCheckMinValidOpTime(opTime2, opTime2); + + // PV upgrade again. + OpTime upgradeOpTime({Seconds(70), 0}, 0LL); + advanceAndCheckMinValidOpTime(upgradeOpTime, upgradeOpTime); +} + TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) { auto minValidNss = makeNamespace(_agent, "minValid"); auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5c0665e7c9a..caaab70b6f5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2450,6 +2450,8 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt return status; } + _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx); + auto lastAppliedOpTime = getMyLastAppliedOpTime(); // Since the JournalListener has not yet been set up, we must manually set our diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 30ffeffef04..8a72ffb7f2d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -45,6 +45,7 @@ #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/service_context.h" @@ -521,8 +522,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( // Start data replication after the config has been installed. if (shouldStartDataReplication) { - _externalState->startThreads(_settings); auto opCtx = cc().makeOperationContext(); + _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx.get()); + _externalState->startThreads(_settings); _startDataReplication(opCtx.get()); } } diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index afbeb3561a2..29bc435cbc5 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -233,6 +233,19 @@ public: const NamespaceString& nss, const BSONObj& update) = 0; + /** + * Updates a singleton document in a collection. Never upsert. + * + * If the collection has more than 1 document, the update will only be performed on the first + * one found. + * Returns 'NamespaceNotFound' if the collection does not exist. This does not implicitly + * create the collection so that the caller can create the collection with any collection + * options they want (ex: capped, temp, collation, etc.). + */ + virtual Status updateSingleton(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update) = 0; /** * Finds a single document in the collection referenced by the specified _id. diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 2e048f66c3f..4d3b59455fb 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -742,19 +742,13 @@ StatusWith<BSONObj> makeUpsertQuery(const BSONElement& idKey) { return query; } -Status _upsertWithQuery(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update) { - UpdateRequest request(nss); - request.setQuery(query); - request.setUpdates(update); - request.setUpsert(true); +Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) { invariant(!request.isMulti()); // We only want to update one document for performance. invariant(!request.shouldReturnAnyDocs()); invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy()); - return writeConflictRetry(opCtx, "_upsertWithQuery", nss.ns(), [&] { + auto& nss = request.getNamespaceString(); + return writeConflictRetry(opCtx, "_updateWithQuery", nss.ns(), [&] { // ParsedUpdate needs to be inside the write conflict retry loop because it may create a // CanonicalQuery whose ownership will be transferred to the plan executor in // getExecutorUpdate(). @@ -769,7 +763,7 @@ Status _upsertWithQuery(OperationContext* opCtx, autoColl, nss, str::stream() << "Unable to update documents in " << nss.ns() << " using query " - << query); + << request.getQuery()); if (!collectionResult.isOK()) { return collectionResult.getStatus(); } @@ -848,7 +842,22 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx, Status StorageInterfaceImpl::putSingleton(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& update) { - return _upsertWithQuery(opCtx, nss, {}, update); + UpdateRequest request(nss); + request.setQuery({}); + request.setUpdates(update); + request.setUpsert(true); + return _updateWithQuery(opCtx, request); +} + +Status StorageInterfaceImpl::updateSingleton(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update) { + UpdateRequest request(nss); + request.setQuery(query); + request.setUpdates(update); + invariant(!request.isUpsert()); + return _updateWithQuery(opCtx, request); } Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx, diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 94d973ed4e7..ee4702530c9 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -114,6 +114,11 @@ public: const NamespaceString& nss, const BSONObj& update) override; + Status updateSingleton(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update) override; + StatusWith<BSONObj> findById(OperationContext* opCtx, const NamespaceString& nss, const BSONElement& idKey) override; diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 0ce6898d3ec..3f191e6c191 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -1755,6 +1755,32 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesFirstDocumentWhenCollectionI _assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 2), doc2}); } +TEST_F(StorageInterfaceImplTest, UpdateSingletonNeverUpserts) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1)); + ASSERT_OK(storage.updateSingleton(opCtx, nss, {}, update)); + ASSERT_EQ(ErrorCodes::CollectionIsEmpty, storage.findSingleton(opCtx, nss)); + _assertDocumentsInCollectionEquals(opCtx, nss, std::vector<mongo::BSONObj>()); +} + +TEST_F(StorageInterfaceImplTest, UpdateSingletonUpdatesDocumentWhenCollectionIsNotEmpty) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + auto doc1 = BSON("_id" << 0 << "x" << 0); + ASSERT_OK( + storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}, OpTime::kUninitializedTerm)); + auto update = BSON("$set" << BSON("x" << 1)); + ASSERT_OK(storage.updateSingleton(opCtx, nss, BSON("_id" << 0), update)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1), + unittest::assertGet(storage.findSingleton(opCtx, nss))); + _assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 1)}); +} + TEST_F(StorageInterfaceImplTest, FindByIdReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) { auto opCtx = getOperationContext(); StorageInterfaceImpl storage; diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 9c88532781f..bc16ff01bbc 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -221,6 +221,13 @@ public: return Status{ErrorCodes::IllegalOperation, "putSingleton not implemented."}; } + Status updateSingleton(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update) override { + return Status{ErrorCodes::IllegalOperation, "updateSingleton not implemented."}; + } + StatusWith<BSONObj> findById(OperationContext* opCtx, const NamespaceString& nss, const BSONElement& idKey) override { |