summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/min_valid_on_pv_change.js72
-rw-r--r--jstests/replsets/rslib.js16
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp58
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl_test.cpp49
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/storage_interface.h13
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp31
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h5
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp26
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h7
11 files changed, 252 insertions, 31 deletions
diff --git a/jstests/noPassthrough/min_valid_on_pv_change.js b/jstests/noPassthrough/min_valid_on_pv_change.js
new file mode 100644
index 00000000000..e89108b9cb5
--- /dev/null
+++ b/jstests/noPassthrough/min_valid_on_pv_change.js
@@ -0,0 +1,72 @@
+// Test minValid is always set to existent OpTime on PV upgrade and downgrade,
+// so that sync source resolver can find a sync source that has the minValid.
+
+(function() {
+ "use strict";
+
+ load("jstests/replsets/rslib.js");
+
+ var testName = "min_valid_on_pv_change";
+ var replTest = new ReplSetTest({name: testName, nodes: 3});
+ var nodes = replTest.nodeList();
+
+ replTest.startSet();
+ var config = {
+ "_id": testName,
+ "members": [
+ {"_id": 0, "host": nodes[0]},
+ {"_id": 1, "host": nodes[1], priority: 0},
+ {"_id": 2, "host": nodes[2], priority: 0},
+ ]
+ };
+
+ // Set verbosity for replication on all nodes.
+ setLogVerbosity(replTest.nodes, {"replication": {"verbosity": 3}, "query": {"verbosity": 3}});
+
+ replTest.initiate(config);
+ var primary = replTest.getPrimary();
+ var secondary = replTest.getSecondary();
+ var coll = primary.getCollection("test.foo");
+
+ function checkMinValidExistsOnPrimary(conn, expectedTerm) {
+ var minValid = conn.getDB("local").replset.minvalid.findOne();
+ assert.eq(minValid.t, expectedTerm, "got unexpected minValid " + tojson(minValid));
+ var lastOpTime = getLastOpTime(primary);
+ if (expectedTerm == -1) {
+ assert.eq(minValid.ts, lastOpTime, "got unexpected minValid " + tojson(minValid));
+ } else {
+ assert.docEq({t: minValid.t, ts: minValid.ts}, lastOpTime);
+ }
+ }
+
+ assert.writeOK(
+ coll.insert({a: 1}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS}));
+ // Verify the minValid is written in PV1 so its term is 1.
+ checkMinValidExistsOnPrimary(secondary, 1);
+
+ jsTestLog("Downgrading to pv0");
+ var conf = replTest.getReplSetConfigFromNode(0);
+ conf.protocolVersion = 0;
+ conf.version++;
+ reconfig(replTest, conf);
+ assert.writeOK(
+ coll.insert({a: 2}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS}));
+ // Verify the minValid is written in PV0 so its term is -1.
+ checkMinValidExistsOnPrimary(secondary, -1);
+
+ jsTestLog("Upgrading to pv1");
+ conf.protocolVersion = 1;
+ conf.version++;
+ reconfig(replTest, conf);
+ assert.writeOK(
+ coll.insert({a: 3}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS}));
+ // Verify the minValid is written in PV1 after PV upgrade so its term is 0.
+ checkMinValidExistsOnPrimary(secondary, 0);
+
+ jsTestLog("Restarting the secondary");
+ replTest.restart(replTest.getSecondary());
+ // Verify the secondary can find the sync source successfully and replicate the doc.
+ assert.writeOK(
+ coll.insert({a: 4}, {writeConcern: {w: 3}, wtimeout: ReplSetTest.kDefaultTimeoutMS}));
+
+})();
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index ecaf21e5d46..1cb338ceeae 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -13,6 +13,7 @@ var waitForState;
var reInitiateWithoutThrowingOnAbortedMember;
var awaitRSClientHosts;
var getLastOpTime;
+var setLogVerbosity;
(function() {
"use strict";
@@ -433,4 +434,19 @@ var getLastOpTime;
var connStatus = replSetStatus.members.filter(m => m.self)[0];
return connStatus.optime;
};
+
+ /**
+ * Set log verbosity on all given nodes.
+ * e.g. setLogVerbosity(replTest.nodes, { "replication": {"verbosity": 3} });
+ */
+ setLogVerbosity = function(nodes, logVerbosity) {
+ var verbosity = {
+ "setParameter": 1,
+ "logComponentVerbosity": logVerbosity,
+ };
+ nodes.forEach(function(node) {
+ assert.commandWorked(node.adminCommand(verbosity));
+ });
+ };
+
}());
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index 27cc61c0271..3896d3a13e2 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/repl/replication_consistency_markers_impl.h"
+#include "mongo/db/bson/bson_helper.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -94,31 +95,33 @@ boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinVali
void ReplicationConsistencyMarkersImpl::_updateMinValidDocument(OperationContext* opCtx,
const BSONObj& updateSpec) {
Status status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec);
+ invariantOK(status);
+}
+
+void ReplicationConsistencyMarkersImpl::initializeMinValidDocument(OperationContext* opCtx) {
+ LOG(3) << "Initializing minValid document";
+
+ // This initializes the values of the required fields if they are not already set.
+ // If one of the fields is already set, the $max will prefer the existing value since it
+ // will always be greater than the provided ones.
+ auto spec = BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName
+ << Timestamp()
+ << MinValidDocument::kMinValidTermFieldName
+ << OpTime::kUninitializedTerm));
+
+ Status status = _storageInterface->putSingleton(opCtx, _minValidNss, spec);
// If the collection doesn't exist, create it and try again.
if (status == ErrorCodes::NamespaceNotFound) {
status = _storageInterface->createCollection(opCtx, _minValidNss, CollectionOptions());
fassertStatusOK(40509, status);
- status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec);
+ status = _storageInterface->putSingleton(opCtx, _minValidNss, spec);
}
fassertStatusOK(40467, status);
}
-void ReplicationConsistencyMarkersImpl::initializeMinValidDocument(OperationContext* opCtx) {
- LOG(3) << "Initializing minValid document";
-
- // This initializes the values of the required fields if they are not already set.
- // If one of the fields is already set, the $max will prefer the existing value since it
- // will always be greater than the provided ones.
- _updateMinValidDocument(opCtx,
- BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName
- << Timestamp()
- << MinValidDocument::kMinValidTermFieldName
- << OpTime::kUninitializedTerm)));
-}
-
bool ReplicationConsistencyMarkersImpl::getInitialSyncFlag(OperationContext* opCtx) const {
auto doc = _getMinValidDocument(opCtx);
invariant(doc); // Initialized at startup so it should never be missing.
@@ -185,11 +188,28 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o
const OpTime& minValid) {
LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON()
<< ")";
- _updateMinValidDocument(opCtx,
- BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName
- << minValid.getTimestamp()
- << MinValidDocument::kMinValidTermFieldName
- << minValid.getTerm())));
+
+ auto& termField = MinValidDocument::kMinValidTermFieldName;
+ auto& tsField = MinValidDocument::kMinValidTimestampFieldName;
+
+ // Always update both fields of optime.
+ auto updateSpec =
+ BSON("$set" << BSON(tsField << minValid.getTimestamp() << termField << minValid.getTerm()));
+ BSONObj query;
+ if (minValid.getTerm() == OpTime::kUninitializedTerm) {
+ // Only compare timestamps in PV0, but update both fields of optime.
+ // e.g { ts: { $lt: Timestamp 1508961481000|2 } }
+ query = BSON(tsField << LT << minValid.getTimestamp());
+ } else {
+ // Set the minValid only if the given term is higher or the terms are the same but
+ // the given timestamp is higher.
+ // e.g. { $or: [ { t: { $lt: 1 } }, { t: 1, ts: { $lt: Timestamp 1508961481000|6 } } ] }
+ query = BSON(
+ OR(BSON(termField << LT << minValid.getTerm()),
+ BSON(termField << minValid.getTerm() << tsField << LT << minValid.getTimestamp())));
+ }
+ Status status = _storageInterface->updateSingleton(opCtx, _minValidNss, query, updateSpec);
+ invariantOK(status);
}
void ReplicationConsistencyMarkersImpl::removeOldOplogDeleteFromPointField(
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 69a233379ed..36b9bca7fc7 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -288,6 +288,55 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) {
ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
}
+TEST_F(ReplicationConsistencyMarkersTest, SetMinValidOnPVChange) {
+ auto minValidNss = makeNamespace(_agent, "minValid");
+ auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint");
+ auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp");
+
+ ReplicationConsistencyMarkersImpl consistencyMarkers(
+ getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss);
+ auto opCtx = getOperationContext();
+ consistencyMarkers.initializeMinValidDocument(opCtx);
+
+ auto advanceAndCheckMinValidOpTime = [&](OpTime advanceTo, OpTime expected) {
+ consistencyMarkers.setMinValidToAtLeast(opCtx, advanceTo);
+ ASSERT_EQUALS(expected, consistencyMarkers.getMinValid(opCtx));
+ };
+
+ // Set minValid in PV 1.
+ OpTime startOpTime({Seconds(20), 0}, 1LL);
+ advanceAndCheckMinValidOpTime(startOpTime, startOpTime);
+
+ // In rollback, minValid is when the date becomes consistent and never goes back.
+ OpTime rollbackOpTime({Seconds(10), 0}, 1LL);
+ advanceAndCheckMinValidOpTime(rollbackOpTime, startOpTime);
+
+ // Writes arrive, so minValid advances.
+ OpTime opTime1({Seconds(30), 0}, 1LL);
+ advanceAndCheckMinValidOpTime(opTime1, opTime1);
+
+ // A new term starts and oplog diverges, so the timestamp is lower.
+ OpTime newTermOpTime({Seconds(20), 0}, 2LL);
+ advanceAndCheckMinValidOpTime(newTermOpTime, newTermOpTime);
+
+ // We should never advance minValid to a lower term, but verify it never goes back even if the
+ // timestamp is higher.
+ OpTime invalidOpTime({Seconds(80), 0}, 1LL);
+ advanceAndCheckMinValidOpTime(invalidOpTime, newTermOpTime);
+
+ // PV downgrade to PV0
+ OpTime downgradeOpTime({Seconds(50), 0}, -1LL);
+ advanceAndCheckMinValidOpTime(downgradeOpTime, downgradeOpTime);
+
+ // Writes arrive in PV0.
+ OpTime opTime2({Seconds(60), 0}, -1LL);
+ advanceAndCheckMinValidOpTime(opTime2, opTime2);
+
+ // PV upgrade again.
+ OpTime upgradeOpTime({Seconds(70), 0}, 0LL);
+ advanceAndCheckMinValidOpTime(upgradeOpTime, upgradeOpTime);
+}
+
TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) {
auto minValidNss = makeNamespace(_agent, "minValid");
auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint");
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5c0665e7c9a..caaab70b6f5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2450,6 +2450,8 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
return status;
}
+ _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx);
+
auto lastAppliedOpTime = getMyLastAppliedOpTime();
// Since the JournalListener has not yet been set up, we must manually set our
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 30ffeffef04..8a72ffb7f2d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/service_context.h"
@@ -521,8 +522,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
// Start data replication after the config has been installed.
if (shouldStartDataReplication) {
- _externalState->startThreads(_settings);
auto opCtx = cc().makeOperationContext();
+ _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx.get());
+ _externalState->startThreads(_settings);
_startDataReplication(opCtx.get());
}
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index afbeb3561a2..29bc435cbc5 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -233,6 +233,19 @@ public:
const NamespaceString& nss,
const BSONObj& update) = 0;
+ /**
+ * Updates a singleton document in a collection. Never upsert.
+ *
+ * If the collection has more than 1 document, the update will only be performed on the first
+ * one found.
+ * Returns 'NamespaceNotFound' if the collection does not exist. This does not implicitly
+ * create the collection so that the caller can create the collection with any collection
+ * options they want (ex: capped, temp, collation, etc.).
+ */
+ virtual Status updateSingleton(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update) = 0;
/**
* Finds a single document in the collection referenced by the specified _id.
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 2e048f66c3f..4d3b59455fb 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -742,19 +742,13 @@ StatusWith<BSONObj> makeUpsertQuery(const BSONElement& idKey) {
return query;
}
-Status _upsertWithQuery(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update) {
- UpdateRequest request(nss);
- request.setQuery(query);
- request.setUpdates(update);
- request.setUpsert(true);
+Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
invariant(!request.isMulti()); // We only want to update one document for performance.
invariant(!request.shouldReturnAnyDocs());
invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());
- return writeConflictRetry(opCtx, "_upsertWithQuery", nss.ns(), [&] {
+ auto& nss = request.getNamespaceString();
+ return writeConflictRetry(opCtx, "_updateWithQuery", nss.ns(), [&] {
// ParsedUpdate needs to be inside the write conflict retry loop because it may create a
// CanonicalQuery whose ownership will be transferred to the plan executor in
// getExecutorUpdate().
@@ -769,7 +763,7 @@ Status _upsertWithQuery(OperationContext* opCtx,
autoColl,
nss,
str::stream() << "Unable to update documents in " << nss.ns() << " using query "
- << query);
+ << request.getQuery());
if (!collectionResult.isOK()) {
return collectionResult.getStatus();
}
@@ -848,7 +842,22 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
Status StorageInterfaceImpl::putSingleton(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& update) {
- return _upsertWithQuery(opCtx, nss, {}, update);
+ UpdateRequest request(nss);
+ request.setQuery({});
+ request.setUpdates(update);
+ request.setUpsert(true);
+ return _updateWithQuery(opCtx, request);
+}
+
+Status StorageInterfaceImpl::updateSingleton(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update) {
+ UpdateRequest request(nss);
+ request.setQuery(query);
+ request.setUpdates(update);
+ invariant(!request.isUpsert());
+ return _updateWithQuery(opCtx, request);
}
Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 94d973ed4e7..ee4702530c9 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -114,6 +114,11 @@ public:
const NamespaceString& nss,
const BSONObj& update) override;
+ Status updateSingleton(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update) override;
+
StatusWith<BSONObj> findById(OperationContext* opCtx,
const NamespaceString& nss,
const BSONElement& idKey) override;
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 0ce6898d3ec..3f191e6c191 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -1755,6 +1755,32 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesFirstDocumentWhenCollectionI
_assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 2), doc2});
}
+TEST_F(StorageInterfaceImplTest, UpdateSingletonNeverUpserts) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+ auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+ ASSERT_OK(storage.updateSingleton(opCtx, nss, {}, update));
+ ASSERT_EQ(ErrorCodes::CollectionIsEmpty, storage.findSingleton(opCtx, nss));
+ _assertDocumentsInCollectionEquals(opCtx, nss, std::vector<mongo::BSONObj>());
+}
+
+TEST_F(StorageInterfaceImplTest, UpdateSingletonUpdatesDocumentWhenCollectionIsNotEmpty) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+ auto doc1 = BSON("_id" << 0 << "x" << 0);
+ ASSERT_OK(
+ storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}, OpTime::kUninitializedTerm));
+ auto update = BSON("$set" << BSON("x" << 1));
+ ASSERT_OK(storage.updateSingleton(opCtx, nss, BSON("_id" << 0), update));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
+ unittest::assertGet(storage.findSingleton(opCtx, nss)));
+ _assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 1)});
+}
+
TEST_F(StorageInterfaceImplTest, FindByIdReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) {
auto opCtx = getOperationContext();
StorageInterfaceImpl storage;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 9c88532781f..bc16ff01bbc 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -221,6 +221,13 @@ public:
return Status{ErrorCodes::IllegalOperation, "putSingleton not implemented."};
}
+ Status updateSingleton(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update) override {
+ return Status{ErrorCodes::IllegalOperation, "updateSingleton not implemented."};
+ }
+
StatusWith<BSONObj> findById(OperationContext* opCtx,
const NamespaceString& nss,
const BSONElement& idKey) override {