diff options
7 files changed, 141 insertions, 19 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 597c6c4e4ac..1aaec206c81 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -298,7 +298,8 @@ last-continuous: test_file: jstests/replsets/tenant_migration_concurrent_writes_on_donor_util.js - ticket: SERVER-69348 test_file: jstests/sharding/read_write_concern_defaults_application.js - + - ticket: SERVER-68932 + test_file: jstests/sharding/resharding_critical_section_metrics.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/sharding/resharding_critical_section_metrics.js b/jstests/sharding/resharding_critical_section_metrics.js new file mode 100644 index 00000000000..15192f49b37 --- /dev/null +++ b/jstests/sharding/resharding_critical_section_metrics.js @@ -0,0 +1,97 @@ +/** + * Tests that the resharding metrics for reads and writes occurring during the critical section are incremented. 
+ * + * @tags: [ + * requires_fcv_61 + * ] + */ +(function() { +'use strict'; + +load("jstests/libs/fail_point_util.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); +load('jstests/libs/parallel_shell_helpers.js'); + +const reshardingTest = new ReshardingTest(); +reshardingTest.setup(); + +const donorName = reshardingTest.donorShardNames[0]; +const recipientName = reshardingTest.recipientShardNames[0]; +const donorShard = reshardingTest.getReplSetForShard(donorName).getPrimary(); +const sourceCollection = reshardingTest.createShardedCollection({ + ns: 'reshardingDb.coll', + shardKeyPattern: {oldKey: 1}, + chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorName}, + ] +}); +const mongos = sourceCollection.getMongo(); + +const kWritesDuringCriticalSection = 'countWritesDuringCriticalSection'; +const kReadsDuringCriticalSection = 'countReadsDuringCriticalSection'; + +function attemptFromParallelShell(fn) { + const join = startParallelShell(funWithArgs((fn) => { + db = db.getSiblingDB('reshardingDb'); + fn(db.coll); + }, fn), mongos.port); + return join; +} + +function attemptWriteFromParallelShell() { + return attemptFromParallelShell((coll) => { + assert.commandWorked(coll.insert({_id: 0, oldKey: 0, newKey: 0})); + }); +} + +function attemptReadFromParallelShell() { + return attemptFromParallelShell((coll) => { + coll.find({}).toArray(); + }); +} + +function getActiveSectionMetric(fieldName) { + const stats = donorShard.getDB('admin').serverStatus({}); + const active = stats.shardingStatistics.resharding.active; + return active[fieldName]; +} + +function assertIncrementsActiveSectionMetricSoon(fn, metricFieldName) { + const before = getActiveSectionMetric(metricFieldName); + fn(); + assert.soon(() => { + const after = getActiveSectionMetric(metricFieldName); + return after > before; + }); +} + +const hangWhileBlockingReads = + configureFailPoint(donorShard, "reshardingPauseDonorAfterBlockingReads"); + +let waitForWrite; +let 
waitForRead; + +reshardingTest.withReshardingInBackground({ + newShardKeyPattern: {newKey: 1}, + newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientName}], +}, + (tempNs) => {}, + { + postDecisionPersistedFn: () => { + hangWhileBlockingReads.wait(); + assertIncrementsActiveSectionMetricSoon(() => { + waitForWrite = + attemptWriteFromParallelShell(); + }, kWritesDuringCriticalSection); + assertIncrementsActiveSectionMetricSoon(() => { + waitForRead = attemptReadFromParallelShell(); + }, kReadsDuringCriticalSection); + hangWhileBlockingReads.off(); + } + }); + +waitForWrite(); +waitForRead(); + +reshardingTest.teardown(); +})(); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index f599e59b5ab..3ec02bf34fe 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -67,6 +67,7 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(reshardingPauseDonorBeforeCatalogCacheRefresh); +MONGO_FAIL_POINT_DEFINE(reshardingPauseDonorAfterBlockingReads); MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsAfterTransitionToDonatingOplogEntries); MONGO_FAIL_POINT_DEFINE(removeDonorDocFailpoint); @@ -803,6 +804,7 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTrans _metadata.getSourceNss(), _critSecReason, ShardingCatalogClient::kLocalWriteConcern); + reshardingPauseDonorAfterBlockingReads.pauseWhileSet(opCtx.get()); } { diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp index 86cc76261a8..8291dd8a654 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp @@ -41,26 +41,44 @@ namespace mongo { namespace resharding_metrics { namespace { -void onCriticalSectionErrorThrows(OperationContext* opCtx, const StaleConfigInfo& info) { - const auto& 
operationType = info.getDuringOperationType(); - if (!operationType) { - return; - } - AutoGetCollection autoColl(opCtx, info.getNss(), MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, info.getNss()); + +boost::optional<UUID> tryGetReshardingUUID(OperationContext* opCtx, const NamespaceString& nss) { + // We intentionally use AutoGetDb and acquire the collection lock manually here instead of using + // AutoGetCollection. AutoGetCollection will call checkShardVersionOrThrow() to verify that the + // shard version on the opCtx is compatible with the shard version on the collection, however + // this verification will throw if the critical section is held. Since the critical section is + // always held on this code path by definition, this check must be bypassed. As a consequence, + // if the metadata is not known (because this is a secondary that stepped up during the critical + // section), the metrics will not be incremented. The resharding metrics already do not attempt + // to restore the number of reads/writes done on a previous primary during a critical section, + // so this is considered acceptable. 
+ AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IS); + Lock::CollectionLock collLock(opCtx, nss, MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); auto metadata = csr->getCurrentMetadataIfKnown(); if (!metadata || !metadata->isSharded()) { - return; + return boost::none; } const auto& reshardingFields = metadata->getReshardingFields(); if (!reshardingFields) { + return boost::none; + } + return reshardingFields->getReshardingUUID(); +} + +void onCriticalSectionErrorThrows(OperationContext* opCtx, const StaleConfigInfo& info) { + const auto& operationType = info.getDuringOperationType(); + if (!operationType) { + return; + } + auto reshardingId = tryGetReshardingUUID(opCtx, info.getNss()); + if (!reshardingId) { return; } auto stateMachine = resharding::tryGetReshardingStateMachine<ReshardingDonorService, ReshardingDonorService::DonorStateMachine, - ReshardingDonorDocument>( - opCtx, reshardingFields->getReshardingUUID()); + ReshardingDonorDocument>(opCtx, *reshardingId); if (!stateMachine) { return; } diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp index e579ca18f87..35fa6066173 100644 --- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp +++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp @@ -32,6 +32,7 @@ #include "mongo/db/curop.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding/resharding_metrics_helpers.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" @@ -72,8 +73,8 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) { ShardingStatistics::get(_opCtx).countStaleConfigErrors.addAndFetch(1); - bool stableLocalVersion = - !staleInfo->getCriticalSectionSignal() && staleInfo->getVersionWanted(); + bool 
inCriticalSection = staleInfo->getCriticalSectionSignal().has_value(); + bool stableLocalVersion = !inCriticalSection && staleInfo->getVersionWanted(); if (stableLocalVersion && ChunkVersion::isIgnoredVersion(staleInfo->getVersionReceived())) { // Shard is recovered, but the router didn't sent a shard version, therefore we just @@ -87,6 +88,10 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi return; } + if (inCriticalSection) { + resharding_metrics::onCriticalSectionError(_opCtx, *staleInfo); + } + auto handleMismatchStatus = onShardVersionMismatchNoExcept( _opCtx, staleInfo->getNss(), staleInfo->getVersionReceived()); if (!handleMismatchStatus.isOK()) diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index 9c65d80b4b3..0c64c2ee65c 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -315,7 +315,7 @@ void ShardingDataTransformInstanceMetrics::onWriteToStashedCollections() { void ShardingDataTransformInstanceMetrics::onReadDuringCriticalSection() { _readsDuringCriticalSection.fetchAndAdd(1); - _cumulativeMetrics->onWriteDuringCriticalSection(); + _cumulativeMetrics->onReadDuringCriticalSection(); } void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval( diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 1033abb70e0..ac6955774f3 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1828,8 +1828,8 @@ Future<void> ExecCommandDatabase::_commandExec() { serverGlobalParams.clusterRole != ClusterRole::ConfigServer && !_refreshedCollection) { if (auto sce = s.extraInfo<StaleConfigInfo>()) { - bool stableLocalVersion = - !sce->getCriticalSectionSignal() && sce->getVersionWanted(); + bool inCriticalSection = 
sce->getCriticalSectionSignal().has_value(); + bool stableLocalVersion = !inCriticalSection && sce->getVersionWanted(); if (stableLocalVersion && ChunkVersion::isIgnoredVersion(sce->getVersionReceived())) { @@ -1845,7 +1845,7 @@ Future<void> ExecCommandDatabase::_commandExec() { return s; } - if (sce->getCriticalSectionSignal()) { + if (inCriticalSection) { _execContext->behaviors->handleReshardingCriticalSectionMetrics(opCtx, *sce); } @@ -1853,8 +1853,7 @@ Future<void> ExecCommandDatabase::_commandExec() { const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce); if (refreshed) { _refreshedCollection = true; - if (!opCtx->isContinuingMultiDocumentTransaction() && - !sce->getCriticalSectionSignal()) { + if (!opCtx->isContinuingMultiDocumentTransaction() && !inCriticalSection) { _resetLockerStateAfterShardingUpdate(opCtx); return _commandExec(); } |