author     Vesselina Ratcheva <vesselina.ratcheva@10gen.com>   2021-10-04 22:27:25 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-10-04 23:57:08 +0000
commit     4a230c070cb187604d07d04598a912b4feca937d (patch)
tree       906ef27e1f00a5eedd7d85d814d31d8bc069a1de
parent     dacbdf22acd913991b7dbb89efc0943be15e9e66 (diff)
download   mongo-4a230c070cb187604d07d04598a912b4feca937d.tar.gz
SERVER-58636 Allow initial syncing nodes to see entries from uncommitted batches on secondaries
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml                    |   4
-rw-r--r--  jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js  | 130
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp                                    |  34
-rw-r--r--  src/mongo/db/repl/oplog_applier_utils.cpp                             |  12
-rw-r--r--  src/mongo/db/storage/recovery_unit.h                                  |  32
-rw-r--r--  src/mongo/db/storage/snapshot_helper.cpp                              |  11
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp          |  16
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h            |   8
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp     |  10
9 files changed, 257 insertions(+), 0 deletions(-)
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 8c591c529ec..42c927a7497 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -94,6 +94,8 @@ last-continuous:
test_file: jstests/core/timeseries/timeseries_bucket_rename.js
- ticket: SERVER-59424
test_file: jstests/sharding/change_stream_show_migration_events.js
+ - ticket: SERVER-58636
+ test_file: jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
@@ -348,6 +350,8 @@ last-lts:
test_file: jstests/aggregation/expressions/date_add_subtract.js
- ticket: SERVER-59424
test_file: jstests/sharding/change_stream_show_migration_events.js
+ - ticket: SERVER-58636
+ test_file: jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js b/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js
new file mode 100644
index 00000000000..e446f04d726
--- /dev/null
+++ b/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js
@@ -0,0 +1,130 @@
+/**
+ * It was previously possible during initial sync for the oplog fetcher to miss a drop that the
+ * cloner had observed, specifically when the drop was applied in a not-yet-finalized batch on
+ * the sync source. This test verifies that this can no longer happen.
+ *
+ * @tags: [requires_replication]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
+const rst = new ReplSetTest({nodes: 3});
+rst.startSet();
+rst.initiate();
+
+const dbName = "testdb";
+const collName = "testcoll";
+const testNss = dbName + "." + collName;
+
+const primary = rst.getPrimary();
+const syncSource = rst.getSecondaries()[0];
+let initSyncNode = rst.getSecondaries()[1];
+
+var verbosityCmd = {
+ "setParameter": 1,
+ "logComponentVerbosity": {
+ "replication": {"verbosity": 3},
+ }
+};
+assert.commandWorked(syncSource.adminCommand(verbosityCmd));
+
+jsTestLog("Inserting some docs on the primary");
+const primaryDB = primary.getDB(dbName);
+assert.commandWorked(primaryDB.getCollection(collName).insert({"a": 1}));
+assert.commandWorked(primaryDB.getCollection(collName).insert({"b": 2}));
+rst.awaitReplication();
+
+const clonerFailPoint = "hangBeforeClonerStage";
+const failPointData = {
+ cloner: "CollectionCloner",
+ stage: "listIndexes",
+ nss: testNss
+};
+
+jsTestLog("Restarting last node for initial sync");
+let startupParams = {};
+startupParams["logComponentVerbosity"] = tojson({replication: 3});
+startupParams["initialSyncSourceReadPreference"] = "secondaryPreferred";
+startupParams["failpoint." + clonerFailPoint] = tojson({mode: "alwaysOn", data: failPointData});
+initSyncNode = rst.restart(initSyncNode, {startClean: true, setParameter: startupParams});
+
+jsTestLog("Waiting for initial syncing node to hit failpoint");
+assert.commandWorked(initSyncNode.adminCommand({
+ waitForFailPoint: clonerFailPoint,
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+
+jsTestLog("Adding more data to initial sync");
+assert.commandWorked(primaryDB.getCollection(collName).insert({"c": 3}));
+rst.awaitReplication(undefined /* timeout */, undefined /*secondaryOpTimeType */, [syncSource]);
+
+const dropFailPoint =
+ configureFailPoint(syncSource, "hangAfterApplyingCollectionDropOplogEntry", {dbName: dbName});
+
+assert(primaryDB.getCollection(collName).drop({writeConcern: {w: 1}}));
+
+jsTestLog("Waiting for sync source to hit drop failpoint");
+dropFailPoint.wait();
+sleep(10 * 1000);
+
+// Enable this failpoint so this external test client can also see the not-yet-finalized drop entry.
+const allowExternalReadsFp =
+ configureFailPoint(syncSource, "allowExternalReadsForReverseOplogScanRule");
+
+const syncSourceEntries =
+ syncSource.getCollection('local.oplog.rs').find({ns: /testdb/i}).sort({$natural: -1}).toArray();
+
+syncSourceEntries.forEach(function(entry) {
+ jsTestLog("Sync source entry: " + tojson(entry));
+});
+
+const latestSyncSourceEntry = syncSourceEntries[0];
+
+assert(latestSyncSourceEntry.o, () => tojson(latestSyncSourceEntry));
+assert(latestSyncSourceEntry.o.drop, () => tojson(latestSyncSourceEntry));
+assert.eq(collName, latestSyncSourceEntry.o.drop, () => tojson(latestSyncSourceEntry));
+
+const targetStopTs = latestSyncSourceEntry.ts;
+
+allowExternalReadsFp.off();
+
+jsTestLog("Resuming initial sync");
+assert.commandWorked(initSyncNode.adminCommand({configureFailPoint: clonerFailPoint, mode: 'off'}));
+
+jsTestLog("Waiting for initial sync node to reach correct stopTimestamp");
+assert.soonNoExcept(function() {
+ const nodeStatus = assert.commandWorked(initSyncNode.adminCommand({replSetGetStatus: 1}));
+ assert(nodeStatus, () => tojson(nodeStatus));
+ assert(nodeStatus.initialSyncStatus, () => tojson(nodeStatus));
+ // The 'initialSyncOplogEnd' field is actually the 'stopTimestamp'.
+ assert(nodeStatus.initialSyncStatus.initialSyncOplogEnd, () => tojson(nodeStatus));
+ const currentStopTs = nodeStatus.initialSyncStatus.initialSyncOplogEnd;
+
+ // Compare against the drop entry's timestamp captured above.
+ const comparison = bsonWoCompare(currentStopTs, targetStopTs);
+
+ if (comparison == 0) {
+ // We reached the stopTs we wanted.
+ return true;
+ }
+
+ // The stopTimestamp should never exceed the target timestamp.
+ assert.lte(comparison, 0, () => tojson(nodeStatus));
+ return false;
+});
+
+// Now that the stopTimestamp is far enough to include the drop, we also need to allow
+// the fetcher to actually replicate those entries.
+jsTestLog("Resuming batch application on the secondary");
+dropFailPoint.off();
+
+jsTestLog("Waiting for initial sync to complete");
+rst.waitForState(initSyncNode, ReplSetTest.State.SECONDARY); // will time out on error
+
+rst.awaitReplication();
+rst.stopSet();
+})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 4120b38cd6d..39dee1b5f29 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -59,10 +59,13 @@
#include "mongo/db/transaction_participant.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(allowExternalReadsForReverseOplogScanRule);
+
const auto kTermField = "term"_sd;
// Older client code before FCV 5.1 could still send 'ntoreturn' to mean a 'limit' or 'batchSize'.
@@ -424,6 +427,37 @@ public:
opCtx->lockState()->skipAcquireTicket();
}
+ // If this read represents a reverse oplog scan, we want to bypass oplog visibility
+ // rules in the case of secondaries. We normally only read from these nodes at batch
+ // boundaries, but in this specific case we should fetch all new entries, to be
+ // consistent with any catalog changes that might be observable before the batch is
+ // finalized. This special rule for reverse oplog scans is needed by replication
+ // initial sync, for the purposes of calculating the stopTimestamp correctly.
+ boost::optional<PinReadSourceBlock> pinReadSourceBlock;
+ if (isOplogNss) {
+ auto reverseScan = false;
+
+ auto cmdSort = findCommand->getSort();
+ if (!cmdSort.isEmpty()) {
+ BSONElement natural = cmdSort[query_request_helper::kNaturalSortField];
+ if (natural) {
+ reverseScan = natural.safeNumberInt() < 0;
+ }
+ }
+
+ auto isInternal = (opCtx->getClient()->session() &&
+ (opCtx->getClient()->session()->getTags() &
+ transport::Session::kInternalClient));
+
+ if (MONGO_unlikely(allowExternalReadsForReverseOplogScanRule.shouldFail())) {
+ isInternal = true;
+ }
+
+ if (reverseScan && isInternal) {
+ pinReadSourceBlock.emplace(opCtx->recoveryUnit());
+ }
+ }
+
// Acquire locks. If the query is on a view, we release our locks and convert the query
// request into an aggregation command.
boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> ctx;
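For context, the pinned-read-source path added above is taken by an internal client's reverse $natural scan of the oplog, which is how initial sync locates the sync source's latest entry when computing the stopTimestamp. Below is a minimal sketch of that command shape, not part of the patch; the helper name makeReverseOplogScanCommand and the exact field layout are illustrative assumptions.

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace mongo {
// Illustrative only: a reverse $natural scan of local.oplog.rs with limit 1. When a find
// like this arrives from an internal client (or while the
// 'allowExternalReadsForReverseOplogScanRule' failpoint is enabled), the code above pins
// the read source so entries from a not-yet-finalized batch stay visible for the scan.
BSONObj makeReverseOplogScanCommand() {
    return BSON("find"
                << "oplog.rs"
                << "$db"
                << "local"
                << "sort" << BSON("$natural" << -1) << "limit" << 1);
}
}  // namespace mongo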
diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp
index a6ed6f8a0ed..91a09be2e93 100644
--- a/src/mongo/db/repl/oplog_applier_utils.cpp
+++ b/src/mongo/db/repl/oplog_applier_utils.cpp
@@ -40,9 +40,12 @@
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/util/fail_point.h"
#include "mongo/logv2/log.h"
+MONGO_FAIL_POINT_DEFINE(hangAfterApplyingCollectionDropOplogEntry);
+
namespace mongo {
namespace repl {
CachedCollectionProperties::CollectionProperties
@@ -267,6 +270,15 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
incrementOpsAppliedStats();
return status;
});
+ if (op.getCommandType() == OplogEntry::CommandType::kDrop) {
+ hangAfterApplyingCollectionDropOplogEntry.executeIf(
+ [&](const BSONObj&) {
+ LOGV2(5863600,
+ "Hanging due to 'hangAfterApplyingCollectionDropOplogEntry' failpoint.");
+ hangAfterApplyingCollectionDropOplogEntry.pauseWhileSet();
+ },
+ [&](const BSONObj& data) { return (nss.db() == data["dbName"].str()); });
+ }
return status;
}
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 2e641cd94d0..5be6ce96050 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -466,6 +466,18 @@ public:
};
/**
+ * Pinning informs callers not to change the ReadSource on this RecoveryUnit. Callers are
+ * expected to check isReadSourcePinned() before attempting to change the ReadSource;
+ * implementations may fail (e.g. with a tassert) if the ReadSource is changed while pinned.
+ * See also `PinReadSourceBlock` for a RAII-style solution.
+ */
+ virtual void pinReadSource() {}
+ virtual void unpinReadSource() {}
+ virtual bool isReadSourcePinned() const {
+ return false;
+ }
+
+ /**
* Sets whether this operation intends to perform reads that do not need to keep data in the
* storage engine cache. This can be useful for operations that do large, one-time scans of
* data, and will attempt to keep higher-priority data from being evicted from the cache. This
@@ -762,4 +774,24 @@ private:
uint64_t _mySnapshotId;
};
+/**
+ * RAII-style class to manage pinning and unpinning the readSource.
+ */
+class PinReadSourceBlock {
+ PinReadSourceBlock(const PinReadSourceBlock&) = delete;
+ PinReadSourceBlock& operator=(const PinReadSourceBlock&) = delete;
+
+public:
+ explicit PinReadSourceBlock(RecoveryUnit* recoveryUnit) : _recoveryUnit(recoveryUnit) {
+ _recoveryUnit->pinReadSource();
+ }
+
+ ~PinReadSourceBlock() {
+ _recoveryUnit->unpinReadSource();
+ }
+
+private:
+ RecoveryUnit* const _recoveryUnit;
+};
+
} // namespace mongo
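As a usage sketch of the new pinning API (illustrative, not part of the patch; runWithPinnedReadSource is a hypothetical caller), the RAII helper pins the read source for a scoped block, SnapshotHelper::shouldChangeReadSource() then leaves the source untouched, and any explicit change should first check isReadSourcePinned().

#include "mongo/db/operation_context.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/util/assert_util.h"

namespace mongo {
// Hypothetical caller: keep the read source stable for the duration of a read.
void runWithPinnedReadSource(OperationContext* opCtx) {
    RecoveryUnit* ru = opCtx->recoveryUnit();

    // Pins in the constructor, unpins in the destructor.
    PinReadSourceBlock pinReadSourceBlock(ru);
    invariant(ru->isReadSourcePinned());

    // While pinned, do not call setTimestampReadSource(); the WiredTiger
    // implementation tasserts if the read source is changed while pinned.

    // ... perform reads (e.g. a reverse oplog scan) with a stable read source ...
}  // Read source unpinned when 'pinReadSourceBlock' goes out of scope.
}  // namespace mongo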
diff --git a/src/mongo/db/storage/snapshot_helper.cpp b/src/mongo/db/storage/snapshot_helper.cpp
index 73b8a5eee25..db0d1e3c207 100644
--- a/src/mongo/db/storage/snapshot_helper.cpp
+++ b/src/mongo/db/storage/snapshot_helper.cpp
@@ -141,6 +141,17 @@ ReadSourceChange shouldChangeReadSource(OperationContext* opCtx, const Namespace
}
const auto existing = opCtx->recoveryUnit()->getTimestampReadSource();
+ if (opCtx->recoveryUnit()->isReadSourcePinned()) {
+ LOGV2_DEBUG(5863601,
+ 2,
+ "Not changing readSource as it is pinned",
+ "current"_attr = RecoveryUnit::toString(existing),
+ "rejected"_attr = readAtLastApplied
+ ? RecoveryUnit::toString(RecoveryUnit::ReadSource::kLastApplied)
+ : RecoveryUnit::toString(RecoveryUnit::ReadSource::kNoTimestamp));
+ return {boost::none, false};
+ }
+
if (existing == RecoveryUnit::ReadSource::kNoTimestamp) {
// Shifting from reading without a timestamp to reading with a timestamp can be dangerous
// because writes will appear to vanish. This case is intended for new reads on secondaries
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 3e05d6b0f55..f95dc90b7bf 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -955,6 +955,8 @@ void WiredTigerRecoveryUnit::setRoundUpPreparedTimestamps(bool value) {
void WiredTigerRecoveryUnit::setTimestampReadSource(ReadSource readSource,
boost::optional<Timestamp> provided) {
+ tassert(5863604, "Cannot change ReadSource as it is pinned.", !isReadSourcePinned());
+
LOGV2_DEBUG(22416,
3,
"setting timestamp read source",
@@ -976,6 +978,20 @@ RecoveryUnit::ReadSource WiredTigerRecoveryUnit::getTimestampReadSource() const
return _timestampReadSource;
}
+void WiredTigerRecoveryUnit::pinReadSource() {
+ LOGV2_DEBUG(5863602, 3, "Pinning read source on WT recovery unit");
+ _readSourcePinned = true;
+}
+
+void WiredTigerRecoveryUnit::unpinReadSource() {
+ LOGV2_DEBUG(5863603, 3, "Unpinning read source on WT recovery unit");
+ _readSourcePinned = false;
+}
+
+bool WiredTigerRecoveryUnit::isReadSourcePinned() const {
+ return _readSourcePinned;
+}
+
void WiredTigerRecoveryUnit::beginIdle() {
// Close all cursors, we don't want to keep any old cached cursors around.
if (_session) {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 6d129a7d532..70b13ffa3f2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -154,6 +154,12 @@ public:
ReadSource getTimestampReadSource() const override;
+ void pinReadSource() override;
+
+ void unpinReadSource() override;
+
+ bool isReadSourcePinned() const override;
+
virtual void setOrderedCommit(bool orderedCommit) override {
_orderedCommit = orderedCommit;
}
@@ -292,6 +298,8 @@ private:
// When 'true', data read from disk should not be kept in the storage engine cache.
bool _readOnce = false;
+ bool _readSourcePinned = false;
+
// The behavior of handling prepare conflicts.
PrepareConflictBehavior _prepareConflictBehavior{PrepareConflictBehavior::kEnforce};
// Dictates whether to round up prepare and commit timestamp of a prepared transaction.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index d8b5ba12141..a6cb4495b12 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -876,5 +876,15 @@ DEATH_TEST_F(WiredTigerRecoveryUnitTestFixture,
}
}
+DEATH_TEST_F(WiredTigerRecoveryUnitTestFixture,
+ MayNotChangeReadSourceWhilePinned,
+ "Cannot change ReadSource as it is pinned.") {
+
+ // Storage engine operations require at least Global IS.
+ Lock::GlobalLock lk(clientAndCtx1.second.get(), MODE_IS);
+ ru1->pinReadSource();
+ ru1->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap);
+}
+
} // namespace
} // namespace mongo