From 4a230c070cb187604d07d04598a912b4feca937d Mon Sep 17 00:00:00 2001 From: Vesselina Ratcheva Date: Mon, 4 Oct 2021 22:27:25 +0000 Subject: SERVER-58636 Allow initial syncing nodes to see entries from uncommitted batches on secondaries --- etc/backports_required_for_multiversion_tests.yml | 4 + ...tial_sync_replicate_drop_mid_secondary_batch.js | 130 +++++++++++++++++++++ src/mongo/db/commands/find_cmd.cpp | 34 ++++++ src/mongo/db/repl/oplog_applier_utils.cpp | 12 ++ src/mongo/db/storage/recovery_unit.h | 32 +++++ src/mongo/db/storage/snapshot_helper.cpp | 11 ++ .../wiredtiger/wiredtiger_recovery_unit.cpp | 16 +++ .../storage/wiredtiger/wiredtiger_recovery_unit.h | 8 ++ .../wiredtiger/wiredtiger_recovery_unit_test.cpp | 10 ++ 9 files changed, 257 insertions(+) create mode 100644 jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 8c591c529ec..42c927a7497 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -94,6 +94,8 @@ last-continuous: test_file: jstests/core/timeseries/timeseries_bucket_rename.js - ticket: SERVER-59424 test_file: jstests/sharding/change_stream_show_migration_events.js + - ticket: SERVER-58636 + test_file: jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: @@ -348,6 +350,8 @@ last-lts: test_file: jstests/aggregation/expressions/date_add_subtract.js - ticket: SERVER-59424 test_file: jstests/sharding/change_stream_show_migration_events.js + - ticket: SERVER-58636 + test_file: jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js # Tests that should only be excluded from particular suites should be listed under that suite. 
suites: diff --git a/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js b/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js new file mode 100644 index 00000000000..e446f04d726 --- /dev/null +++ b/jstests/replsets/initial_sync_replicate_drop_mid_secondary_batch.js @@ -0,0 +1,130 @@ +/** + * It was previously possible during initial sync for the oplog fetcher to miss a drop that the + * cloner observed, specifically when it happens during a not-yet-finalized batch on the sync + * source. This tests that this is no longer possible. + * + * @tags: [requires_replication] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const rst = new ReplSetTest({nodes: 3}); +rst.startSet(); +rst.initiate(); + +const dbName = "testdb"; +const collName = "testcoll"; +const testNss = dbName + "." + collName; + +const primary = rst.getPrimary(); +const syncSource = rst.getSecondaries()[0]; +let initSyncNode = rst.getSecondaries()[1]; + +var verbosityCmd = { + "setParameter": 1, + "logComponentVerbosity": { + "replication": {"verbosity": 3}, + } +}; +assert.commandWorked(syncSource.adminCommand(verbosityCmd)); + +jsTestLog("Inserting some docs on the primary"); +const primaryDB = primary.getDB(dbName); +assert.commandWorked(primaryDB.getCollection(collName).insert({"a": 1})); +assert.commandWorked(primaryDB.getCollection(collName).insert({"b": 2})); +rst.awaitReplication(); + +const clonerFailPoint = "hangBeforeClonerStage"; +const failPointData = { + cloner: "CollectionCloner", + stage: "listIndexes", + nss: testNss +}; + +jsTestLog("Restarting last node for initial sync"); +let startupParams = {}; +startupParams["logComponentVerbosity"] = tojson({replication: 3}); +startupParams["initialSyncSourceReadPreference"] = "secondaryPreferred"; +startupParams["failpoint." 
+ clonerFailPoint] = tojson({mode: "alwaysOn", data: failPointData}); +initSyncNode = rst.restart(initSyncNode, {startClean: true, setParameter: startupParams}); + +jsTestLog("Waiting for initial syncing node to hit failpoint"); +assert.commandWorked(initSyncNode.adminCommand({ + waitForFailPoint: clonerFailPoint, + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); + +jsTestLog("Adding more data to initial sync"); +assert.commandWorked(primaryDB.getCollection(collName).insert({"c": 3})); +rst.awaitReplication(undefined /* timeout */, undefined /*secondaryOpTimeType */, [syncSource]); + +const dropFailPoint = + configureFailPoint(syncSource, "hangAfterApplyingCollectionDropOplogEntry", {dbName: dbName}); + +assert(primaryDB.getCollection(collName).drop({writeConcern: {w: 1}})); + +jsTestLog("Waiting for sync source to hit drop failpoint"); +dropFailPoint.wait(); +sleep(10 * 1000); + +// Enable this so we too can see the drop entry. +const allowExternalReadsFp = + configureFailPoint(syncSource, "allowExternalReadsForReverseOplogScanRule"); + +const syncSourceEntries = + syncSource.getCollection('local.oplog.rs').find({ns: /testdb/i}).sort({$natural: -1}).toArray(); + +syncSourceEntries.forEach(function(entry) { + jsTestLog("Sync source entry: " + tojson(entry)); +}); + +const latestSyncSourceEntry = syncSourceEntries[0]; + +assert(latestSyncSourceEntry.o, () => tojson(latestSyncSourceEntry)); +assert(latestSyncSourceEntry.o.drop, () => tojson(latestSyncSourceEntry)); +assert.eq(collName, latestSyncSourceEntry.o.drop, () => tojson(latestSyncSourceEntry)); + +const targetStopTs = latestSyncSourceEntry.ts; + +allowExternalReadsFp.off(); + +jsTestLog("Resuming initial sync"); +assert.commandWorked(initSyncNode.adminCommand({configureFailPoint: clonerFailPoint, mode: 'off'})); + +jsTestLog("Waiting for initial sync node to reach correct stopTimestamp"); +assert.soonNoExcept(function() { + const nodeStatus = 
assert.commandWorked(initSyncNode.adminCommand({replSetGetStatus: 1}));
+    assert(nodeStatus, () => tojson(nodeStatus));
+    assert(nodeStatus.initialSyncStatus, () => tojson(nodeStatus));
+    // Is actually the 'stopTimestamp'.
+    assert(nodeStatus.initialSyncStatus.initialSyncOplogEnd, () => tojson(nodeStatus));
+    const currentStopTs = nodeStatus.initialSyncStatus.initialSyncOplogEnd;
+    assert.eq(currentStopTs, targetStopTs, () => tojson(nodeStatus));
+
+    const comparison = bsonWoCompare(currentStopTs, targetStopTs);
+
+    if (comparison == 0) {
+        // We reached the stopTs we wanted.
+        return true;
+    }
+
+    // We should never exceed that timestamp.
+    assert.lte(currentStopTs, targetStopTs, () => tojson(nodeStatus));
+    return false;
+});
+
+// Now that the stopTimestamp is far enough to include the drop, we also need to allow
+// the fetcher to actually replicate those entries.
+jsTestLog("Resuming batch application on the secondary");
+dropFailPoint.off();
+
+jsTestLog("Waiting for initial sync to complete");
+rst.waitForState(initSyncNode, ReplSetTest.State.SECONDARY); // will time out on error
+
+rst.awaitReplication();
+rst.stopSet();
+})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 4120b38cd6d..39dee1b5f29 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -59,10 +59,13 @@
 #include "mongo/db/transaction_participant.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/fail_point.h"
 
 namespace mongo {
 namespace {
 
+MONGO_FAIL_POINT_DEFINE(allowExternalReadsForReverseOplogScanRule);
+
 const auto kTermField = "term"_sd;
 
 // Older client code before FCV 5.1 could still send 'ntoreturn' to mean a 'limit' or 'batchSize'.
@@ -424,6 +427,37 @@ public:
             opCtx->lockState()->skipAcquireTicket();
         }
 
+        // If this read represents a reverse oplog scan, we want to bypass oplog visibility
+        // rules in the case of secondaries.
We normally only read from these nodes at batch
+        // boundaries, but in this specific case we should fetch all new entries, to be
+        // consistent with any catalog changes that might be observable before the batch is
+        // finalized. This special rule for reverse oplog scans is needed by replication
+        // initial sync, for the purposes of calculating the stopTimestamp correctly.
+        boost::optional<PinReadSourceBlock> pinReadSourceBlock;
+        if (isOplogNss) {
+            auto reverseScan = false;
+
+            auto cmdSort = findCommand->getSort();
+            if (!cmdSort.isEmpty()) {
+                BSONElement natural = cmdSort[query_request_helper::kNaturalSortField];
+                if (natural) {
+                    reverseScan = natural.safeNumberInt() < 0;
+                }
+            }
+
+            auto isInternal = (opCtx->getClient()->session() &&
+                               (opCtx->getClient()->session()->getTags() &
+                                transport::Session::kInternalClient));
+
+            if (MONGO_unlikely(allowExternalReadsForReverseOplogScanRule.shouldFail())) {
+                isInternal = true;
+            }
+
+            if (reverseScan && isInternal) {
+                pinReadSourceBlock.emplace(opCtx->recoveryUnit());
+            }
+        }
+
         // Acquire locks. If the query is on a view, we release our locks and convert the query
         // request into an aggregation command.
boost::optional ctx; diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index a6ed6f8a0ed..91a09be2e93 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -40,9 +40,12 @@ #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/stats/counters.h" +#include "mongo/util/fail_point.h" #include "mongo/logv2/log.h" +MONGO_FAIL_POINT_DEFINE(hangAfterApplyingCollectionDropOplogEntry); + namespace mongo { namespace repl { CachedCollectionProperties::CollectionProperties @@ -267,6 +270,15 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( incrementOpsAppliedStats(); return status; }); + if (op.getCommandType() == mongo::repl::OplogEntry::CommandType::kDrop) { + hangAfterApplyingCollectionDropOplogEntry.executeIf( + [&](const BSONObj&) { + hangAfterApplyingCollectionDropOplogEntry.pauseWhileSet(); + LOGV2(5863600, + "Hanging due to 'hangAfterApplyingCollectionDropOplogEntry' failpoint."); + }, + [&](const BSONObj& data) { return (nss.db() == data["dbName"].str()); }); + } return status; } diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 2e641cd94d0..5be6ce96050 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -465,6 +465,18 @@ public: return ReadSource::kNoTimestamp; }; + /** + * Pinning informs callers not to change the ReadSource on this RecoveryUnit. Callers are + * expected to first check isReadSourcePinned before attempting to change the ReadSource. An + * error may occur otherwise. + * See also `PinReadSourceBlock` for a RAII-style solution. + */ + virtual void pinReadSource() {} + virtual void unpinReadSource() {} + virtual bool isReadSourcePinned() const { + return false; + } + /** * Sets whether this operation intends to perform reads that do not need to keep data in the * storage engine cache. 
This can be useful for operations that do large, one-time scans of @@ -762,4 +774,24 @@ private: uint64_t _mySnapshotId; }; +/** + * RAII-style class to manage pinning and unpinning the readSource. + */ +class PinReadSourceBlock { + PinReadSourceBlock(const PinReadSourceBlock&) = delete; + PinReadSourceBlock& operator=(const PinReadSourceBlock&) = delete; + +public: + explicit PinReadSourceBlock(RecoveryUnit* recoveryUnit) : _recoveryUnit(recoveryUnit) { + _recoveryUnit->pinReadSource(); + } + + ~PinReadSourceBlock() { + _recoveryUnit->unpinReadSource(); + } + +private: + RecoveryUnit* const _recoveryUnit; +}; + } // namespace mongo diff --git a/src/mongo/db/storage/snapshot_helper.cpp b/src/mongo/db/storage/snapshot_helper.cpp index 73b8a5eee25..db0d1e3c207 100644 --- a/src/mongo/db/storage/snapshot_helper.cpp +++ b/src/mongo/db/storage/snapshot_helper.cpp @@ -141,6 +141,17 @@ ReadSourceChange shouldChangeReadSource(OperationContext* opCtx, const Namespace } const auto existing = opCtx->recoveryUnit()->getTimestampReadSource(); + if (opCtx->recoveryUnit()->isReadSourcePinned()) { + LOGV2_DEBUG(5863601, + 2, + "Not changing readSource as it is pinned", + "current"_attr = RecoveryUnit::toString(existing), + "rejected"_attr = readAtLastApplied + ? RecoveryUnit::toString(RecoveryUnit::ReadSource::kLastApplied) + : RecoveryUnit::toString(RecoveryUnit::ReadSource::kNoTimestamp)); + return {boost::none, false}; + } + if (existing == RecoveryUnit::ReadSource::kNoTimestamp) { // Shifting from reading without a timestamp to reading with a timestamp can be dangerous // because writes will appear to vanish. 
This case is intended for new reads on secondaries
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 3e05d6b0f55..f95dc90b7bf 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -955,6 +955,8 @@ void WiredTigerRecoveryUnit::setRoundUpPreparedTimestamps(bool value) {
 
 void WiredTigerRecoveryUnit::setTimestampReadSource(ReadSource readSource,
                                                     boost::optional<Timestamp> provided) {
+    tassert(5863604, "Cannot change ReadSource as it is pinned.", !isReadSourcePinned());
+
     LOGV2_DEBUG(22416,
                 3,
                 "setting timestamp read source",
@@ -976,6 +978,20 @@ RecoveryUnit::ReadSource WiredTigerRecoveryUnit::getTimestampReadSource() const
     return _timestampReadSource;
 }
 
+void WiredTigerRecoveryUnit::pinReadSource() {
+    LOGV2_DEBUG(5863602, 3, "Pinning read source on WT recovery unit");
+    _readSourcePinned = true;
+}
+
+void WiredTigerRecoveryUnit::unpinReadSource() {
+    LOGV2_DEBUG(5863603, 3, "Unpinning WT recovery unit read source");
+    _readSourcePinned = false;
+}
+
+bool WiredTigerRecoveryUnit::isReadSourcePinned() const {
+    return _readSourcePinned;
+}
+
 void WiredTigerRecoveryUnit::beginIdle() {
     // Close all cursors, we don't want to keep any old cached cursors around.
if (_session) { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index 6d129a7d532..70b13ffa3f2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -154,6 +154,12 @@ public: ReadSource getTimestampReadSource() const override; + void pinReadSource() override; + + void unpinReadSource() override; + + bool isReadSourcePinned() const override; + virtual void setOrderedCommit(bool orderedCommit) override { _orderedCommit = orderedCommit; } @@ -292,6 +298,8 @@ private: // When 'true', data read from disk should not be kept in the storage engine cache. bool _readOnce = false; + bool _readSourcePinned = false; + // The behavior of handling prepare conflicts. PrepareConflictBehavior _prepareConflictBehavior{PrepareConflictBehavior::kEnforce}; // Dictates whether to round up prepare and commit timestamp of a prepared transaction. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index d8b5ba12141..a6cb4495b12 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -876,5 +876,15 @@ DEATH_TEST_F(WiredTigerRecoveryUnitTestFixture, } } +DEATH_TEST_F(WiredTigerRecoveryUnitTestFixture, + MayNotChangeReadSourceWhilePinned, + "Cannot change ReadSource as it is pinned.") { + + // Storage engine operations require at least Global IS. + Lock::GlobalLock lk(clientAndCtx1.second.get(), MODE_IS); + ru1->pinReadSource(); + ru1->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap); +} + } // namespace } // namespace mongo -- cgit v1.2.1